diff --git a/core/cache_warmer.go b/core/cache_warmer.go index 0d455bc2c..a3c3454cb 100644 --- a/core/cache_warmer.go +++ b/core/cache_warmer.go @@ -22,7 +22,7 @@ func NewCacheWarmer(artwork Artwork, artworkCache ArtworkCache) CacheWarmer { artworkCache: artworkCache, albums: map[string]struct{}{}, } - p, err := pool.NewPool("artwork", 3, &artworkItem{}, w.execute) + p, err := pool.NewPool("artwork", 3, w.execute) if err != nil { log.Error(context.Background(), "Error creating pool for Album Artwork Cache Warmer", err) } else { diff --git a/core/pool/pool.go b/core/pool/pool.go index 06abdec5c..56d729e67 100644 --- a/core/pool/pool.go +++ b/core/pool/pool.go @@ -9,114 +9,83 @@ import ( type Executor func(workload interface{}) type Pool struct { - name string - item interface{} - workers []worker - exec Executor - logTicker *time.Ticker - workerChannel chan chan work - queue chan work // receives jobs to send to workers - end chan bool // when receives bool stops workers - //queue *dque.DQue + name string + workers []worker + exec Executor + queue chan work // receives jobs to send to workers + done chan bool // when receives bool stops workers + working bool } // TODO This hardcoded value will go away when the queue is persisted in disk const bufferSize = 10000 -func NewPool(name string, workerCount int, item interface{}, exec Executor) (*Pool, error) { +func NewPool(name string, workerCount int, exec Executor) (*Pool, error) { p := &Pool{ - name: name, - item: item, - exec: exec, - queue: make(chan work, bufferSize), - end: make(chan bool), + name: name, + exec: exec, + queue: make(chan work, bufferSize), + done: make(chan bool), + working: false, } - //q, err := dque.NewOrOpen(name, filepath.Join(conf.Server.DataFolder, "queues", name), 50, p.itemBuilder) - //if err != nil { - // return nil, err - //} - //p.queue = q - - p.workerChannel = make(chan chan work) for i := 0; i < workerCount; i++ { worker := worker{ - p: p, - id: i, - channel: make(chan work), - workerChannel: p.workerChannel, - end: make(chan bool)} + p: p, + id: i, + } worker.Start() p.workers = append(p.workers, worker) } - // start pool go func() { - p.logTicker = time.NewTicker(10 * time.Second) - running := false + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() for { select { - case <-p.logTicker.C: + case <-ticker.C: if len(p.queue) > 0 { - log.Debug("Queue status", "pool", p.name, "items", len(p.queue)) + log.Debug("Queue status", "poolName", p.name, "items", len(p.queue)) } else { - if running { - log.Info("Queue empty", "pool", p.name) + if p.working { + log.Info("Queue is empty, all items processed", "poolName", p.name) } - running = false - } - case <-p.end: - for _, w := range p.workers { - w.Stop() // stop worker + p.working = false } + case <-p.done: + close(p.queue) return - case work := <-p.queue: - running = true - worker := <-p.workerChannel // wait for available channel - worker <- work // dispatch work to worker } } }() + return p, nil } func (p *Pool) Submit(workload interface{}) { + p.working = true p.queue <- work{workload} } -//func (p *Pool) itemBuilder() interface{} { -// t := reflect.TypeOf(p.item) -// return reflect.New(t).Interface() -//} -// type work struct { workload interface{} } type worker struct { - id int - p *Pool - workerChannel chan chan work // used to communicate between dispatcher and workers - channel chan work - end chan bool + id int + p *Pool } // start worker func (w *worker) Start() { go func() { - for { - w.workerChannel <- w.channel // when the worker is available place channel in queue - select { - case job := <-w.channel: // worker has received job - w.p.exec(job.workload) // do work - case <-w.end: - return - } + for job := range w.p.queue { + w.p.exec(job.workload) // do work } }() } // end worker -func (w *worker) Stop() { - w.end <- true +func (p *Pool) Stop() { + p.done <- true } diff --git a/core/pool/pool_test.go b/core/pool/pool_test.go index 9a14357c9..aa7c2a9a7 100644 --- a/core/pool/pool_test.go +++ b/core/pool/pool_test.go @@ -27,7 +27,7 @@ var _ = Describe("Pool", func() { BeforeEach(func() { processed = nil - pool, _ = NewPool("test", 2, &testItem{}, execute) + pool, _ = NewPool("test", 2, execute) }) It("processes items", func() { diff --git a/go.sum b/go.sum index 5fcd0acb4..5a70f611e 100644 --- a/go.sum +++ b/go.sum @@ -413,6 +413,7 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y github.com/onsi/gomega v1.10.2/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.10.3 h1:gph6h/qe9GSUw1NhH1gp+qb+h8rXD8Cy60Z32Qw3ELA= github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc= +github.com/onsi/gomega v1.10.4 h1:NiTx7EEvBzu9sFOD1zORteLSt3o8gnlvZZwSE9TnY9U= github.com/onsi/gomega v1.10.4/go.mod h1:g/HbgYopi++010VEqkFgJHKC09uJiW9UkXvMUuKHUCQ= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= @@ -652,6 +653,7 @@ golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0 h1:wBouT66WTYFXdxfVdz9sVWARV golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b h1:uwuIcX0g4Yl1NC5XAz37xsr2lTtcqevgzYNVt49waME= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb h1:eBmm0M9fYhWpKZLjQUUKka/LtIxf46G4fxeEz5KJr9U= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=