Plays with logging

2016-01-22 18:48:12 +01:00
parent 644fb09806
commit 6ef506b0ea

main.go

@@ -26,7 +26,7 @@ type Options struct {
MaxRequests uint `long:"max-request" description:"Max request done externally per --request-window" default:"10"`
RequestWindow time.Duration `long:"request-window" description:"Window over which no more --max-request are done"`
CacheTTL time.Duration `long:"chache-ttl" description:"TTL of the cached data" `
BatchSize int `long:"batch-size" description:"Sizes of the batch for indexing" default:"100"`
BatchSize int `long:"batch-size" description:"Sizes of the batch for indexing" default:"1000"`
}
type appData struct {
@@ -136,21 +136,17 @@ func (a *appData) readCsv(stopChan <-chan struct{},
return
}
for nbAlbum := 0; true; nbAlbum++ {
for {
album, err := csvReader.Read()
if err != nil {
if err != io.EOF {
a.errors <- fmt.Errorf("[CSV PARSER]: %s", err)
nbAlbum--
continue
}
//since we have parsed the whole database, we are safe to delete extra saved albums!
safeToDelete = true
return
}
if nbAlbum%a.opts.BatchSize == 0 {
log.Printf("Parsed %d albums", nbAlbum)
}
select {
case <-stopChan:
return
@@ -161,6 +157,18 @@ func (a *appData) readCsv(stopChan <-chan struct{},
}
}
func (a *appData) submitBatchToIndex(batch *bleve.Batch) {
s := batch.Size()
log.Printf("[INDEX] start indexing of %d albums", s)
start := time.Now()
err := a.index.Batch(batch)
if err != nil {
a.errors <- fmt.Errorf("[INDEX] batch indexing failed: %s", err)
return
}
log.Printf("[INDEX] indexed %d albums in %s", s, time.Since(start))
}
func (a *appData) indexAlbum(stopChan <-chan struct{},
albumToIndexBatch <-chan *Album,
albumToReIndex <-chan *Album,
@@ -174,6 +182,24 @@ func (a *appData) indexAlbum(stopChan <-chan struct{},
select {
case <-stopChan:
return
case album, ok := <-albumToIndexBatch:
if ok == false {
albumToIndexBatch = nil
if count%a.opts.BatchSize != 0 {
a.submitBatchToIndex(batch)
}
break
}
err := batch.Index(AlbumIDString(album.ID), a)
if err != nil {
a.errors <- fmt.Errorf("[INDEX] could not batch indexion of %d: %s", album.ID, err)
break
}
count++
if count%a.opts.BatchSize == 0 {
a.submitBatchToIndex(batch)
batch = a.index.NewBatch()
}
case album, ok := <-albumToReIndex:
if ok == false {
albumToReIndex = nil
@@ -183,37 +209,10 @@ func (a *appData) indexAlbum(stopChan <-chan struct{},
if err != nil {
a.errors <- fmt.Errorf("[INDEX] re-indexing %d failed: %s", album.ID, err)
}
case album, ok := <-albumToIndexBatch:
if ok == false {
albumToIndexBatch = nil
if count%a.opts.BatchSize != 0 {
err := a.index.Batch(batch)
if err != nil {
a.errors <- fmt.Errorf("[INDEX] batch indexing failed: %s", err)
} else {
log.Printf("[INDEX] indexed %d albums", count)
}
}
err := batch.Index(AlbumIDString(album.ID), a)
if err != nil {
a.errors <- fmt.Errorf("[INDEX] could not batch indexion of %d: %s", album.ID, err)
break
}
count++
if count%a.opts.BatchSize == 0 {
err := a.index.Batch(batch)
if err != nil {
a.errors <- fmt.Errorf("[INDEX] batch indexing failed: %s", err)
} else {
log.Printf("[INDEX] indexed %d albums", count)
}
batch = a.index.NewBatch()
}
break
}
case aID, ok := <-albumToDelete:
if ok == false {
albumToDelete = nil
break
}
err := a.index.Delete(AlbumIDString(aID))
if err != nil {
@@ -225,7 +224,9 @@ func (a *appData) indexAlbum(stopChan <-chan struct{},
}
func (a *appData) cacheAlbumDescription(getAlbum <-chan *Album, toIndex chan<- *Album) {
nbAlbums := 0
for album := range getAlbum {
nbAlbums++
albumCache, err := a.db.Get(album.ID)
if err == nil {
toIndex <- albumCache
@@ -237,17 +238,17 @@ func (a *appData) cacheAlbumDescription(getAlbum <-chan *Album, toIndex chan<- *Album) {
if err != nil {
a.errors <- fmt.Errorf("[CACHE ALBUMS]: getting %d :%s", album.ID, err)
<-a.dbLock
return
continue
}
err = a.db.AddOrUpdate(album)
<-a.dbLock
if err != nil {
a.errors <- fmt.Errorf("[CACHE ALBUMS]: storing %d: %s", album.ID, err)
return
continue
}
toIndex <- album
}
log.Printf("[CACHE ALBUMS] %d albums cached", nbAlbums)
}
func (a *appData) maintainAlbumDatabase(stopChan <-chan struct{},
@@ -263,16 +264,18 @@ func (a *appData) maintainAlbumDatabase(stopChan <-chan struct{},
case aID, ok := <-deleteAlbum:
if ok == false {
deleteAlbum = nil
break
}
a.dbLock <- true
err := a.db.Delete(aID)
<-a.dbLock
if err != nil {
a.errors <- fmt.Errorf("[CACHE ALBUMS]: delete %d: %s", aID, err)
a.errors <- fmt.Errorf("[DB]: delete %d: %s", aID, err)
}
case aID, ok := <-checkAlbum:
if ok == false {
checkAlbum = nil
break
}
a.dbLock <- true
@@ -280,20 +283,20 @@ func (a *appData) maintainAlbumDatabase(stopChan <-chan struct{},
<-a.dbLock
if err != nil {
a.errors <- fmt.Errorf("[CACHE ALBUMS]: check %d: %s", aID, err)
return
a.errors <- fmt.Errorf("[DB]: check %d: %s", aID, err)
continue
}
if time.Now().Add(-a.opts.CacheTTL).Before(cachedAlbum.FetchDate) == true {
// cached value is good
return
continue
}
// this could be very long
err = a.getter.Get(cachedAlbum)
if err != nil {
a.errors <- fmt.Errorf("[CACHE ALBUMS]: GET %d: %s", aID, err)
return
a.errors <- fmt.Errorf("[DB]: GET %d: %s", aID, err)
continue
}
// re-lock the db
@@ -301,8 +304,8 @@ func (a *appData) maintainAlbumDatabase(stopChan <-chan struct{},
err = a.db.AddOrUpdate(cachedAlbum)
<-a.dbLock
if err != nil {
a.errors <- fmt.Errorf("[CACHE ALBUMS]: GET %d: %s", aID, err)
return
a.errors <- fmt.Errorf("[DB]: could not add new %d: %s", aID, err)
continue
}
updateIndex <- cachedAlbum
}
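
For reference, here is a minimal standalone sketch of the batching pattern this commit refactors into submitBatchToIndex: fill a bleve batch from a channel, submit it every --batch-size albums with timed logging, and flush the partial batch once the channel closes. It assumes the bleve v1 API (NewMemOnly, NewBatch, Batch, batch.Size) and uses a simplified Album struct, a hard-coded batchSize constant, and fmt.Sprintf in place of AlbumIDString; it is an illustration of the pattern, not the code of this repository.

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/blevesearch/bleve"
)

type Album struct {
	ID    int
	Title string
}

// submitBatch mirrors submitBatchToIndex: log how many albums are in the
// batch, time the blocking index.Batch call, and log the duration.
func submitBatch(index bleve.Index, batch *bleve.Batch) {
	s := batch.Size()
	log.Printf("[INDEX] start indexing of %d albums", s)
	start := time.Now() // start the clock before the blocking call
	if err := index.Batch(batch); err != nil {
		log.Printf("[INDEX] batch indexing failed: %s", err)
		return
	}
	log.Printf("[INDEX] indexed %d albums in %s", s, time.Since(start))
}

func main() {
	// In-memory index just for the sketch; main.go opens a persistent one.
	index, err := bleve.NewMemOnly(bleve.NewIndexMapping())
	if err != nil {
		log.Fatal(err)
	}

	albums := make(chan *Album)
	go func() {
		for i := 0; i < 2500; i++ {
			albums <- &Album{ID: i, Title: fmt.Sprintf("album %d", i)}
		}
		close(albums) // closing the channel triggers the final flush below
	}()

	const batchSize = 1000 // stand-in for --batch-size
	batch := index.NewBatch()
	count := 0
	for album := range albums {
		if err := batch.Index(fmt.Sprintf("%d", album.ID), album); err != nil {
			log.Printf("[INDEX] could not add %d to the batch: %s", album.ID, err)
			continue
		}
		count++
		if count%batchSize == 0 {
			submitBatch(index, batch)
			batch = index.NewBatch()
		}
	}
	if count%batchSize != 0 {
		// flush whatever is left once the producer closed the channel
		submitBatch(index, batch)
	}
}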