diff --git a/main.go b/main.go index 0a3c12d..41782b1 100644 --- a/main.go +++ b/main.go @@ -26,7 +26,7 @@ type Options struct { MaxRequests uint `long:"max-request" description:"Max request done externally per --request-window" default:"10"` RequestWindow time.Duration `long:"request-window" description:"Window over which no more --max-request are done"` CacheTTL time.Duration `long:"chache-ttl" description:"TTL of the cached data" ` - BatchSize int `long:"batch-size" description:"Sizes of the batch for indexing" default:"100"` + BatchSize int `long:"batch-size" description:"Sizes of the batch for indexing" default:"1000"` } type appData struct { @@ -136,21 +136,17 @@ func (a *appData) readCsv(stopChan <-chan struct{}, return } - for nbAlbum := 0; true; nbAlbum++ { + for { album, err := csvReader.Read() if err != nil { if err != io.EOF { a.errors <- fmt.Errorf("[CSV PARSER]: %s", err) - nbAlbum-- continue } //since we have parsed the whole database, we are safe to delete extra saved albums! 
safeToDelete = true return } - if nbAlbum%a.opts.BatchSize == 0 { - log.Printf("Parsed %d albums", nbAlbum) - } select { case <-stopChan: return @@ -161,6 +157,18 @@ } } +func (a *appData) submitBatchToIndex(batch *bleve.Batch) { + s := batch.Size() + log.Printf("[INDEX] start indexing of %d albums", s) + start := time.Now() + err := a.index.Batch(batch) + if err != nil { + a.errors <- fmt.Errorf("[INDEX] batch indexing failed: %s", err) + return + } + log.Printf("[INDEX] indexed %d albums in %s", s, time.Since(start)) +} + func (a *appData) indexAlbum(stopChan <-chan struct{}, albumToIndexBatch <-chan *Album, albumToReIndex <-chan *Album, @@ -174,6 +182,24 @@ func (a *appData) indexAlbum(stopChan <-chan struct{}, select { case <-stopChan: return + case album, ok := <-albumToIndexBatch: + if ok == false { + albumToIndexBatch = nil + if count%a.opts.BatchSize != 0 { + a.submitBatchToIndex(batch) + } + break + } + err := batch.Index(AlbumIDString(album.ID), a) + if err != nil { + a.errors <- fmt.Errorf("[INDEX] could not batch indexion of %d: %s", album.ID, err) + break + } + count++ + if count%a.opts.BatchSize == 0 { + a.submitBatchToIndex(batch) + batch = a.index.NewBatch() + } case album, ok := <-albumToReIndex: if ok == false { albumToReIndex = nil @@ -183,37 +209,10 @@ if err != nil { a.errors <- fmt.Errorf("[INDEX] re-indexing %d failed: %s", album.ID, err) } - case album, ok := <-albumToIndexBatch: - if ok == false { - albumToIndexBatch = nil - if count%a.opts.BatchSize != 0 { - err := a.index.Batch(batch) - if err != nil { - a.errors <- fmt.Errorf("[INDEX] batch indexing failed: %s", err) - } else { - log.Printf("[INDEX] indexed %d albums", count) - } - } - err := batch.Index(AlbumIDString(album.ID), a) - if err != nil { - a.errors <- fmt.Errorf("[INDEX] could not batch indexion of %d: %s", album.ID, err) - break - } - count++ - if count%a.opts.BatchSize 
== 0 { - err := a.index.Batch(batch) - if err != nil { - a.errors <- fmt.Errorf("[INDEX] batch indexing failed: %s", err) - } else { - log.Printf("[INDEX] indexed %d albums", count) - } - batch = a.index.NewBatch() - } - break - } case aID, ok := <-albumToDelete: if ok == false { albumToDelete = nil + break } err := a.index.Delete(AlbumIDString(aID)) if err != nil { @@ -225,7 +224,9 @@ func (a *appData) indexAlbum(stopChan <-chan struct{}, } func (a *appData) cacheAlbumDescription(getAlbum <-chan *Album, toIndex chan<- *Album) { + nbAlbums := 0 for album := range getAlbum { + nbAlbums++ albumCache, err := a.db.Get(album.ID) if err == nil { toIndex <- albumCache @@ -237,17 +238,18 @@ func (a *appData) cacheAlbumDescription(getAlbum <-chan *Album, toIndex chan<- * if err != nil { a.errors <- fmt.Errorf("[CACHE ALBUMS]: getting %d :%s", album.ID, err) <-a.dbLock - return + continue } err = a.db.AddOrUpdate(album) <-a.dbLock if err != nil { a.errors <- fmt.Errorf("[CACHE ALBUMS]: storing %d: %s", album.ID, err) - return + continue } toIndex <- album } + log.Printf("[CACHE ALBUMS] %d albums cached", nbAlbums) } func (a *appData) maintainAlbumDatabase(stopChan <-chan struct{}, @@ -263,16 +265,18 @@ func (a *appData) maintainAlbumDatabase(stopChan <-chan struct{}, case aID, ok := <-deleteAlbum: if ok == false { deleteAlbum = nil + break } a.dbLock <- true err := a.db.Delete(aID) <-a.dbLock if err != nil { - a.errors <- fmt.Errorf("[CACHE ALBUMS]: delete %d: %s", aID, err) + a.errors <- fmt.Errorf("[DB]: delete %d: %s", aID, err) } case aID, ok := <-checkAlbum: if ok == false { checkAlbum = nil + break } a.dbLock <- true @@ -280,20 +284,20 @@ <-a.dbLock if err != nil { - a.errors <- fmt.Errorf("[CACHE ALBUMS]: check %d: %s", aID, err) - return + a.errors <- fmt.Errorf("[DB]: check %d: %s", aID, err) + continue } if time.Now().Add(-a.opts.CacheTTL).Before(cachedAlbum.FetchDate) == true { // cached value is 
good - return + continue } // thsi could be very long err = a.getter.Get(cachedAlbum) if err != nil { - a.errors <- fmt.Errorf("[CACHE ALBUMS]: GET %d: %s", aID, err) - return + a.errors <- fmt.Errorf("[DB]: GET %d: %s", aID, err) + continue } // re-lock the db @@ -301,8 +304,8 @@ func (a *appData) maintainAlbumDatabase(stopChan <-chan struct{}, err = a.db.AddOrUpdate(cachedAlbum) <-a.dbLock if err != nil { - a.errors <- fmt.Errorf("[CACHE ALBUMS]: GET %d: %s", aID, err) - return + a.errors <- fmt.Errorf("[DB]: could not add new %d: %s", aID, err) + continue } updateIndex <- cachedAlbum }