You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
790 lines
21 KiB
790 lines
21 KiB
// Copyright 2013 Hui Chen |
|
// Copyright 2016 ego authors |
|
// |
|
// Licensed under the Apache License, Version 2.0 (the "License"): you may |
|
// not use this file except in compliance with the License. You may obtain |
|
// a copy of the License at |
|
// |
|
// http://www.apache.org/licenses/LICENSE-2.0 |
|
// |
|
// Unless required by applicable law or agreed to in writing, software |
|
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
|
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
|
// License for the specific language governing permissions and limitations |
|
// under the License. |
|
|
|
/* |
|
|
|
Package riot implements the riot search engine.
|
*/ |
|
package riot |
|
|
|
import ( |
|
"fmt" |
|
"log" |
|
"os" |
|
"runtime" |
|
"sort" |
|
"strconv" |
|
"strings" |
|
"sync" |
|
"time" |
|
// "reflect" |
|
|
|
"sync/atomic" |
|
|
|
"github.com/go-ego/riot/core" |
|
"github.com/go-ego/riot/store" |
|
"github.com/go-ego/riot/types" |
|
"github.com/go-ego/riot/utils" |
|
|
|
"github.com/go-ego/gse" |
|
"github.com/go-ego/murmur" |
|
"github.com/shirou/gopsutil/mem" |
|
) |
|
|
|
const (
	// Version is the riot release string; GetVersion returns it.
	Version string = "v0.10.0.425, Danube River!"

	// NumNanosecondsInAMillisecond is the number of nanoseconds in one
	// millisecond. It is used to convert millisecond search timeouts.
	NumNanosecondsInAMillisecond = 1000000
	// StoreFilePrefix is the file-name prefix of each persistent store shard;
	// the shard number is appended after a dot.
	StoreFilePrefix = "riot"

	// DefaultPath is the store folder used when the store is enabled without
	// an explicit folder.
	DefaultPath = "./riot-index"
)
|
|
|
// GetVersion get the riot version |
|
func GetVersion() string { |
|
return Version |
|
} |
|
|
|
// Engine is the search engine. It holds the per-shard indexers and rankers,
// the segmenter and stop tokens, the persistent store handles, the progress
// counters, and the channels over which requests are handed to the workers.
type Engine struct {
	// loc is read-locked by Store and Flush while they compare the counters
	// below.
	loc sync.RWMutex

	// Counters that track how many documents have been indexed, removed,
	// force-updated, stored, and so on.
	numDocsIndexed       uint64
	numDocsRemoved       uint64
	numDocsForceUpdated  uint64
	numIndexingReqs      uint64
	numRemovingReqs      uint64
	numForceUpdatingReqs uint64
	numTokenIndexAdded   uint64
	numDocsStored        uint64

	// The options recorded at initialization.
	initOptions types.EngineOpts
	initialized bool

	indexers  []core.Indexer
	rankers   []core.Ranker
	segmenter gse.Segmenter
	// loaded is set once a segmenter or its dictionary has been installed
	// (by WithGse or Init).
	loaded     bool
	stopTokens StopTokens
	dbs        []store.Store

	// Channels used while building the index.
	segmenterChan         chan segmenterReq
	indexerAddDocChans    []chan indexerAddDocReq
	indexerRemoveDocChans []chan indexerRemoveDocReq
	rankerAddDocChans     []chan rankerAddDocReq

	// Channels used for lookup and ranking.
	indexerLookupChans   []chan indexerLookupReq
	rankerRankChans      []chan rankerRankReq
	rankerRemoveDocChans []chan rankerRemoveDocReq

	// Channels used by the persistent store.
	storeIndexDocChans []chan storeIndexDocReq
	storeInitChan      chan bool
}
|
|
|
// Indexer initialize the indexer channel |
|
func (engine *Engine) Indexer(options types.EngineOpts) { |
|
engine.indexerAddDocChans = make( |
|
[]chan indexerAddDocReq, options.NumShards) |
|
|
|
engine.indexerRemoveDocChans = make( |
|
[]chan indexerRemoveDocReq, options.NumShards) |
|
|
|
engine.indexerLookupChans = make( |
|
[]chan indexerLookupReq, options.NumShards) |
|
|
|
for shard := 0; shard < options.NumShards; shard++ { |
|
engine.indexerAddDocChans[shard] = make( |
|
chan indexerAddDocReq, options.IndexerBufLen) |
|
|
|
engine.indexerRemoveDocChans[shard] = make( |
|
chan indexerRemoveDocReq, options.IndexerBufLen) |
|
|
|
engine.indexerLookupChans[shard] = make( |
|
chan indexerLookupReq, options.IndexerBufLen) |
|
} |
|
} |
|
|
|
// Ranker initialize the ranker channel |
|
func (engine *Engine) Ranker(options types.EngineOpts) { |
|
engine.rankerAddDocChans = make( |
|
[]chan rankerAddDocReq, options.NumShards) |
|
|
|
engine.rankerRankChans = make( |
|
[]chan rankerRankReq, options.NumShards) |
|
|
|
engine.rankerRemoveDocChans = make( |
|
[]chan rankerRemoveDocReq, options.NumShards) |
|
|
|
for shard := 0; shard < options.NumShards; shard++ { |
|
engine.rankerAddDocChans[shard] = make( |
|
chan rankerAddDocReq, options.RankerBufLen) |
|
|
|
engine.rankerRankChans[shard] = make( |
|
chan rankerRankReq, options.RankerBufLen) |
|
|
|
engine.rankerRemoveDocChans[shard] = make( |
|
chan rankerRemoveDocReq, options.RankerBufLen) |
|
} |
|
} |
|
|
|
// InitStore initialize the persistent store channel |
|
func (engine *Engine) InitStore() { |
|
engine.storeIndexDocChans = make( |
|
[]chan storeIndexDocReq, engine.initOptions.StoreShards) |
|
|
|
for shard := 0; shard < engine.initOptions.StoreShards; shard++ { |
|
engine.storeIndexDocChans[shard] = make( |
|
chan storeIndexDocReq) |
|
} |
|
engine.storeInitChan = make( |
|
chan bool, engine.initOptions.StoreShards) |
|
} |
|
|
|
// CheckMem check the memory when the memory is larger |
|
// than 99.99% using the store |
|
func (engine *Engine) CheckMem() { |
|
// Todo test |
|
if !engine.initOptions.UseStore { |
|
log.Println("Check virtualMemory...") |
|
|
|
vmem, _ := mem.VirtualMemory() |
|
log.Printf("Total: %v, Free: %v, UsedPercent: %f%%\n", |
|
vmem.Total, vmem.Free, vmem.UsedPercent) |
|
|
|
useMem := fmt.Sprintf("%.2f", vmem.UsedPercent) |
|
if useMem == "99.99" { |
|
engine.initOptions.UseStore = true |
|
engine.initOptions.StoreFolder = DefaultPath |
|
// os.MkdirAll(DefaultPath, 0777) |
|
} |
|
} |
|
} |
|
|
|
// Store start the persistent store work connection |
|
func (engine *Engine) Store() { |
|
// if engine.initOptions.UseStore { |
|
err := os.MkdirAll(engine.initOptions.StoreFolder, 0700) |
|
if err != nil { |
|
log.Fatalf("Can not create directory: %s ; %v", |
|
engine.initOptions.StoreFolder, err) |
|
} |
|
|
|
// 打开或者创建数据库 |
|
engine.dbs = make([]store.Store, engine.initOptions.StoreShards) |
|
for shard := 0; shard < engine.initOptions.StoreShards; shard++ { |
|
dbPath := engine.initOptions.StoreFolder + "/" + |
|
StoreFilePrefix + "." + strconv.Itoa(shard) |
|
|
|
db, err := store.OpenStore(dbPath, engine.initOptions.StoreEngine) |
|
if db == nil || err != nil { |
|
log.Fatal("Unable to open database ", dbPath, ": ", err) |
|
} |
|
engine.dbs[shard] = db |
|
} |
|
|
|
// 从数据库中恢复 |
|
for shard := 0; shard < engine.initOptions.StoreShards; shard++ { |
|
go engine.storeInitWorker(shard) |
|
} |
|
|
|
// 等待恢复完成 |
|
for shard := 0; shard < engine.initOptions.StoreShards; shard++ { |
|
<-engine.storeInitChan |
|
} |
|
|
|
for { |
|
runtime.Gosched() |
|
|
|
engine.loc.RLock() |
|
numDoced := engine.numIndexingReqs == engine.numDocsIndexed |
|
engine.loc.RUnlock() |
|
|
|
if numDoced { |
|
break |
|
} |
|
|
|
} |
|
|
|
// 关闭并重新打开数据库 |
|
for shard := 0; shard < engine.initOptions.StoreShards; shard++ { |
|
engine.dbs[shard].Close() |
|
dbPath := engine.initOptions.StoreFolder + "/" + |
|
StoreFilePrefix + "." + strconv.Itoa(shard) |
|
|
|
db, err := store.OpenStore(dbPath, engine.initOptions.StoreEngine) |
|
if db == nil || err != nil { |
|
log.Fatal("Unable to open database ", dbPath, ": ", err) |
|
} |
|
engine.dbs[shard] = db |
|
} |
|
|
|
for shard := 0; shard < engine.initOptions.StoreShards; shard++ { |
|
go engine.storeIndexDocWorker(shard) |
|
} |
|
// } |
|
} |
|
|
|
// WithGse Using user defined segmenter |
|
// If using a not nil segmenter and the dictionary is loaded, |
|
// the `opt.GseDict` will be ignore. |
|
func (engine *Engine) WithGse(segmenter gse.Segmenter) *Engine { |
|
if engine.initialized { |
|
log.Fatal(`Do not re-initialize the engine, |
|
WithGse should call before initialize the engine.`) |
|
} |
|
|
|
engine.segmenter = segmenter |
|
engine.loaded = true |
|
return engine |
|
} |
|
|
|
// Init initialize the engine |
|
func (engine *Engine) Init(options types.EngineOpts) { |
|
// 将线程数设置为CPU数 |
|
// runtime.GOMAXPROCS(runtime.NumCPU()) |
|
// runtime.GOMAXPROCS(128) |
|
|
|
// 初始化初始参数 |
|
if engine.initialized { |
|
log.Fatal("Do not re-initialize the engine.") |
|
} |
|
|
|
if options.GseDict == "" && !options.NotUseGse && !engine.loaded { |
|
log.Printf("Dictionary file path is empty, load the default dictionary file.") |
|
options.GseDict = "zh" |
|
} |
|
|
|
if options.UseStore == true && options.StoreFolder == "" { |
|
log.Printf("Store file path is empty, use default folder path.") |
|
options.StoreFolder = DefaultPath |
|
// os.MkdirAll(DefaultPath, 0777) |
|
} |
|
|
|
options.Init() |
|
engine.initOptions = options |
|
engine.initialized = true |
|
|
|
if !options.NotUseGse { |
|
if !engine.loaded { |
|
// 载入分词器词典 |
|
engine.segmenter.LoadDict(options.GseDict) |
|
engine.loaded = true |
|
} |
|
|
|
// 初始化停用词 |
|
engine.stopTokens.Init(options.StopTokenFile) |
|
} |
|
|
|
// 初始化索引器和排序器 |
|
for shard := 0; shard < options.NumShards; shard++ { |
|
engine.indexers = append(engine.indexers, core.Indexer{}) |
|
engine.indexers[shard].Init(*options.IndexerOpts) |
|
|
|
engine.rankers = append(engine.rankers, core.Ranker{}) |
|
engine.rankers[shard].Init(options.IDOnly) |
|
} |
|
|
|
// 初始化分词器通道 |
|
engine.segmenterChan = make( |
|
chan segmenterReq, options.NumGseThreads) |
|
|
|
// 初始化索引器通道 |
|
engine.Indexer(options) |
|
|
|
// 初始化排序器通道 |
|
engine.Ranker(options) |
|
|
|
// engine.CheckMem(engine.initOptions.UseStore) |
|
engine.CheckMem() |
|
|
|
// 初始化持久化存储通道 |
|
if engine.initOptions.UseStore { |
|
engine.InitStore() |
|
} |
|
|
|
// 启动分词器 |
|
for iThread := 0; iThread < options.NumGseThreads; iThread++ { |
|
go engine.segmenterWorker() |
|
} |
|
|
|
// 启动索引器和排序器 |
|
for shard := 0; shard < options.NumShards; shard++ { |
|
go engine.indexerAddDocWorker(shard) |
|
go engine.indexerRemoveDocWorker(shard) |
|
go engine.rankerAddDocWorker(shard) |
|
go engine.rankerRemoveDocWorker(shard) |
|
|
|
for i := 0; i < options.NumIndexerThreadsPerShard; i++ { |
|
go engine.indexerLookupWorker(shard) |
|
} |
|
for i := 0; i < options.NumRankerThreadsPerShard; i++ { |
|
go engine.rankerRankWorker(shard) |
|
} |
|
} |
|
|
|
// 启动持久化存储工作协程 |
|
if engine.initOptions.UseStore { |
|
engine.Store() |
|
} |
|
|
|
atomic.AddUint64(&engine.numDocsStored, engine.numIndexingReqs) |
|
} |
|
|
|
// IndexDoc add the document to the index |
|
// 将文档加入索引 |
|
// |
|
// 输入参数: |
|
// docId 标识文档编号,必须唯一,docId == 0 表示非法文档(用于强制刷新索引),[1, +oo) 表示合法文档 |
|
// data 见 DocIndexData 注释 |
|
// forceUpdate 是否强制刷新 cache,如果设为 true,则尽快添加到索引,否则等待 cache 满之后一次全量添加 |
|
// |
|
// 注意: |
|
// 1. 这个函数是线程安全的,请尽可能并发调用以提高索引速度 |
|
// 2. 这个函数调用是非同步的,也就是说在函数返回时有可能文档还没有加入索引中,因此 |
|
// 如果立刻调用Search可能无法查询到这个文档。强制刷新索引请调用FlushIndex函数。 |
|
func (engine *Engine) IndexDoc(docId uint64, data types.DocData, |
|
forceUpdate ...bool) { |
|
engine.Index(docId, data, forceUpdate...) |
|
} |
|
|
|
// Index add the document to the index |
|
func (engine *Engine) Index(docId uint64, data types.DocData, |
|
forceUpdate ...bool) { |
|
|
|
var force bool |
|
if len(forceUpdate) > 0 { |
|
force = forceUpdate[0] |
|
} |
|
|
|
// if engine.HasDoc(docId) { |
|
// engine.RemoveDoc(docId) |
|
// } |
|
|
|
// data.Tokens |
|
engine.internalIndexDoc(docId, data, force) |
|
|
|
hash := murmur.Sum32(fmt.Sprintf("%d", docId)) % |
|
uint32(engine.initOptions.StoreShards) |
|
|
|
if engine.initOptions.UseStore && docId != 0 { |
|
engine.storeIndexDocChans[hash] <- storeIndexDocReq{ |
|
docId: docId, data: data} |
|
} |
|
} |
|
|
|
func (engine *Engine) internalIndexDoc(docId uint64, data types.DocData, |
|
forceUpdate bool) { |
|
|
|
if !engine.initialized { |
|
log.Fatal("The engine must be initialized first.") |
|
} |
|
|
|
if docId != 0 { |
|
atomic.AddUint64(&engine.numIndexingReqs, 1) |
|
} |
|
if forceUpdate { |
|
atomic.AddUint64(&engine.numForceUpdatingReqs, 1) |
|
} |
|
|
|
hash := murmur.Sum32(fmt.Sprintf("%d%s", docId, data.Content)) |
|
engine.segmenterChan <- segmenterReq{ |
|
docId: docId, hash: hash, data: data, forceUpdate: forceUpdate} |
|
} |
|
|
|
// RemoveDoc remove the document from the index |
|
// 将文档从索引中删除 |
|
// |
|
// 输入参数: |
|
// docId 标识文档编号,必须唯一,docId == 0 表示非法文档(用于强制刷新索引),[1, +oo) 表示合法文档 |
|
// forceUpdate 是否强制刷新 cache,如果设为 true,则尽快删除索引,否则等待 cache 满之后一次全量删除 |
|
// |
|
// 注意: |
|
// 1. 这个函数是线程安全的,请尽可能并发调用以提高索引速度 |
|
// 2. 这个函数调用是非同步的,也就是说在函数返回时有可能文档还没有加入索引中,因此 |
|
// 如果立刻调用 Search 可能无法查询到这个文档。强制刷新索引请调用 FlushIndex 函数。 |
|
func (engine *Engine) RemoveDoc(docId uint64, forceUpdate ...bool) { |
|
var force bool |
|
if len(forceUpdate) > 0 { |
|
force = forceUpdate[0] |
|
} |
|
|
|
if !engine.initialized { |
|
log.Fatal("The engine must be initialized first.") |
|
} |
|
|
|
if docId != 0 { |
|
atomic.AddUint64(&engine.numRemovingReqs, 1) |
|
} |
|
if force { |
|
atomic.AddUint64(&engine.numForceUpdatingReqs, 1) |
|
} |
|
for shard := 0; shard < engine.initOptions.NumShards; shard++ { |
|
engine.indexerRemoveDocChans[shard] <- indexerRemoveDocReq{ |
|
docId: docId, forceUpdate: force} |
|
|
|
if docId == 0 { |
|
continue |
|
} |
|
engine.rankerRemoveDocChans[shard] <- rankerRemoveDocReq{docId: docId} |
|
} |
|
|
|
if engine.initOptions.UseStore && docId != 0 { |
|
// 从数据库中删除 |
|
hash := murmur.Sum32(fmt.Sprintf("%d", docId)) % |
|
uint32(engine.initOptions.StoreShards) |
|
|
|
go engine.storeRemoveDocWorker(docId, hash) |
|
} |
|
} |
|
|
|
// // 获取文本的分词结果 |
|
// func (engine *Engine) Tokens(text []byte) (tokens []string) { |
|
// querySegments := engine.segmenter.Segment(text) |
|
// for _, s := range querySegments { |
|
// token := s.Token().Text() |
|
// if !engine.stopTokens.IsStopToken(token) { |
|
// tokens = append(tokens, token) |
|
// } |
|
// } |
|
// return tokens |
|
// } |
|
|
|
// Segment get the word segmentation result of the text |
|
// 获取文本的分词结果, 只分词与过滤弃用词 |
|
func (engine *Engine) Segment(content string) (keywords []string) { |
|
segments := engine.segmenter.ModeSegment([]byte(content), |
|
engine.initOptions.GseMode) |
|
|
|
for _, segment := range segments { |
|
token := segment.Token().Text() |
|
if !engine.stopTokens.IsStopToken(token) { |
|
keywords = append(keywords, token) |
|
} |
|
} |
|
|
|
return |
|
} |
|
|
|
// Tokens get the engine tokens |
|
func (engine *Engine) Tokens(request types.SearchReq) (tokens []string) { |
|
// 收集关键词 |
|
// tokens := []string{} |
|
if request.Text != "" { |
|
request.Text = strings.ToLower(request.Text) |
|
if engine.initOptions.NotUseGse { |
|
tokens = strings.Split(request.Text, " ") |
|
} else { |
|
// querySegments := engine.segmenter.Segment([]byte(request.Text)) |
|
// tokens = engine.Tokens([]byte(request.Text)) |
|
tokens = engine.Segment(request.Text) |
|
} |
|
|
|
// 叠加 tokens |
|
for _, t := range request.Tokens { |
|
tokens = append(tokens, t) |
|
} |
|
|
|
return |
|
} |
|
|
|
for _, t := range request.Tokens { |
|
tokens = append(tokens, t) |
|
} |
|
return |
|
} |
|
|
|
// RankId rank docs by types.ScoredIDs |
|
func (engine *Engine) RankId(request types.SearchReq, RankOpts types.RankOpts, |
|
tokens []string, rankerReturnChan chan rankerReturnReq) ( |
|
output types.SearchResp) { |
|
// 从通信通道读取排序器的输出 |
|
numDocs := 0 |
|
var rankOutput types.ScoredIDs |
|
// var rankOutput interface{} |
|
|
|
//**********/ begin |
|
timeout := request.Timeout |
|
isTimeout := false |
|
if timeout <= 0 { |
|
// 不设置超时 |
|
for shard := 0; shard < engine.initOptions.NumShards; shard++ { |
|
rankerOutput := <-rankerReturnChan |
|
if !request.CountDocsOnly { |
|
if rankerOutput.docs != nil { |
|
for _, doc := range rankerOutput.docs.(types.ScoredIDs) { |
|
rankOutput = append(rankOutput, doc) |
|
} |
|
} |
|
} |
|
numDocs += rankerOutput.numDocs |
|
} |
|
} else { |
|
// 设置超时 |
|
deadline := time.Now().Add(time.Nanosecond * |
|
time.Duration(NumNanosecondsInAMillisecond*request.Timeout)) |
|
|
|
for shard := 0; shard < engine.initOptions.NumShards; shard++ { |
|
select { |
|
case rankerOutput := <-rankerReturnChan: |
|
if !request.CountDocsOnly { |
|
if rankerOutput.docs != nil { |
|
for _, doc := range rankerOutput.docs.(types.ScoredIDs) { |
|
rankOutput = append(rankOutput, doc) |
|
} |
|
} |
|
} |
|
numDocs += rankerOutput.numDocs |
|
case <-time.After(deadline.Sub(time.Now())): |
|
isTimeout = true |
|
break |
|
} |
|
} |
|
} |
|
|
|
// 再排序 |
|
if !request.CountDocsOnly && !request.Orderless { |
|
if RankOpts.ReverseOrder { |
|
sort.Sort(sort.Reverse(rankOutput)) |
|
} else { |
|
sort.Sort(rankOutput) |
|
} |
|
} |
|
|
|
// 准备输出 |
|
output.Tokens = tokens |
|
// 仅当 CountDocsOnly 为 false 时才充填 output.Docs |
|
if !request.CountDocsOnly { |
|
if request.Orderless { |
|
// 无序状态无需对 Offset 截断 |
|
output.Docs = rankOutput |
|
} else { |
|
var start, end int |
|
if RankOpts.MaxOutputs == 0 { |
|
start = utils.MinInt(RankOpts.OutputOffset, len(rankOutput)) |
|
end = len(rankOutput) |
|
} else { |
|
start = utils.MinInt(RankOpts.OutputOffset, len(rankOutput)) |
|
end = utils.MinInt(start+RankOpts.MaxOutputs, len(rankOutput)) |
|
} |
|
output.Docs = rankOutput[start:end] |
|
} |
|
} |
|
|
|
output.NumDocs = numDocs |
|
output.Timeout = isTimeout |
|
|
|
return |
|
} |
|
|
|
// Ranks rank docs by types.ScoredDocs |
|
func (engine *Engine) Ranks(request types.SearchReq, RankOpts types.RankOpts, |
|
tokens []string, rankerReturnChan chan rankerReturnReq) ( |
|
output types.SearchResp) { |
|
// 从通信通道读取排序器的输出 |
|
numDocs := 0 |
|
rankOutput := types.ScoredDocs{} |
|
|
|
//**********/ begin |
|
timeout := request.Timeout |
|
isTimeout := false |
|
if timeout <= 0 { |
|
// 不设置超时 |
|
for shard := 0; shard < engine.initOptions.NumShards; shard++ { |
|
rankerOutput := <-rankerReturnChan |
|
if !request.CountDocsOnly { |
|
if rankerOutput.docs != nil { |
|
for _, doc := range rankerOutput.docs.(types.ScoredDocs) { |
|
rankOutput = append(rankOutput, doc) |
|
} |
|
} |
|
} |
|
numDocs += rankerOutput.numDocs |
|
} |
|
} else { |
|
// 设置超时 |
|
deadline := time.Now().Add(time.Nanosecond * |
|
time.Duration(NumNanosecondsInAMillisecond*request.Timeout)) |
|
|
|
for shard := 0; shard < engine.initOptions.NumShards; shard++ { |
|
select { |
|
case rankerOutput := <-rankerReturnChan: |
|
if !request.CountDocsOnly { |
|
if rankerOutput.docs != nil { |
|
for _, doc := range rankerOutput.docs.(types.ScoredDocs) { |
|
rankOutput = append(rankOutput, doc) |
|
} |
|
} |
|
} |
|
numDocs += rankerOutput.numDocs |
|
case <-time.After(deadline.Sub(time.Now())): |
|
isTimeout = true |
|
break |
|
} |
|
} |
|
} |
|
|
|
// 再排序 |
|
if !request.CountDocsOnly && !request.Orderless { |
|
if RankOpts.ReverseOrder { |
|
sort.Sort(sort.Reverse(rankOutput)) |
|
} else { |
|
sort.Sort(rankOutput) |
|
} |
|
} |
|
|
|
// 准备输出 |
|
output.Tokens = tokens |
|
// 仅当 CountDocsOnly 为 false 时才充填 output.Docs |
|
if !request.CountDocsOnly { |
|
if request.Orderless { |
|
// 无序状态无需对 Offset 截断 |
|
output.Docs = rankOutput |
|
} else { |
|
var start, end int |
|
if RankOpts.MaxOutputs == 0 { |
|
start = utils.MinInt(RankOpts.OutputOffset, len(rankOutput)) |
|
end = len(rankOutput) |
|
} else { |
|
start = utils.MinInt(RankOpts.OutputOffset, len(rankOutput)) |
|
end = utils.MinInt(start+RankOpts.MaxOutputs, len(rankOutput)) |
|
} |
|
output.Docs = rankOutput[start:end] |
|
} |
|
} |
|
|
|
output.NumDocs = numDocs |
|
output.Timeout = isTimeout |
|
|
|
return |
|
} |
|
|
|
// Search find the document that satisfies the search criteria. |
|
// This function is thread safe |
|
// 查找满足搜索条件的文档,此函数线程安全 |
|
func (engine *Engine) Search(request types.SearchReq) (output types.SearchResp) { |
|
if !engine.initialized { |
|
log.Fatal("The engine must be initialized first.") |
|
} |
|
|
|
tokens := engine.Tokens(request) |
|
|
|
var RankOpts types.RankOpts |
|
if request.RankOpts == nil { |
|
RankOpts = *engine.initOptions.DefaultRankOpts |
|
} else { |
|
RankOpts = *request.RankOpts |
|
} |
|
if RankOpts.ScoringCriteria == nil { |
|
RankOpts.ScoringCriteria = engine.initOptions.DefaultRankOpts.ScoringCriteria |
|
} |
|
|
|
// 建立排序器返回的通信通道 |
|
rankerReturnChan := make( |
|
chan rankerReturnReq, engine.initOptions.NumShards) |
|
|
|
// 生成查找请求 |
|
lookupRequest := indexerLookupReq{ |
|
countDocsOnly: request.CountDocsOnly, |
|
tokens: tokens, |
|
labels: request.Labels, |
|
docIds: request.DocIds, |
|
options: RankOpts, |
|
rankerReturnChan: rankerReturnChan, |
|
orderless: request.Orderless, |
|
logic: request.Logic, |
|
} |
|
|
|
// 向索引器发送查找请求 |
|
for shard := 0; shard < engine.initOptions.NumShards; shard++ { |
|
engine.indexerLookupChans[shard] <- lookupRequest |
|
} |
|
|
|
if engine.initOptions.IDOnly { |
|
output = engine.RankId(request, RankOpts, tokens, rankerReturnChan) |
|
return |
|
} |
|
|
|
output = engine.Ranks(request, RankOpts, tokens, rankerReturnChan) |
|
return |
|
} |
|
|
|
// Flush block wait until all indexes are added |
|
// 阻塞等待直到所有索引添加完毕 |
|
func (engine *Engine) Flush() { |
|
for { |
|
runtime.Gosched() |
|
|
|
engine.loc.RLock() |
|
inxd := engine.numIndexingReqs == engine.numDocsIndexed |
|
rmd := engine.numRemovingReqs*uint64(engine.initOptions.NumShards) == |
|
engine.numDocsRemoved |
|
stored := !engine.initOptions.UseStore || engine.numIndexingReqs == |
|
engine.numDocsStored |
|
engine.loc.RUnlock() |
|
|
|
if inxd && rmd && stored { |
|
// 保证 CHANNEL 中 REQUESTS 全部被执行完 |
|
break |
|
} |
|
} |
|
|
|
// 强制更新,保证其为最后的请求 |
|
engine.IndexDoc(0, types.DocData{}, true) |
|
for { |
|
runtime.Gosched() |
|
|
|
engine.loc.RLock() |
|
forced := engine.numForceUpdatingReqs*uint64(engine.initOptions.NumShards) == |
|
engine.numDocsForceUpdated |
|
engine.loc.RUnlock() |
|
|
|
if forced { |
|
return |
|
} |
|
|
|
} |
|
} |
|
|
|
// FlushIndex block wait until all indexes are added |
|
// 阻塞等待直到所有索引添加完毕 |
|
func (engine *Engine) FlushIndex() { |
|
engine.Flush() |
|
} |
|
|
|
// Close close the engine |
|
// 关闭引擎 |
|
func (engine *Engine) Close() { |
|
engine.Flush() |
|
if engine.initOptions.UseStore { |
|
for _, db := range engine.dbs { |
|
db.Close() |
|
} |
|
} |
|
} |
|
|
|
// 从文本hash得到要分配到的 shard |
|
func (engine *Engine) getShard(hash uint32) int { |
|
return int(hash - hash/uint32(engine.initOptions.NumShards)* |
|
uint32(engine.initOptions.NumShards)) |
|
}
|
|
|