diff --git a/src/worker/localworker/worker.go b/src/worker/localworker/worker.go index 05f69f2..cdadade 100644 --- a/src/worker/localworker/worker.go +++ b/src/worker/localworker/worker.go @@ -91,12 +91,22 @@ func (wp *WorkerPool) Stop() { defer wp.mtx.Unlock() // TODO: avoid sending and panic + for len(wp.queue) > 0 { + wp.logger.Infof( + fmt.Sprintf( + "draining: %d messages left", + len(wp.queue), + ), + ) + time.Sleep(time.Duration(1) * time.Second) + } close(wp.queue) + wp.on = false for wp.started > 0 { - wp.logger.Errorf( + wp.logger.Infof( fmt.Sprintf( - "%d workers (sleep %d second) still in working/sleeping", + "stopping: %d workers (sleep %d second) still in working/sleeping", wp.sleep, wp.started, ), @@ -117,12 +127,11 @@ func (wp *WorkerPool) startWorker() { } }() - msg, ok := <-wp.queue if !ok { return } - + headers := msg.Headers() msgType, ok := headers[MsgTypeKey] if !ok { diff --git a/src/worker/localworker/worker_test.go b/src/worker/localworker/worker_test.go new file mode 100644 index 0000000..b43f98c --- /dev/null +++ b/src/worker/localworker/worker_test.go @@ -0,0 +1,106 @@ +package localworker_test + +import ( + "encoding/json" + "os" + "sync" + "testing" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "github.com/ihexxa/quickshare/src/worker" + "github.com/ihexxa/quickshare/src/worker/localworker" +) + +func TestWorkerPools(t *testing.T) { + type tinput struct { + ID int `json:"id"` + } + + workersTest := func(workers worker.IWorkerPool, t *testing.T) { + records := &sync.Map{} + mType1, mType2 := "mtype1", "mtype2" + + handler1 := func(msg worker.IMsg) error { + input := &tinput{} + err := json.Unmarshal([]byte(msg.Body()), input) + if err != nil { + t.Fatal(err) + } + + records.Store(mType1, input.ID) + return nil + } + handler2 := func(msg worker.IMsg) error { + input := &tinput{} + err := json.Unmarshal([]byte(msg.Body()), input) + if err != nil { + t.Fatal(err) + } + + records.Store(mType2, input.ID) + return nil + } + + workers.AddHandler(mType1, handler1) + workers.AddHandler(mType2, handler2) + workers.Start() + + count := 3 + for i := 0; i < count; i++ { + body, _ := json.Marshal(&tinput{ID: i}) + workers.TryPut(localworker.NewMsg( + uint64(i), + map[string]string{localworker.MsgTypeKey: mType1}, + string(body), + )) + workers.TryPut(localworker.NewMsg( + uint64(i*10), + map[string]string{localworker.MsgTypeKey: mType2}, + string(body), + )) + } + + workers.Stop() + workers.DelHandler(mType1) + workers.DelHandler(mType2) + + val1, ok := records.Load(mType1) + if !ok { + t.Fatal("mtype1 not found") + } + count1 := val1.(int) + if count1 != count-1 { + t.Fatalf("incorrect count %d", count1) + } + + val2, ok := records.Load(mType2) + if !ok { + t.Fatal("mtype2 not found") + } + count2 := val2.(int) + if count1 != count-1 { + t.Fatalf("incorrect count %d", count2) + } + } + + t.Run("test bolt provider", func(t *testing.T) { + // rootPath, err := ioutil.TempDir("./", "quickshare_kvstore_test_") + // if err != nil { + // t.Fatal(err) + // } + // defer os.RemoveAll(rootPath) + + stdoutWriter := zapcore.AddSync(os.Stdout) + multiWriter := zapcore.NewMultiWriteSyncer(stdoutWriter) + core := zapcore.NewCore( + zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), + multiWriter, + zap.InfoLevel, + ) + + workers := localworker.NewWorkerPool(1024, 1, 2, zap.New(core).Sugar()) + workersTest(workers, t) + }) +}