package main

import (
	"fmt"
	"io"
	"log"
	"math/rand"
	"net"
	"net/http"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/blevesearch/bleve"
	"github.com/jessevdk/go-flags"
	"github.com/tylerb/graceful"
	"launchpad.net/go-xdg/v0"
)

// Options holds the command line options of the service.
type Options struct {
	CsvFile       string        `long:"csv" description:"CSV file to build the index from" required:"true"`
	Listen        string        `short:"l" long:"listen" description:"Address to listen to" default:":33276"`
	MaxRequests   uint          `long:"max-request" description:"Max request done externally per --request-window" default:"10"`
	RequestWindow time.Duration `long:"request-window" description:"Window over which no more --max-request are done"`
	// BUG FIX: the long flag was misspelled "chache-ttl"; renamed to "cache-ttl".
	CacheTTL  time.Duration `long:"cache-ttl" description:"TTL of the cached data"`
	BatchSize int           `long:"batch-size" description:"Sizes of the batch for indexing" default:"1000"`
}

// appData bundles the search index, the album database, the HTTP fetchers
// and the plumbing shared by all background goroutines of the service.
type appData struct {
	index  bleve.Index
	db     *AlbumDatabase
	getter *AlbumDescriptionGetter
	cover  *AlbumCoverCache
	opts   Options
	// errors collects asynchronous errors from every goroutine; it is
	// drained by logErrors.
	errors chan error
	// wg tracks the stoppable goroutines started by start().
	wg sync.WaitGroup
}

// newAppData validates opts, creates the cache directories under the XDG
// cache home and opens (creating it if needed) the bleve index and the
// album database.
func newAppData(opts Options) (*appData, error) {
	if opts.BatchSize <= 0 {
		return nil, fmt.Errorf("Invalid --batch-size of %d, need to be >0", opts.BatchSize)
	}

	basepath := filepath.Join(xdg.Cache.Home(), "satbd.bar.satellite")
	if err := os.MkdirAll(basepath, 0755); err != nil {
		return nil, err
	}
	// BUG FIX: was filepath.Join("tmp", "log"), which created a "tmp/log"
	// directory relative to the process working directory instead of under
	// the cache directory.
	if err := os.MkdirAll(filepath.Join(basepath, "tmp", "log"), 0755); err != nil {
		return nil, err
	}

	res := &appData{
		opts:   opts,
		errors: make(chan error),
	}

	blevePath := filepath.Join(basepath, "index")
	var err error
	res.index, err = bleve.Open(blevePath)
	if err == bleve.ErrorIndexPathDoesNotExist {
		log.Printf("Creating a new index in %s", blevePath)
		res.index, err = bleve.New(blevePath, buildAlbumMapping())
	}
	if err != nil {
		return nil, err
	}

	res.db = OpenAlbumDatabase(filepath.Join(basepath, "db"))
	// NOTE(review): the rate limit is hard-coded; presumably it should use
	// opts.MaxRequests / opts.RequestWindow — confirm against
	// NewRateLimitedGetter before changing.
	res.getter = &AlbumDescriptionGetter{
		getter: NewRateLimitedGetter(10, 10),
	}
	res.cover = NewAlbumCoverCache(filepath.Join(basepath, "covers"), opts.MaxRequests, opts.RequestWindow)
	// make them both use the same HTTPGetter
	res.cover.getter = res.getter.getter
	return res, nil
}

// readCsv streams the albums found in the CSV file to newAlbums. Albums that
// are cached in the database but no longer listed in the CSV file are sent
// to deletedAlbum — but only when the whole file parsed successfully, so a
// parse error never wipes valid cached entries. Both channels are closed on
// return.
func (a *appData) readCsv(stopChan <-chan struct{}, newAlbums chan<- *Album, deletedAlbum chan<- AlbumID) {
	defer close(newAlbums)
	defer a.wg.Done()

	// Deferred call: remove albums that are not listed anymore in the CSV.
	safeToDelete := false
	shouldDelete := map[AlbumID]bool{}
	defer func() {
		defer close(deletedAlbum)
		if !safeToDelete {
			return
		}
		for deletedID := range shouldDelete {
			deletedAlbum <- deletedID
		}
	}()

	allAlbums, err := a.db.ByPurchaseDate()
	if err != nil {
		a.errors <- fmt.Errorf("[CSV PARSER]: %s", err)
		return
	}
	log.Printf("%d albums in cache", len(allAlbums))
	// Start by assuming every cached album is stale; entries are removed
	// from the set as they show up in the CSV.
	for _, aID := range allAlbums {
		shouldDelete[aID] = true
	}

	f, err := os.Open(a.opts.CsvFile)
	if err != nil {
		a.errors <- fmt.Errorf("[CSV PARSER]: %s", err)
		return
	}
	defer closeOrPanic(f, a.opts.CsvFile)

	csvReader, err := NewAlbumCsvReader(f)
	if err != nil {
		a.errors <- fmt.Errorf("[CSV PARSER]: %s", err)
		return
	}

	for {
		album, err := csvReader.Read()
		if err == io.EOF {
			// Since we parsed the whole file, we are safe to delete the
			// extra saved albums.
			safeToDelete = true
			return
		}
		if err != nil {
			a.errors <- fmt.Errorf("[CSV PARSER]: %s", err)
			continue
		}
		select {
		case <-stopChan:
			return
		case newAlbums <- album:
			// listed in the CSV: must not be deleted
			delete(shouldDelete, album.ID)
		}
	}
}

// submitBatchToIndex sends a whole batch to the bleve index, logging the
// time it took. Indexing failures are reported on the errors channel.
func (a *appData) submitBatchToIndex(batch *bleve.Batch) {
	s := batch.Size()
	log.Printf("[INDEX] start indexing of %d albums", s)
	start := time.Now()
	if err := a.index.Batch(batch); err != nil {
		a.errors <- fmt.Errorf("[INDEX] batch indexing failed: %s", err)
		return
	}
	log.Printf("[INDEX] indexed %d albums in %s", s, time.Since(start))
}

// indexAlbum is the single writer to the bleve index. New albums from
// albumToIndexBatch are indexed in batches of opts.BatchSize; albums from
// albumToReIndex are indexed one by one; IDs from albumToDelete are removed
// from the index, then forwarded on deletedAlbum so the database can be
// cleaned up too. deletedAlbum is closed on return.
func (a *appData) indexAlbum(stopChan <-chan struct{}, albumToIndexBatch <-chan *Album, albumToReIndex <-chan *Album, albumToDelete <-chan AlbumID, deletedAlbum chan<- AlbumID) {
	defer close(deletedAlbum)
	defer a.wg.Done()

	batch := a.index.NewBatch()
	count := 0
	for {
		select {
		case <-stopChan:
			return
		case album, ok := <-albumToIndexBatch:
			if !ok {
				albumToIndexBatch = nil
				// Flush the incomplete trailing batch, if any.
				// BUG FIX: the original only broke out of the select inside
				// this branch when count%BatchSize != 0; when count was an
				// exact multiple it fell through and indexed a nil *Album.
				if count%a.opts.BatchSize != 0 {
					a.submitBatchToIndex(batch)
				}
				break
			}
			if err := batch.Index(AlbumIDString(album.ID), album); err != nil {
				a.errors <- fmt.Errorf("[INDEX] could not batch indexion of %s: %s", album, err)
				break
			}
			count++
			if count%a.opts.BatchSize == 0 {
				a.submitBatchToIndex(batch)
				batch = a.index.NewBatch()
			}
		case album, ok := <-albumToReIndex:
			if !ok {
				albumToReIndex = nil
				break
			}
			if err := a.index.Index(AlbumIDString(album.ID), album); err != nil {
				a.errors <- fmt.Errorf("[INDEX] re-indexing %s failed: %s", album, err)
			}
		case aID, ok := <-albumToDelete:
			if !ok {
				albumToDelete = nil
				break
			}
			if err := a.index.Delete(AlbumIDString(aID)); err != nil {
				a.errors <- fmt.Errorf("[INDEX]: delete failed: %s", err)
				break
			}
			// BUG FIX: deletedAlbum was closed but never written to, so
			// maintainAlbumDatabase never removed stale albums from the
			// database. Forward the ID, keeping the goroutine stoppable.
			select {
			case deletedAlbum <- aID:
			case <-stopChan:
				return
			}
		}
	}
}

// cacheAlbumDescription completes every album received on getAlbum, using
// the database as a cache and fetching missing descriptions over HTTP, then
// forwards the completed album to toIndex. toIndex is closed on return.
//
// NOTE(review): this goroutine has no stopChan; it terminates only when
// getAlbum is closed by readCsv.
func (a *appData) cacheAlbumDescription(getAlbum <-chan *Album, toIndex chan<- *Album) {
	defer close(toIndex)
	nbAlbums := 0
	for album := range getAlbum {
		nbAlbums++
		// Fast path: the album is already cached in the database.
		if albumCache, err := a.db.Get(album.ID); err == nil {
			a.cover.RegisterCover(albumCache.ID, albumCache.CoverURL)
			toIndex <- albumCache
			continue
		}
		// Slow path: fetch the description (rate limited), then persist it.
		if err := a.getter.Get(album); err != nil {
			a.errors <- fmt.Errorf("[CACHE ALBUMS]: getting %s :%s", album, err)
			continue
		}
		if err := a.db.AddOrUpdate(album); err != nil {
			a.errors <- fmt.Errorf("[CACHE ALBUMS]: storing %s: %s", album, err)
			continue
		}
		a.cover.RegisterCover(album.ID, album.CoverURL)
		toIndex <- album
	}
	log.Printf("[CACHE ALBUMS] %d albums cached", nbAlbums)
}

// maintainAlbumDatabase refreshes cached albums older than --cache-ttl (IDs
// received on checkAlbum) and removes albums (IDs received on deleteAlbum)
// from the database. Refreshed albums are sent to updateIndex for
// re-indexing. updateIndex is closed on return.
func (a *appData) maintainAlbumDatabase(stopChan <-chan struct{}, checkAlbum <-chan AlbumID, deleteAlbum <-chan AlbumID, updateIndex chan<- *Album) {
	defer close(updateIndex)
	defer a.wg.Done()
	for {
		select {
		case <-stopChan:
			return
		case aID, ok := <-checkAlbum:
			if !ok {
				checkAlbum = nil
				break
			}
			cachedAlbum, err := a.db.Get(aID)
			if err != nil {
				a.errors <- fmt.Errorf("[DB]: check %d: %s", aID, err)
				continue
			}
			if time.Now().Add(-a.opts.CacheTTL).Before(cachedAlbum.FetchDate) {
				// cached value is still fresh
				continue
			}
			// This fetch could take a very long time (rate limited HTTP).
			if err := a.getter.Get(cachedAlbum); err != nil {
				a.errors <- fmt.Errorf("[DB]: GET %d: %s", aID, err)
				continue
			}
			if err := a.db.AddOrUpdate(cachedAlbum); err != nil {
				a.errors <- fmt.Errorf("[DB]: could not add new %d: %s", aID, err)
				continue
			}
			updateIndex <- cachedAlbum
		case aID, ok := <-deleteAlbum:
			if !ok {
				deleteAlbum = nil
				break
			}
			if err := a.db.Delete(aID); err != nil {
				a.errors <- fmt.Errorf("[DB]: delete %d: %s", aID, err)
			}
		}
	}
}

// updateCache periodically samples random albums from the database and sends
// their IDs to checkAlbum so maintainAlbumDatabase can refresh stale data.
// checkAlbum is closed on return.
func (a *appData) updateCache(stopChan <-chan struct{}, periode time.Duration, checkAlbum chan<- AlbumID) {
	defer close(checkAlbum)
	defer a.wg.Done()
	ticker := time.NewTicker(periode)
	// BUG FIX: the ticker was never stopped.
	defer ticker.Stop()
	for {
		select {
		case <-stopChan:
			return
		case <-ticker.C:
			albums, err := a.db.ByPurchaseDate()
			if err != nil {
				a.errors <- fmt.Errorf("[UPDATER]: could not get albums: %s", err)
				continue
			}
			// BUG FIX: rand.Intn panics on an empty database.
			if len(albums) == 0 {
				continue
			}
			// We just check one random album per minute of the period.
			for i := 0; i < int(periode.Minutes()); i++ {
				checkAlbum <- albums[rand.Intn(len(albums))]
			}
		}
	}
}

// logErrors drains the shared error channel and logs every error it receives.
func (a *appData) logErrors() {
	for e := range a.errors {
		// BUG FIX: the format string was "[errors] ]%s" (stray bracket).
		log.Printf("[errors] %s", e)
	}
}

// start wires the processing pipeline together:
//
//	readCsv -> cacheAlbumDescription -> indexAlbum <-> maintainAlbumDatabase
//	                                                        ^
//	                                                   updateCache
func (a *appData) start(stopChan <-chan struct{}) {
	newAlbumsFromCsv := make(chan *Album, 1000)
	deletedAlbumsFromCsv := make(chan AlbumID)
	// Only the four stopChan-aware goroutines join the WaitGroup; logErrors
	// and cacheAlbumDescription terminate when their input channel closes.
	a.wg.Add(4)
	go a.logErrors()
	go a.readCsv(stopChan, newAlbumsFromCsv, deletedAlbumsFromCsv)
	albumToBatchIndex := make(chan *Album)
	go a.cacheAlbumDescription(newAlbumsFromCsv, albumToBatchIndex)
	albumToReIndex := make(chan *Album)
	albumToDelete := make(chan AlbumID)
	go a.indexAlbum(stopChan, albumToBatchIndex, albumToReIndex, deletedAlbumsFromCsv, albumToDelete)
	albumToCheck := make(chan AlbumID)
	go a.maintainAlbumDatabase(stopChan, albumToCheck, albumToDelete, albumToReIndex)
	go a.updateCache(stopChan, 10*time.Minute, albumToCheck)
}

// wait blocks until all stoppable goroutines started by start have returned.
func (a *appData) wait() {
	a.wg.Wait()
}

// Execute parses the command line, builds the application state and serves
// HTTP requests until gracefully stopped.
func Execute() error {
	opts := Options{
		CacheTTL:      31 * 24 * time.Hour,
		MaxRequests:   10,
		RequestWindow: 10 * time.Second,
	}
	if _, err := flags.Parse(&opts); err != nil {
		return err
	}

	a, err := newAppData(opts)
	if err != nil {
		return err
	}

	srv := &graceful.Server{
		Timeout: 10 * time.Second,
		Server: &http.Server{
			Addr: opts.Listen,
		},
	}
	log.Printf("Listening on %s", opts.Listen)
	a.start(srv.StopChan())
	srv.Server.Handler = a.buildRouter()
	if err := srv.ListenAndServe(); err != nil {
		// A graceful shutdown surfaces an "accept" *net.OpError from the
		// closed listener; that one is expected and ignored.
		if opErr, ok := err.(*net.OpError); !ok || opErr.Op != "accept" {
			return err
		}
	}
	return nil
}

func main() {
	if err := Execute(); err != nil {
		log.Printf("got unhandled error: %s", err)
		os.Exit(1)
	}
}