From f407c1cf3ea0ffa366658a2a2ede48fe311e5089 Mon Sep 17 00:00:00 2001 From: Alexandre Tuleu Date: Fri, 22 Jan 2016 18:06:33 +0100 Subject: [PATCH] Implements a basic main function --- Makefile | 2 + album_cover_cache.go | 5 +- album_csv_reader.go | 15 +- main.go | 395 ++++++++++++++++++++++++++++++++++++------- router.go | 78 +++++++++ utils.go | 3 +- 6 files changed, 431 insertions(+), 67 deletions(-) create mode 100644 router.go diff --git a/Makefile b/Makefile index 7001ec4..ed5142c 100644 --- a/Makefile +++ b/Makefile @@ -12,4 +12,6 @@ endif check : go vet go test $(short_option) -coverprofile=cover.out -covermode=count + errcheck -abspath golint + diff --git a/album_cover_cache.go b/album_cover_cache.go index 06f813c..a41de71 100644 --- a/album_cover_cache.go +++ b/album_cover_cache.go @@ -45,7 +45,10 @@ func (c *AlbumCoverCache) fetch(a *Album) (io.ReadCloser, error) { } mdbyte, err := json.Marshal(&md) if err == nil { - c.dv.Write(c.metadataKey(a), mdbyte) + err = c.dv.Write(c.metadataKey(a), mdbyte) + if err != nil { + return nil, err + } } return c.dv.ReadStream(c.dataKey(a), false) diff --git a/album_csv_reader.go b/album_csv_reader.go index fcd4d00..df7c0ec 100644 --- a/album_csv_reader.go +++ b/album_csv_reader.go @@ -118,8 +118,14 @@ func (r *AlbumCsvReader) Read() (*Album, error) { res.Num = int(n) } - safeParseTime("01/2006", data[r.columns[cLegalDeposit]], &(res.LegalDeposit)) - safeParseTime("01/2006", data[r.columns[cPrintDate]], &(res.PrintDate)) + err = safeParseTime("01/2006", data[r.columns[cLegalDeposit]], &(res.LegalDeposit)) + if err != nil { + return nil, fmt.Errorf("AlbumCsvReader: %s: %s", cLegalDeposit, err) + } + err = safeParseTime("01/2006", data[r.columns[cPrintDate]], &(res.PrintDate)) + if err != nil { + return nil, fmt.Errorf("AlbumCsvReader: %s: %s", cPrintDate, err) + } state, err := strconv.ParseInt(data[r.columns[cState]], 0, 32) if err != nil { @@ -127,7 +133,10 @@ func (r *AlbumCsvReader) Read() (*Album, error) { } res.State = AlbumState(state) - safeParseTime("02/01/2006", data[r.columns[cPurchaseDate]], &(res.PurchaseDate)) + err = safeParseTime("02/01/2006", data[r.columns[cPurchaseDate]], &(res.PurchaseDate)) + if err != nil { + return nil, fmt.Errorf("AlbumCsvReader: %s: %s", cPurchaseDate, err) + } res.SatID = fmt.Sprintf("%s-%s", data[r.columns[cPerso1]], data[r.columns[cPerso2]]) diff --git a/main.go b/main.go index e50ea53..5f1662e 100644 --- a/main.go +++ b/main.go @@ -1,120 +1,391 @@ package main import ( + "fmt" "io" "log" + "math/rand" + "net" + "net/http" "os" "path/filepath" + "sync" "time" "launchpad.net/go-xdg" - "github.com/gorilla/mux" "github.com/jessevdk/go-flags" "github.com/tylerb/graceful" "github.com/blevesearch/bleve" - bleveHttp "github.com/blevesearch/bleve/http" ) type Options struct { - CsvFile string `long:"csv" description:"CSV file to build the index from" required:"true"` - Listen string `short:"l" long:"listen" description:"Address to listen to" default:":33276"` + CsvFile string `long:"csv" description:"CSV file to build the index from" required:"true"` + Listen string `short:"l" long:"listen" description:"Address to listen to" default:":33276"` + MaxRequests uint `long:"max-request" description:"Max request done externally per --request-window" default:"10"` + RequestWindow time.Duration `long:"request-window" description:"Window over which no more --max-request are done"` + CacheTTL time.Duration `long:"chache-ttl" description:"TTL of the cached data" ` } -func readAlbums(csvPath string, albums chan *Album, errors chan error) { - defer close(albums) - csvFile, err := os.Open(csvPath) +type appData struct { + index bleve.Index + db *AlbumDatabase + getter *AlbumDescriptionGetter + cover *AlbumCoverCache + opts Options + + errors chan error + dbLock chan bool + wg sync.WaitGroup +} + +// type albumOperationType int + +// const ( +// OpAdd albumOperationType = iota +// OpDelete +// ) + +// type albumOperation struct { +// Type albumOperationType +// A *Album +// } + +func newAppData(opts Options) (*appData, error) { + basepath := filepath.Join(xdg.Cache.Home(), "satbd.bar.satellite") + err := os.MkdirAll(basepath, 0755) if err != nil { - errors <- err + return nil, err + } + + res := &appData{ + opts: opts, + errors: make(chan error, 10), + dbLock: make(chan bool, 1), + } + + blevePath := filepath.Join(basepath, "index") + res.index, err = bleve.Open(blevePath) + + if err == bleve.ErrorIndexPathDoesNotExist { + res.index, err = bleve.New(blevePath, buildAlbumMapping()) + if err != nil { + return nil, err + } + } + + if err != nil { + return nil, err + } + + res.db = OpenAlbumDatabase(filepath.Join(basepath, "db")) + res.getter = &AlbumDescriptionGetter{ + getter: NewRateLimitedGetter(10, 10), + } + + res.cover = NewAlbumCoverCache(filepath.Join(basepath, "covers"), opts.MaxRequests, opts.RequestWindow) + //makes them both use the same HTTPGetter + res.cover.getter = res.getter.getter + + return res, nil +} + +func (a *appData) readCsv(stopChan <-chan struct{}, + newAlbums chan<- *Album, + deletedAlbum chan<- AlbumID) { + defer close(newAlbums) + defer a.wg.Done() + // defered call : we remove album that are not listed anymore in the CSV file + safeToDelete := false + shouldDelete := map[AlbumID]bool{} + defer func() { + if safeToDelete == true { + for deletedID, _ := range shouldDelete { + deletedAlbum <- deletedID + } + } + close(deletedAlbum) + }() + + allAlbums, err := a.db.ByPurchaseDate() + if err != nil { + a.errors <- fmt.Errorf("[CSV PARSER]: %s", err) return } - defer closeOrPanic(csvFile, csvPath) - r, err := NewAlbumCsvReader(csvFile) + for _, aID := range allAlbums { + shouldDelete[aID] = true + } + + f, err := os.Open(a.opts.CsvFile) if err != nil { - errors <- err + a.errors <- fmt.Errorf("[CSV PARSER]: %s", err) + return + } + defer closeOrPanic(f, a.opts.CsvFile) + + csvReader, err := NewAlbumCsvReader(f) + if err != nil { + a.errors <- fmt.Errorf("[CSV PARSER]: %s", err) return } for { - a, err := r.Read() + album, err := csvReader.Read() if err != nil { - if err == io.EOF { - return + if err != io.EOF { + a.errors <- fmt.Errorf("[CSV PARSER]: %s", err) + continue } - errors <- err + //since we have parsed the whole database, we are safe to delete extra saved albums! + safeToDelete = true + return + } + select { + case <-stopChan: + return + case newAlbums <- album: + // then we should not delete it + delete(shouldDelete, album.ID) + } + } +} + +func (a *appData) indexAlbum(stopChan <-chan struct{}, + albumToIndexBatch <-chan *Album, + albumToReIndex <-chan *Album, + albumToDelete <-chan AlbumID, + deletedAlbum chan<- AlbumID) { + defer close(deletedAlbum) + defer a.wg.Done() + batch := a.index.NewBatch() + count := 0 + for { + select { + case <-stopChan: + return + case album, ok := <-albumToReIndex: + if ok == false { + albumToReIndex = nil + break + } + err := a.index.Index(AlbumIDString(album.ID), a) + if err != nil { + a.errors <- fmt.Errorf("[INDEX] re-indexing %d failed: %s", album.ID, err) + } + case album, ok := <-albumToIndexBatch: + if ok == false { + albumToIndexBatch = nil + if count%1000 != 0 { + err := a.index.Batch(batch) + if err != nil { + a.errors <- fmt.Errorf("[INDEX] batch indexing failed: %s", err) + } else { + log.Printf("[INDEX] indexed %d albums", count) + } + } + err := batch.Index(AlbumIDString(album.ID), a) + if err != nil { + a.errors <- fmt.Errorf("[INDEX] could not batch indexion of %d: %s", album.ID, err) + break + } + count++ + if count%1000 == 0 { + err := a.index.Batch(batch) + if err != nil { + a.errors <- fmt.Errorf("[INDEX] batch indexing failed: %s", err) + } else { + log.Printf("[INDEX] indexed %d albums", count) + } + batch = a.index.NewBatch() + } + break + } + case aID, ok := <-albumToDelete: + if ok == false { + albumToDelete = nil + } + err := a.index.Delete(AlbumIDString(aID)) + if err != nil { + a.errors <- fmt.Errorf("[INDEX]: delete failed: %s", err) + } + a.dbLock <- true + } + } +} + +func (a *appData) cacheAlbumDescription(getAlbum <-chan *Album, toIndex chan<- *Album) { + for album := range getAlbum { + albumCache, err := a.db.Get(album.ID) + if err == nil { + toIndex <- albumCache continue } - albums <- a - } -} + a.dbLock <- true -func indexAlbums(i bleve.Index, albums chan *Album, errors chan error) { - iAlbum := 0 - start := time.Now() - for a := range albums { - err := i.Index(AlbumIDString(a.ID), a) + err = a.getter.Get(album) if err != nil { - errors <- err + a.errors <- fmt.Errorf("[CACHE ALBUMS]: getting %d :%s", album.ID, err) + <-a.dbLock + return } - iAlbum++ - if iAlbum%100 == 0 { - dur := time.Since(start) - log.Printf("Indexed %d beer in %s (%f ms / Album)", iAlbum, dur, dur.Seconds()/float64(iAlbum)*1000) + err = a.db.AddOrUpdate(album) + <-a.dbLock + if err != nil { + a.errors <- fmt.Errorf("[CACHE ALBUMS]: storing %d: %s", album.ID, err) + return + } + toIndex <- album + + } +} + +func (a *appData) maintainAlbumDatabase(stopChan <-chan struct{}, + checkAlbum <-chan AlbumID, + deleteAlbum <-chan AlbumID, + updateIndex chan<- *Album) { + defer close(updateIndex) + defer a.wg.Done() + for { + select { + case <-stopChan: + return + case aID, ok := <-deleteAlbum: + if ok == false { + deleteAlbum = nil + } + a.dbLock <- true + err := a.db.Delete(aID) + <-a.dbLock + if err != nil { + a.errors <- fmt.Errorf("[CACHE ALBUMS]: delete %d: %s", aID, err) + } + case aID, ok := <-checkAlbum: + if ok == false { + checkAlbum = nil + } + + a.dbLock <- true + cachedAlbum, err := a.db.Get(aID) + <-a.dbLock + + if err != nil { + a.errors <- fmt.Errorf("[CACHE ALBUMS]: check %d: %s", aID, err) + return + } + + if time.Now().Add(-a.opts.CacheTTL).Before(cachedAlbum.FetchDate) == true { + // cached value is good + return + } + + // thsi could be very long + err = a.getter.Get(cachedAlbum) + if err != nil { + a.errors <- fmt.Errorf("[CACHE ALBUMS]: GET %d: %s", aID, err) + return + } + + // re-lock the db + a.dbLock <- true + err = a.db.AddOrUpdate(cachedAlbum) + <-a.dbLock + if err != nil { + a.errors <- fmt.Errorf("[CACHE ALBUMS]: GET %d: %s", aID, err) + return + } + updateIndex <- cachedAlbum } } } -func buildOrOpen(basepath string) (bleve.Index, error) { - i, err := bleve.Open(basepath) - if err == bleve.ErrorIndexPathDoesNotExist { - return bleve.New(basepath, buildAlbumMapping()) +func (a *appData) updateCache(stopChan <-chan struct{}, periode time.Duration, checkAlbum chan<- AlbumID) { + defer close(checkAlbum) + defer a.wg.Done() + + ticker := time.NewTicker(periode) + + for { + select { + case <-stopChan: + return + case <-ticker.C: + //we just chek + a.dbLock <- true + albums, err := a.db.ByPurchaseDate() + <-a.dbLock + if err != nil { + a.errors <- fmt.Errorf("[UPDATER]: could not get albums: %s", err) + continue + } + for i := 0; i < int(periode.Minutes()); i++ { + checkAlbum <- albums[rand.Intn(len(albums))] + } + } } - return i, err } -// Execute executes the job +func (a *appData) logErrors() { + for e := range a.errors { + log.Printf("%s", e) + } +} + +func (a *appData) start(stopChan <-chan struct{}) { + newAlbumsFromCsv := make(chan *Album) + deletedAlbumsFromCsv := make(chan AlbumID) + a.wg.Add(4) + go a.logErrors() + go a.readCsv(stopChan, newAlbumsFromCsv, deletedAlbumsFromCsv) + + albumToBatchIndex := make(chan *Album) + go a.cacheAlbumDescription(newAlbumsFromCsv, albumToBatchIndex) + + albumToReIndex := make(chan *Album) + albumToDelete := make(chan AlbumID) + go a.indexAlbum(stopChan, albumToBatchIndex, albumToReIndex, deletedAlbumsFromCsv, albumToDelete) + + albumToCheck := make(chan AlbumID) + go a.maintainAlbumDatabase(stopChan, albumToCheck, albumToDelete, albumToReIndex) + go a.updateCache(stopChan, 10*time.Minute, albumToCheck) +} + +func (a *appData) wait() { + a.wg.Wait() +} + +// execute executes the job func Execute() error { - var opts Options + opts := Options{ + CacheTTL: 31 * 24 * time.Hour, + MaxRequests: 10, + RequestWindow: 10 * time.Second, + } _, err := flags.Parse(&opts) if err != nil { return err } - - basepath := filepath.Join(xdg.Cache.Home(), "satbd.bar.satellite") - log.Printf("basepath is %s", basepath) - err = os.MkdirAll(basepath, 0755) + a, err := newAppData(opts) if err != nil { return err } - i, err := buildOrOpen("satbd.bar.satellite") - if err != nil { - return err + srv := &graceful.Server{ + Timeout: 10 * time.Second, + Server: &http.Server{ + Addr: opts.Listen, + }, } - albums := make(chan *Album) - errors := make(chan error) - - go readAlbums(opts.CsvFile, albums, errors) - go indexAlbums(i, albums, errors) - go func() { - for err := range errors { - log.Printf("ERROR: %s", err) - } - }() - - router := mux.NewRouter() - - bleveHttp.RegisterIndexName("album", i) - searchHandler := bleveHttp.NewSearchHandler("album") - - router.Handle("/api/search", searchHandler).Methods("POST") - log.Printf("Listening on %s", opts.Listen) - graceful.Run(opts.Listen, 10*time.Second, router) - + a.start(srv.StopChan()) + srv.Server.Handler = a.buildRouter() + if err := srv.ListenAndServe(); err != nil { + if opErr, ok := err.(*net.OpError); !ok || (ok && opErr.Op != "accept") { + return err + } + } return nil } diff --git a/router.go b/router.go new file mode 100644 index 0000000..fd33d4e --- /dev/null +++ b/router.go @@ -0,0 +1,78 @@ +package main + +import ( + "encoding/json" + "fmt" + "net/http" + "path" + "regexp" + "strconv" + + bleve_http "github.com/blevesearch/bleve/http" + "github.com/gorilla/mux" +) + +var rxExt = regexp.MustCompile(`[0-9]+`) + +func (a *appData) buildRouter() http.Handler { + router := mux.NewRouter() + + bleve_http.RegisterIndexName("album", a.index) + searchHandler := bleve_http.NewSearchHandler("album") + + router.Handle("/api/search", searchHandler).Methods("POST") + + router.Handle("/api/recents", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + a.dbLock <- true + albums, err := a.db.ByPurchaseDate() + <-a.dbLock + + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + enc := json.NewEncoder(w) + err = enc.Encode(albums) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + })).Methods("GET") + + router.Handle("/api/albums/{id:[0-9]+}", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + var id uint64 + var err error + var idStr string + var ok bool + if len(vars) != 0 { + idStr, ok = vars["id"] + if ok == true { + id, err = strconv.ParseUint(idStr, 10, 64) + } + } + + if ok == false || err != nil { + http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) + return + } + + a.dbLock <- true + album, err := a.db.Get(AlbumID(id)) + <-a.dbLock + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + //sanitize extension of the path from bedetheque.com + ext := path.Ext(album.CoverURL) + rxExt.ReplaceAllString(ext, "") + album.CoverURL = fmt.Sprintf("/covers/%d%s", album.ID, ext) + + enc := json.NewEncoder(w) + if err := enc.Encode(a); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + })).Methods("GET") + + return router +} diff --git a/utils.go b/utils.go index 20852f2..23f5aad 100644 --- a/utils.go +++ b/utils.go @@ -16,7 +16,8 @@ func closeOrPanic(c io.Closer, description string) { // close and ignore any error func closeIgnore(c io.Closer) { - c.Close() + // we simply ignore any error + _ = c.Close() } // close or log the failure. need to provide the %s in format