From 644fb09806c7eb549fe24c81d4030c9bc76563ed Mon Sep 17 00:00:00 2001 From: Alexandre Tuleu Date: Fri, 22 Jan 2016 18:19:24 +0100 Subject: [PATCH] Adds a BatchSize option --- main.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/main.go b/main.go index 5f1662e..0a3c12d 100644 --- a/main.go +++ b/main.go @@ -26,6 +26,7 @@ type Options struct { MaxRequests uint `long:"max-request" description:"Max request done externally per --request-window" default:"10"` RequestWindow time.Duration `long:"request-window" description:"Window over which no more --max-request are done"` CacheTTL time.Duration `long:"chache-ttl" description:"TTL of the cached data" ` + BatchSize int `long:"batch-size" description:"Sizes of the batch for indexing" default:"100"` } type appData struct { @@ -53,6 +54,9 @@ type appData struct { // } func newAppData(opts Options) (*appData, error) { + if opts.BatchSize <= 0 { + return nil, fmt.Errorf("Invalid --batch-size of %d, need to be >0", opts.BatchSize) + } basepath := filepath.Join(xdg.Cache.Home(), "satbd.bar.satellite") err := os.MkdirAll(basepath, 0755) if err != nil { @@ -113,6 +117,7 @@ func (a *appData) readCsv(stopChan <-chan struct{}, a.errors <- fmt.Errorf("[CSV PARSER]: %s", err) return } + log.Printf("%d albums in cache", len(allAlbums)) for _, aID := range allAlbums { shouldDelete[aID] = true @@ -131,17 +136,21 @@ func (a *appData) readCsv(stopChan <-chan struct{}, return } - for { + for nbAlbum := 0; true; nbAlbum++ { album, err := csvReader.Read() if err != nil { if err != io.EOF { a.errors <- fmt.Errorf("[CSV PARSER]: %s", err) + nbAlbum-- continue } //since we have parsed the whole database, we are safe to delete extra saved albums! safeToDelete = true return } + if nbAlbum%a.opts.BatchSize == 0 { + log.Printf("Parsed %d albums", nbAlbum) + } select { case <-stopChan: return @@ -177,7 +186,7 @@ func (a *appData) indexAlbum(stopChan <-chan struct{}, case album, ok := <-albumToIndexBatch: if ok == false { albumToIndexBatch = nil - if count%1000 != 0 { + if count%a.opts.BatchSize != 0 { err := a.index.Batch(batch) if err != nil { a.errors <- fmt.Errorf("[INDEX] batch indexing failed: %s", err) @@ -191,7 +200,7 @@ func (a *appData) indexAlbum(stopChan <-chan struct{}, break } count++ - if count%1000 == 0 { + if count%a.opts.BatchSize == 0 { err := a.index.Batch(batch) if err != nil { a.errors <- fmt.Errorf("[INDEX] batch indexing failed: %s", err) @@ -333,7 +342,7 @@ func (a *appData) logErrors() { } func (a *appData) start(stopChan <-chan struct{}) { - newAlbumsFromCsv := make(chan *Album) + newAlbumsFromCsv := make(chan *Album, 1000) deletedAlbumsFromCsv := make(chan AlbumID) a.wg.Add(4) go a.logErrors()