You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
68 lines
1.5 KiB
68 lines
1.5 KiB
package service |
|
|
|
import ( |
|
"context" |
|
|
|
"go-common/app/job/main/search/model" |
|
"go-common/library/log" |
|
) |
|
|
|
// all all data to es |
|
func (s *Service) all(c context.Context, appid string, writeEntityIndex bool) { |
|
var stat = new(model.Stat) |
|
app := s.base.D.AppPool[appid] |
|
app.InitIndex(c) |
|
app.InitOffset(c) |
|
//app.Offset(c) |
|
app.Sleep(c) |
|
for { |
|
start := 0 |
|
length, err := app.AllMessages(c) |
|
if err != nil { |
|
log.Error("AllMessages error(%v)", err) |
|
app.Sleep(c) |
|
continue |
|
} |
|
for { |
|
end := start + _bulkSize |
|
diff := length - start |
|
if diff > _bulkSize { |
|
if err := app.BulkIndex(c, start, end, writeEntityIndex); err != nil { |
|
log.Error("es:BulkIndex error(%v)", err) |
|
app.Sleep(c) |
|
continue |
|
} |
|
start = end |
|
} else if diff > 0 && diff <= _bulkSize { |
|
if err := app.BulkIndex(c, start, length, writeEntityIndex); err != nil { |
|
log.Error("BulkIndex error(%v)", err) |
|
app.Sleep(c) |
|
continue |
|
} |
|
if err := app.Commit(c); err != nil { |
|
log.Error("UpdateOffsetID error(%v)", err) |
|
app.Sleep(c) |
|
continue |
|
} |
|
app.Sleep(c) |
|
break |
|
} else { |
|
app.Sleep(c) |
|
break |
|
} |
|
} |
|
stat.Counts += length |
|
s.updateStat(appid, stat) |
|
if length < app.Size(c) { |
|
switch appid { |
|
case "pgc_media", "esports", "esports_contests", "academy_archive", "esports_fav_all", "activity_all": |
|
app.SetRecover(c, 0, "", 0) |
|
app.Sleep(c) |
|
continue |
|
} |
|
break |
|
} |
|
app.Sleep(c) |
|
} |
|
log.Info("appid:%s, all data to es successful!!!", appid) |
|
}
|
|
|