Implements a new main

This commit is contained in:
2016-01-21 18:54:46 +01:00
parent 2516bcb128
commit b044fa17ab

166
main.go
View File

@@ -1,103 +1,111 @@
package main
import (
"fmt"
"io"
"log"
"os"
"path/filepath"
"time"
"launchpad.net/go-xdg/v0"
"launchpad.net/go-xdg"
"github.com/gorilla/mux"
"github.com/jessevdk/go-flags"
"github.com/tylerb/graceful"
bleveHttp "github.com/blevesearch/bleve/http"
)
type Options struct {
CsvFile string `long:"csv" description:"CSV file to build the index from" required:"true"`
Listen string `short:"l" long:"listen" description:"Address to listen to" default:":33276"`
}
func readAlbums(csvPath string, albums chan *Album, errors chan error) {
defer close(albums)
csvFile, err := os.Open(csvPath)
if err != nil {
errors <- err
return
}
defer closeOrPanic(csvFile, csvPath)
r, err := NewAlbumCsvReader(csvFile)
if err != nil {
errors <- err
return
}
for {
a, err := r.Read()
if err != nil {
if err == io.EOF {
return
}
errors <- err
continue
}
albums <- a
}
}
func indexAlbums(i Indexer, albums chan *Album, errors chan error) {
iAlbum := 0
start := time.Now()
for a := range albums {
err := i.Index(a)
if err != nil {
errors <- err
}
iAlbum++
if iAlbum%100 == 0 {
dur := time.Since(start)
log.Printf("Indexed %d beer in %s (%f ms / Album)", iAlbum, dur, dur.Seconds()/float64(iAlbum)*1000)
}
}
}
// Execute executes the job
func Execute() error {
if len(os.Args) != 2 {
return fmt.Errorf("Missing mandatory .csv parameter")
}
albums := make(chan *Album, 10000)
errors := make(chan error, 10)
parsed := make(chan bool, 10000)
cached := make(chan bool, 1000)
cacheBase, err := xdg.Cache.Ensure(filepath.Join("org.satellite.satbd", "test"))
var opts Options
_, err := flags.Parse(&opts)
if err != nil {
return err
}
cacheBase = filepath.Dir(cacheBase)
c, err := NewAlbumCoverCache(cacheBase, 10, 10*time.Second)
basepath := filepath.Join(xdg.Cache.Home(), "satbd.bar.satellite")
log.Printf("basepath is %s", basepath)
err = os.MkdirAll(basepath, 0755)
if err != nil {
return err
}
g := AlbumDescriptionGetter{getter: c.getter}
start := time.Now()
go func() {
defer close(albums)
csvFile, err := os.Open(os.Args[1])
if err != nil {
errors <- err
return
}
defer closeOrLog(csvFile, "Could not close '"+os.Args[1]+"': %s")
csvReader, err := NewAlbumCsvReader(csvFile)
if err != nil {
errors <- err
return
}
for {
a, err := csvReader.Read()
if err != nil {
if err == io.EOF {
break
}
errors <- err
}
parsed <- true
albums <- a
}
fmt.Printf("Done parsing albums in %s\n\n", time.Since(start))
}()
go func() {
defer close(errors)
for {
a, ok := <-albums
if ok == false {
break
}
err := g.Get(a)
cached <- true
if err != nil {
errors <- err
}
}
fmt.Printf("DONE caching in %s\n\n", time.Since(start))
}()
pCount := 0
cCount := 0
errCount := 0
done := false
for done == false {
select {
case err, ok := <-errors:
if ok == false {
done = true
} else {
errCount++
}
if err != nil {
fmt.Printf("\033[1A\033[KGot error: %s\n\n", err)
}
case <-parsed:
pCount++
case <-cached:
cCount++
}
fmt.Printf("\033[1A\033[K%d/%d (errors:%d)\n", cCount, pCount, errCount)
i, err := NewBleveIndexer("satbd.bar.satellite")
if err != nil {
return err
}
albums := make(chan *Album)
errors := make(chan error)
go readAlbums(opts.CsvFile, albums, errors)
go indexAlbums(i, albums, errors)
go func() {
for err := range errors {
log.Printf("ERROR: %s", err)
}
}()
router := mux.NewRouter()
bleveHttp.RegisterIndexName("album", i.(*bleveIndexer).bl)
searchHandler := bleveHttp.NewSearchHandler("album")
router.Handle("/api/search", searchHandler).Methods("POST")
log.Printf("Listening on %s", opts.Listen)
graceful.Run(opts.Listen, 10*time.Second, router)
return nil
}