Implements a basic main function

This commit is contained in:
2016-01-22 18:06:33 +01:00
parent fe6e32ec7b
commit f407c1cf3e
6 changed files with 431 additions and 67 deletions

395
main.go
View File

@@ -1,120 +1,391 @@
package main
import (
"fmt"
"io"
"log"
"math/rand"
"net"
"net/http"
"os"
"path/filepath"
"sync"
"time"
"launchpad.net/go-xdg"
"github.com/gorilla/mux"
"github.com/jessevdk/go-flags"
"github.com/tylerb/graceful"
"github.com/blevesearch/bleve"
bleveHttp "github.com/blevesearch/bleve/http"
)
// Options holds the command-line configuration, parsed by go-flags
// from the struct tags below.
type Options struct {
	// CsvFile is the CSV file the album index is built from.
	CsvFile string `long:"csv" description:"CSV file to build the index from" required:"true"`
	// Listen is the address the HTTP server binds to.
	Listen string `short:"l" long:"listen" description:"Address to listen to" default:":33276"`
	// MaxRequests caps the external requests done per RequestWindow.
	MaxRequests uint `long:"max-request" description:"Max request done externally per --request-window" default:"10"`
	// RequestWindow is the window over which MaxRequests applies.
	RequestWindow time.Duration `long:"request-window" description:"Window over which no more --max-request are done"`
	// CacheTTL is the time-to-live of cached album data.
	// NOTE(review): the flag is spelled "chache-ttl" — almost certainly a
	// typo for "cache-ttl", but renaming it would break existing command
	// lines; confirm before changing.
	CacheTTL time.Duration `long:"chache-ttl" description:"TTL of the cached data" `
}
func readAlbums(csvPath string, albums chan *Album, errors chan error) {
defer close(albums)
csvFile, err := os.Open(csvPath)
// appData bundles the long-lived state shared by the application's
// goroutines: the search index, the album database, the HTTP getters
// and the synchronization primitives that coordinate them.
type appData struct {
	index  bleve.Index             // full-text index of the albums
	db     *AlbumDatabase          // on-disk album database
	getter *AlbumDescriptionGetter // fetches album descriptions (rate limited)
	cover  *AlbumCoverCache        // fetches and caches album covers
	opts   Options                 // parsed command-line options
	errors chan error              // errors from all goroutines, drained by logErrors
	// dbLock is a capacity-1 channel used as a mutex around db access:
	// send to acquire, receive to release.
	dbLock chan bool
	wg     sync.WaitGroup // counts the pipeline goroutines that call Done
}
// type albumOperationType int
// const (
// OpAdd albumOperationType = iota
// OpDelete
// )
// type albumOperation struct {
// Type albumOperationType
// A *Album
// }
// newAppData builds the application state: it creates the XDG cache
// directory, opens (or creates on first run) the bleve index, opens the
// album database and sets up the rate-limited HTTP getters.
// It returns an error if the cache directory or the index cannot be
// set up.
func newAppData(opts Options) (*appData, error) {
	basepath := filepath.Join(xdg.Cache.Home(), "satbd.bar.satellite")
	err := os.MkdirAll(basepath, 0755)
	if err != nil {
		// (removed a stray `errors <- err` left over from a previous
		// revision: no such channel exists in this scope)
		return nil, err
	}
	res := &appData{
		opts:   opts,
		errors: make(chan error, 10),
		dbLock: make(chan bool, 1),
	}
	blevePath := filepath.Join(basepath, "index")
	res.index, err = bleve.Open(blevePath)
	if err == bleve.ErrorIndexPathDoesNotExist {
		// first run: create a fresh index with the album mapping
		res.index, err = bleve.New(blevePath, buildAlbumMapping())
		if err != nil {
			return nil, err
		}
	}
	if err != nil {
		return nil, err
	}
	res.db = OpenAlbumDatabase(filepath.Join(basepath, "db"))
	res.getter = &AlbumDescriptionGetter{
		// NOTE(review): hard-coded (10, 10) ignores opts.MaxRequests /
		// opts.RequestWindow, unlike the cover cache below — confirm
		// whether this is intentional.
		getter: NewRateLimitedGetter(10, 10),
	}
	res.cover = NewAlbumCoverCache(filepath.Join(basepath, "covers"), opts.MaxRequests, opts.RequestWindow)
	// make both caches use the same HTTPGetter so they share one rate limit
	res.cover.getter = res.getter.getter
	return res, nil
}
func (a *appData) readCsv(stopChan <-chan struct{},
newAlbums chan<- *Album,
deletedAlbum chan<- AlbumID) {
defer close(newAlbums)
defer a.wg.Done()
// defered call : we remove album that are not listed anymore in the CSV file
safeToDelete := false
shouldDelete := map[AlbumID]bool{}
defer func() {
if safeToDelete == true {
for deletedID, _ := range shouldDelete {
deletedAlbum <- deletedID
}
}
close(deletedAlbum)
}()
allAlbums, err := a.db.ByPurchaseDate()
if err != nil {
a.errors <- fmt.Errorf("[CSV PARSER]: %s", err)
return
}
defer closeOrPanic(csvFile, csvPath)
r, err := NewAlbumCsvReader(csvFile)
for _, aID := range allAlbums {
shouldDelete[aID] = true
}
f, err := os.Open(a.opts.CsvFile)
if err != nil {
errors <- err
a.errors <- fmt.Errorf("[CSV PARSER]: %s", err)
return
}
defer closeOrPanic(f, a.opts.CsvFile)
csvReader, err := NewAlbumCsvReader(f)
if err != nil {
a.errors <- fmt.Errorf("[CSV PARSER]: %s", err)
return
}
for {
a, err := r.Read()
album, err := csvReader.Read()
if err != nil {
if err == io.EOF {
return
if err != io.EOF {
a.errors <- fmt.Errorf("[CSV PARSER]: %s", err)
continue
}
errors <- err
//since we have parsed the whole database, we are safe to delete extra saved albums!
safeToDelete = true
return
}
select {
case <-stopChan:
return
case newAlbums <- album:
// then we should not delete it
delete(shouldDelete, album.ID)
}
}
}
// indexAlbum is the indexing goroutine: it batch-indexes new albums
// (flushing every 1000), re-indexes updated ones, and removes deleted
// ones from the bleve index, forwarding each deletion on deletedAlbum
// so the database maintainer can drop it too.
//
// Channel protocol: each input channel is set to nil once closed so the
// select stops considering it; deletedAlbum is closed on exit.
func (a *appData) indexAlbum(stopChan <-chan struct{},
	albumToIndexBatch <-chan *Album,
	albumToReIndex <-chan *Album,
	albumToDelete <-chan AlbumID,
	deletedAlbum chan<- AlbumID) {
	defer close(deletedAlbum)
	defer a.wg.Done()
	batch := a.index.NewBatch()
	count := 0
	for {
		select {
		case <-stopChan:
			return
		case album, ok := <-albumToReIndex:
			if ok == false {
				albumToReIndex = nil
				break
			}
			// index the album itself (the scraped revision indexed the
			// appData receiver here, which cannot be right)
			err := a.index.Index(AlbumIDString(album.ID), album)
			if err != nil {
				a.errors <- fmt.Errorf("[INDEX] re-indexing %d failed: %s", album.ID, err)
			}
		case album, ok := <-albumToIndexBatch:
			if ok == false {
				// input closed: flush the final, partial batch
				albumToIndexBatch = nil
				if count%1000 != 0 {
					err := a.index.Batch(batch)
					if err != nil {
						a.errors <- fmt.Errorf("[INDEX] batch indexing failed: %s", err)
					} else {
						log.Printf("[INDEX] indexed %d albums", count)
					}
				}
				break
			}
			err := batch.Index(AlbumIDString(album.ID), album)
			if err != nil {
				a.errors <- fmt.Errorf("[INDEX] could not batch indexion of %d: %s", album.ID, err)
				break
			}
			count++
			if count%1000 == 0 {
				// flush a full batch and start a new one
				err := a.index.Batch(batch)
				if err != nil {
					a.errors <- fmt.Errorf("[INDEX] batch indexing failed: %s", err)
				} else {
					log.Printf("[INDEX] indexed %d albums", count)
				}
				batch = a.index.NewBatch()
			}
		case aID, ok := <-albumToDelete:
			if ok == false {
				// closed channels yield zero values: without this break
				// we would delete album ID 0 forever
				albumToDelete = nil
				break
			}
			err := a.index.Delete(AlbumIDString(aID))
			if err != nil {
				a.errors <- fmt.Errorf("[INDEX]: delete failed: %s", err)
			}
			// forward the deletion to the database maintainer.
			// NOTE(review): the scraped diff showed `a.dbLock <- true`
			// here, which is never released (deadlock on the second
			// delete); forwarding on deletedAlbum matches the pipeline
			// wiring in start() — confirm against the real commit.
			deletedAlbum <- aID
		}
	}
}
func (a *appData) cacheAlbumDescription(getAlbum <-chan *Album, toIndex chan<- *Album) {
for album := range getAlbum {
albumCache, err := a.db.Get(album.ID)
if err == nil {
toIndex <- albumCache
continue
}
albums <- a
}
}
a.dbLock <- true
func indexAlbums(i bleve.Index, albums chan *Album, errors chan error) {
iAlbum := 0
start := time.Now()
for a := range albums {
err := i.Index(AlbumIDString(a.ID), a)
err = a.getter.Get(album)
if err != nil {
errors <- err
a.errors <- fmt.Errorf("[CACHE ALBUMS]: getting %d :%s", album.ID, err)
<-a.dbLock
return
}
iAlbum++
if iAlbum%100 == 0 {
dur := time.Since(start)
log.Printf("Indexed %d beer in %s (%f ms / Album)", iAlbum, dur, dur.Seconds()/float64(iAlbum)*1000)
err = a.db.AddOrUpdate(album)
<-a.dbLock
if err != nil {
a.errors <- fmt.Errorf("[CACHE ALBUMS]: storing %d: %s", album.ID, err)
return
}
toIndex <- album
}
}
// maintainAlbumDatabase serializes database maintenance: it deletes
// albums reported on deleteAlbum and refreshes stale albums reported
// on checkAlbum (re-fetching their description when older than
// CacheTTL), pushing refreshed albums to updateIndex for re-indexing.
// updateIndex is closed on return.
func (a *appData) maintainAlbumDatabase(stopChan <-chan struct{},
	checkAlbum <-chan AlbumID,
	deleteAlbum <-chan AlbumID,
	updateIndex chan<- *Album) {
	defer close(updateIndex)
	defer a.wg.Done()
	for {
		select {
		case <-stopChan:
			return
		case aID, ok := <-deleteAlbum:
			if ok == false {
				// closed: stop selecting on it — and do not fall through
				// to delete the zero album ID
				deleteAlbum = nil
				break
			}
			a.dbLock <- true
			err := a.db.Delete(aID)
			<-a.dbLock
			if err != nil {
				a.errors <- fmt.Errorf("[CACHE ALBUMS]: delete %d: %s", aID, err)
			}
		case aID, ok := <-checkAlbum:
			if ok == false {
				checkAlbum = nil
				break
			}
			a.dbLock <- true
			cachedAlbum, err := a.db.Get(aID)
			<-a.dbLock
			if err != nil {
				// report and keep serving; a bare `return` here would
				// kill the maintenance goroutine on the first error
				a.errors <- fmt.Errorf("[CACHE ALBUMS]: check %d: %s", aID, err)
				break
			}
			if time.Now().Add(-a.opts.CacheTTL).Before(cachedAlbum.FetchDate) == true {
				// cached value is still fresh: nothing to do
				break
			}
			// this could be very long (external request); done unlocked
			err = a.getter.Get(cachedAlbum)
			if err != nil {
				a.errors <- fmt.Errorf("[CACHE ALBUMS]: GET %d: %s", aID, err)
				break
			}
			// re-lock the db to store the refreshed album
			a.dbLock <- true
			err = a.db.AddOrUpdate(cachedAlbum)
			<-a.dbLock
			if err != nil {
				a.errors <- fmt.Errorf("[CACHE ALBUMS]: GET %d: %s", aID, err)
				break
			}
			updateIndex <- cachedAlbum
		}
	}
}
func buildOrOpen(basepath string) (bleve.Index, error) {
i, err := bleve.Open(basepath)
if err == bleve.ErrorIndexPathDoesNotExist {
return bleve.New(basepath, buildAlbumMapping())
func (a *appData) updateCache(stopChan <-chan struct{}, periode time.Duration, checkAlbum chan<- AlbumID) {
defer close(checkAlbum)
defer a.wg.Done()
ticker := time.NewTicker(periode)
for {
select {
case <-stopChan:
return
case <-ticker.C:
//we just chek
a.dbLock <- true
albums, err := a.db.ByPurchaseDate()
<-a.dbLock
if err != nil {
a.errors <- fmt.Errorf("[UPDATER]: could not get albums: %s", err)
continue
}
for i := 0; i < int(periode.Minutes()); i++ {
checkAlbum <- albums[rand.Intn(len(albums))]
}
}
}
return i, err
}
// logErrors drains the shared error channel, logging every received
// error until the channel is closed.
func (a *appData) logErrors() {
	for err := range a.errors {
		log.Printf("%s", err)
	}
}
// start wires the processing pipeline together and launches its
// goroutines: CSV reader -> description cache -> batch indexer, plus
// the database maintainer and the periodic cache updater. stopChan
// stops every stage; the wait group counts the four stages that call
// wg.Done (logErrors and cacheAlbumDescription do not).
func (a *appData) start(stopChan <-chan struct{}) {
	newAlbumsFromCsv := make(chan *Album)
	deletedAlbumsFromCsv := make(chan AlbumID)
	a.wg.Add(4)
	go a.logErrors()
	go a.readCsv(stopChan, newAlbumsFromCsv, deletedAlbumsFromCsv)
	albumToBatchIndex := make(chan *Album)
	go a.cacheAlbumDescription(newAlbumsFromCsv, albumToBatchIndex)
	albumToReIndex := make(chan *Album)
	albumToDelete := make(chan AlbumID)
	// CSV deletions go to the indexer, which forwards them to the db
	// maintainer via albumToDelete
	go a.indexAlbum(stopChan, albumToBatchIndex, albumToReIndex, deletedAlbumsFromCsv, albumToDelete)
	albumToCheck := make(chan AlbumID)
	go a.maintainAlbumDatabase(stopChan, albumToCheck, albumToDelete, albumToReIndex)
	go a.updateCache(stopChan, 10*time.Minute, albumToCheck)
}
// wait blocks until the pipeline goroutines registered on the wait
// group have finished.
func (a *appData) wait() {
	a.wg.Wait()
}
// execute executes the job
func Execute() error {
var opts Options
opts := Options{
CacheTTL: 31 * 24 * time.Hour,
MaxRequests: 10,
RequestWindow: 10 * time.Second,
}
_, err := flags.Parse(&opts)
if err != nil {
return err
}
basepath := filepath.Join(xdg.Cache.Home(), "satbd.bar.satellite")
log.Printf("basepath is %s", basepath)
err = os.MkdirAll(basepath, 0755)
a, err := newAppData(opts)
if err != nil {
return err
}
i, err := buildOrOpen("satbd.bar.satellite")
if err != nil {
return err
srv := &graceful.Server{
Timeout: 10 * time.Second,
Server: &http.Server{
Addr: opts.Listen,
},
}
albums := make(chan *Album)
errors := make(chan error)
go readAlbums(opts.CsvFile, albums, errors)
go indexAlbums(i, albums, errors)
go func() {
for err := range errors {
log.Printf("ERROR: %s", err)
}
}()
router := mux.NewRouter()
bleveHttp.RegisterIndexName("album", i)
searchHandler := bleveHttp.NewSearchHandler("album")
router.Handle("/api/search", searchHandler).Methods("POST")
log.Printf("Listening on %s", opts.Listen)
graceful.Run(opts.Listen, 10*time.Second, router)
a.start(srv.StopChan())
srv.Server.Handler = a.buildRouter()
if err := srv.ListenAndServe(); err != nil {
if opErr, ok := err.(*net.OpError); !ok || (ok && opErr.Op != "accept") {
return err
}
}
return nil
}