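Adds a BatchSize option (--batch-size, default 100) that sets the indexing batch size, replacing the hard-coded value of 1000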
This commit is contained in:
17
main.go
17
main.go
@@ -26,6 +26,7 @@ type Options struct {
|
|||||||
MaxRequests uint `long:"max-request" description:"Max request done externally per --request-window" default:"10"`
|
MaxRequests uint `long:"max-request" description:"Max request done externally per --request-window" default:"10"`
|
||||||
RequestWindow time.Duration `long:"request-window" description:"Window over which no more --max-request are done"`
|
RequestWindow time.Duration `long:"request-window" description:"Window over which no more --max-request are done"`
|
||||||
CacheTTL time.Duration `long:"chache-ttl" description:"TTL of the cached data" `
|
CacheTTL time.Duration `long:"chache-ttl" description:"TTL of the cached data" `
|
||||||
|
BatchSize int `long:"batch-size" description:"Sizes of the batch for indexing" default:"100"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type appData struct {
|
type appData struct {
|
||||||
@@ -53,6 +54,9 @@ type appData struct {
|
|||||||
// }
|
// }
|
||||||
|
|
||||||
func newAppData(opts Options) (*appData, error) {
|
func newAppData(opts Options) (*appData, error) {
|
||||||
|
if opts.BatchSize <= 0 {
|
||||||
|
return nil, fmt.Errorf("Invalid --batch-size of %d, need to be >0", opts.BatchSize)
|
||||||
|
}
|
||||||
basepath := filepath.Join(xdg.Cache.Home(), "satbd.bar.satellite")
|
basepath := filepath.Join(xdg.Cache.Home(), "satbd.bar.satellite")
|
||||||
err := os.MkdirAll(basepath, 0755)
|
err := os.MkdirAll(basepath, 0755)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -113,6 +117,7 @@ func (a *appData) readCsv(stopChan <-chan struct{},
|
|||||||
a.errors <- fmt.Errorf("[CSV PARSER]: %s", err)
|
a.errors <- fmt.Errorf("[CSV PARSER]: %s", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
log.Printf("%d albums in cache", len(allAlbums))
|
||||||
|
|
||||||
for _, aID := range allAlbums {
|
for _, aID := range allAlbums {
|
||||||
shouldDelete[aID] = true
|
shouldDelete[aID] = true
|
||||||
@@ -131,17 +136,21 @@ func (a *appData) readCsv(stopChan <-chan struct{},
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for nbAlbum := 0; true; nbAlbum++ {
|
||||||
album, err := csvReader.Read()
|
album, err := csvReader.Read()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != io.EOF {
|
if err != io.EOF {
|
||||||
a.errors <- fmt.Errorf("[CSV PARSER]: %s", err)
|
a.errors <- fmt.Errorf("[CSV PARSER]: %s", err)
|
||||||
|
nbAlbum--
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
//since we have parsed the whole database, we are safe to delete extra saved albums!
|
//since we have parsed the whole database, we are safe to delete extra saved albums!
|
||||||
safeToDelete = true
|
safeToDelete = true
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if nbAlbum%a.opts.BatchSize == 0 {
|
||||||
|
log.Printf("Parsed %d albums", nbAlbum)
|
||||||
|
}
|
||||||
select {
|
select {
|
||||||
case <-stopChan:
|
case <-stopChan:
|
||||||
return
|
return
|
||||||
@@ -177,7 +186,7 @@ func (a *appData) indexAlbum(stopChan <-chan struct{},
|
|||||||
case album, ok := <-albumToIndexBatch:
|
case album, ok := <-albumToIndexBatch:
|
||||||
if ok == false {
|
if ok == false {
|
||||||
albumToIndexBatch = nil
|
albumToIndexBatch = nil
|
||||||
if count%1000 != 0 {
|
if count%a.opts.BatchSize != 0 {
|
||||||
err := a.index.Batch(batch)
|
err := a.index.Batch(batch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.errors <- fmt.Errorf("[INDEX] batch indexing failed: %s", err)
|
a.errors <- fmt.Errorf("[INDEX] batch indexing failed: %s", err)
|
||||||
@@ -191,7 +200,7 @@ func (a *appData) indexAlbum(stopChan <-chan struct{},
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
count++
|
count++
|
||||||
if count%1000 == 0 {
|
if count%a.opts.BatchSize == 0 {
|
||||||
err := a.index.Batch(batch)
|
err := a.index.Batch(batch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.errors <- fmt.Errorf("[INDEX] batch indexing failed: %s", err)
|
a.errors <- fmt.Errorf("[INDEX] batch indexing failed: %s", err)
|
||||||
@@ -333,7 +342,7 @@ func (a *appData) logErrors() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a *appData) start(stopChan <-chan struct{}) {
|
func (a *appData) start(stopChan <-chan struct{}) {
|
||||||
newAlbumsFromCsv := make(chan *Album)
|
newAlbumsFromCsv := make(chan *Album, 1000)
|
||||||
deletedAlbumsFromCsv := make(chan AlbumID)
|
deletedAlbumsFromCsv := make(chan AlbumID)
|
||||||
a.wg.Add(4)
|
a.wg.Add(4)
|
||||||
go a.logErrors()
|
go a.logErrors()
|
||||||
|
|||||||
Reference in New Issue
Block a user