From b044fa17ab1030b0fff8e80cc1a83f958c0644f1 Mon Sep 17 00:00:00 2001 From: Alexandre Tuleu Date: Thu, 21 Jan 2016 18:54:46 +0100 Subject: [PATCH] Implements a new main --- main.go | 166 +++++++++++++++++++++++++++++--------------------------- 1 file changed, 87 insertions(+), 79 deletions(-) diff --git a/main.go b/main.go index 989c35f..a9e902f 100644 --- a/main.go +++ b/main.go @@ -1,103 +1,111 @@ package main import ( - "fmt" "io" "log" "os" "path/filepath" "time" - "launchpad.net/go-xdg/v0" + "launchpad.net/go-xdg" + + "github.com/gorilla/mux" + "github.com/jessevdk/go-flags" + "github.com/tylerb/graceful" + + bleveHttp "github.com/blevesearch/bleve/http" ) +type Options struct { + CsvFile string `long:"csv" description:"CSV file to build the index from" required:"true"` + Listen string `short:"l" long:"listen" description:"Address to listen to" default:":33276"` +} + +func readAlbums(csvPath string, albums chan *Album, errors chan error) { + defer close(albums) + csvFile, err := os.Open(csvPath) + if err != nil { + errors <- err + return + } + defer closeOrPanic(csvFile, csvPath) + + r, err := NewAlbumCsvReader(csvFile) + if err != nil { + errors <- err + return + } + + for { + a, err := r.Read() + if err != nil { + if err == io.EOF { + return + } + errors <- err + continue + } + albums <- a + } +} + +func indexAlbums(i Indexer, albums chan *Album, errors chan error) { + iAlbum := 0 + start := time.Now() + for a := range albums { + err := i.Index(a) + if err != nil { + errors <- err + } + iAlbum++ + if iAlbum%100 == 0 { + dur := time.Since(start) + log.Printf("Indexed %d beer in %s (%f ms / Album)", iAlbum, dur, dur.Seconds()/float64(iAlbum)*1000) + } + } +} + // Execute executes the job func Execute() error { - if len(os.Args) != 2 { - return fmt.Errorf("Missing mandatory .csv parameter") - } - - albums := make(chan *Album, 10000) - errors := make(chan error, 10) - parsed := make(chan bool, 10000) - cached := make(chan bool, 1000) - cacheBase, err := xdg.Cache.Ensure(filepath.Join("org.satellite.satbd", "test")) + var opts Options + _, err := flags.Parse(&opts) if err != nil { return err } - cacheBase = filepath.Dir(cacheBase) - c, err := NewAlbumCoverCache(cacheBase, 10, 10*time.Second) + + basepath := filepath.Join(xdg.Cache.Home(), "satbd.bar.satellite") + log.Printf("basepath is %s", basepath) + err = os.MkdirAll(basepath, 0755) if err != nil { return err } - g := AlbumDescriptionGetter{getter: c.getter} - start := time.Now() - go func() { - defer close(albums) - csvFile, err := os.Open(os.Args[1]) - if err != nil { - errors <- err - return - } - defer closeOrLog(csvFile, "Could not close '"+os.Args[1]+"': %s") - csvReader, err := NewAlbumCsvReader(csvFile) - if err != nil { - errors <- err - return - } - for { - a, err := csvReader.Read() - if err != nil { - if err == io.EOF { - break - } - errors <- err - } - parsed <- true - albums <- a - } - fmt.Printf("Done parsing albums in %s\n\n", time.Since(start)) - }() - - go func() { - defer close(errors) - for { - a, ok := <-albums - if ok == false { - break - } - err := g.Get(a) - cached <- true - if err != nil { - errors <- err - } - } - fmt.Printf("DONE caching in %s\n\n", time.Since(start)) - }() - - pCount := 0 - cCount := 0 - errCount := 0 - done := false - for done == false { - select { - case err, ok := <-errors: - if ok == false { - done = true - } else { - errCount++ - } - if err != nil { - fmt.Printf("\033[1A\033[KGot error: %s\n\n", err) - } - case <-parsed: - pCount++ - case <-cached: - cCount++ - } - fmt.Printf("\033[1A\033[K%d/%d (errors:%d)\n", cCount, pCount, errCount) + i, err := NewBleveIndexer("satbd.bar.satellite") + if err != nil { + return err } + + albums := make(chan *Album) + errors := make(chan error) + + go readAlbums(opts.CsvFile, albums, errors) + go indexAlbums(i, albums, errors) + go func() { + for err := range errors { + log.Printf("ERROR: %s", err) + } + }() + + router := mux.NewRouter() + + bleveHttp.RegisterIndexName("album", i.(*bleveIndexer).bl) + searchHandler := bleveHttp.NewSearchHandler("album") + + router.Handle("/api/search", searchHandler).Methods("POST") + + log.Printf("Listening on %s", opts.Listen) + graceful.Run(opts.Listen, 10*time.Second, router) + return nil }