From 4a5c68df1756e2ceaa6bf7d4c795f5e737b54287 Mon Sep 17 00:00:00 2001 From: hexxa Date: Sun, 12 Sep 2021 12:45:03 +0800 Subject: [PATCH] fix(worker): refactor and fix issues of worker --- configs/dev.yml | 4 + configs/docker.yml | 4 + configs/lan.yml | 4 + src/client/web/src/client/files_mock.ts | 4 + src/client/web/src/components/browser.tsx | 5 + src/client/web/src/components/core_state.ts | 4 +- src/depidx/deps.go | 11 +- src/handlers/fileshdr/async_handlers.go | 42 ++++++++ src/handlers/fileshdr/handlers.go | 43 +++++--- src/server/config.go | 20 +++- src/server/server.go | 21 ++-- src/worker/interface.go | 4 + src/worker/localworker/worker.go | 114 ++++++++++---------- 13 files changed, 194 insertions(+), 86 deletions(-) create mode 100644 src/handlers/fileshdr/async_handlers.go diff --git a/configs/dev.yml b/configs/dev.yml index 61c67bd..d3e66f2 100644 --- a/configs/dev.yml +++ b/configs/dev.yml @@ -29,3 +29,7 @@ users: cookieHttpOnly: true minUserNameLen: 2 minPwdLen: 4 +workers: + queueSize: 1024 + sleepCyc: 1 # in second + workerCount: 2 diff --git a/configs/docker.yml b/configs/docker.yml index 07d20a5..f608c7e 100644 --- a/configs/docker.yml +++ b/configs/docker.yml @@ -29,3 +29,7 @@ users: cookieHttpOnly: true minUserNameLen: 4 minPwdLen: 6 +workers: + queueSize: 1024 + sleepCyc: 1 # in second + workerCount: 2 \ No newline at end of file diff --git a/configs/lan.yml b/configs/lan.yml index 88eafb5..b366efc 100644 --- a/configs/lan.yml +++ b/configs/lan.yml @@ -29,3 +29,7 @@ users: cookieHttpOnly: true minUserNameLen: 2 minPwdLen: 4 +workers: + queueSize: 1024 + sleepCyc: 1 # in second + workerCount: 2 \ No newline at end of file diff --git a/src/client/web/src/client/files_mock.ts b/src/client/web/src/client/files_mock.ts index 8aea448..2a37269 100644 --- a/src/client/web/src/client/files_mock.ts +++ b/src/client/web/src/client/files_mock.ts @@ -66,12 +66,14 @@ export const resps = { size: 5, modTime: "0", isDir: false, + sha1: "mock_file_sha1", }, { name: "mock_dir", size: 0, modTime: "0", isDir: true, + sha1: "", }, ], }, @@ -87,12 +89,14 @@ export const resps = { size: 5, modTime: "0", isDir: false, + sha1: "mock_file_sha1", }, { name: "mock_dir", size: 0, modTime: "0", isDir: true, + sha1: "", }, ], }, diff --git a/src/client/web/src/components/browser.tsx b/src/client/web/src/components/browser.tsx index 9c7939d..3184b36 100644 --- a/src/client/web/src/components/browser.tsx +++ b/src/client/web/src/components/browser.tsx @@ -26,6 +26,7 @@ export interface Item { modTime: string; isDir: boolean; selected: boolean; + sha1: string; } export interface BrowserProps { @@ -162,8 +163,12 @@ export class Browser extends React.Component { this.props.browser.items, this.state.selectedItems ) + .then(() => { + return updater().self(); + }) .then(() => { this.update(updater().updateBrowser); + this.update(updater().updateLogin); this.setState({ selectedSrc: "", selectedItems: Map(), diff --git a/src/client/web/src/components/core_state.ts b/src/client/web/src/components/core_state.ts index 4cb3958..4a9b790 100644 --- a/src/client/web/src/components/core_state.ts +++ b/src/client/web/src/components/core_state.ts @@ -10,7 +10,7 @@ import { AdminProps } from "./pane_admin"; import { MsgPackage } from "../i18n/msger"; import { Item } from "./browser"; -import { UploadInfo, User } from "../client"; +import { UploadInfo, User, MetadataResp } from "../client"; import { initUploadMgr, IWorker } from "../worker/upload_mgr"; export interface MsgProps { @@ -53,7 +53,7 @@ export function initState(): ICoreState { browser: { isVertical: isVertical(), dirPath: List(["."]), - items: List([]), + items: List([]), sharings: List([]), isSharing: false, uploadings: List([]), diff --git a/src/depidx/deps.go b/src/depidx/deps.go index 573718a..ff483f0 100644 --- a/src/depidx/deps.go +++ b/src/depidx/deps.go @@ -11,6 +11,7 @@ import ( "github.com/ihexxa/quickshare/src/iolimiter" "github.com/ihexxa/quickshare/src/kvstore" "github.com/ihexxa/quickshare/src/userstore" + "github.com/ihexxa/quickshare/src/worker" ) type IUploader interface { @@ -27,10 +28,10 @@ type Deps struct { kv kvstore.IKVStore users userstore.IUserStore fileInfos fileinfostore.IFileInfoStore - uploader IUploader id idgen.IIDGen logger *zap.SugaredLogger limiter iolimiter.ILimiter + workers worker.IWorkerPool } func NewDeps(cfg gocfg.ICfg) *Deps { @@ -100,3 +101,11 @@ func (deps *Deps) Limiter() iolimiter.ILimiter { func (deps *Deps) SetLimiter(limiter iolimiter.ILimiter) { deps.limiter = limiter } + +func (deps *Deps) Workers() worker.IWorkerPool { + return deps.workers +} + +func (deps *Deps) SetWorkers(workers worker.IWorkerPool) { + deps.workers = workers +} diff --git a/src/handlers/fileshdr/async_handlers.go b/src/handlers/fileshdr/async_handlers.go new file mode 100644 index 0000000..b28906c --- /dev/null +++ b/src/handlers/fileshdr/async_handlers.go @@ -0,0 +1,42 @@ +package fileshdr + +import ( + "crypto/sha1" + "encoding/json" + "fmt" + "io" + + "github.com/ihexxa/quickshare/src/worker" +) +const MsgTypeSha1 = "sha1" + +type Sha1Params struct { + FilePath string +} + +func (h *FileHandlers) genSha1(msg worker.IMsg) error { + taskInputs := &Sha1Params{} + err := json.Unmarshal([]byte(msg.Body()), taskInputs) + if err != nil { + return fmt.Errorf("fail to unmarshal sha1 msg: %w", err) + } + + f, err := h.deps.FS().GetFileReader(taskInputs.FilePath) + if err != nil { + return fmt.Errorf("fail to get reader: %s", err) + } + + hasher := sha1.New() + buf := make([]byte, 4096) + _, err = io.CopyBuffer(hasher, f, buf) + if err != nil { + return err + } + + sha1Sign := fmt.Sprintf("%x", hasher.Sum(nil)) + err = h.deps.FileInfos().SetSha1(taskInputs.FilePath, sha1Sign) + if err != nil { + return fmt.Errorf("fail to set sha1: %s", err) + } + return nil +} diff --git a/src/handlers/fileshdr/handlers.go b/src/handlers/fileshdr/handlers.go index 6731de0..253e7df 100644 --- a/src/handlers/fileshdr/handlers.go +++ b/src/handlers/fileshdr/handlers.go @@ -2,6 +2,7 @@ package fileshdr import ( "encoding/base64" + "encoding/json" "errors" "fmt" "io" @@ -20,7 +21,6 @@ import ( "github.com/ihexxa/quickshare/src/depidx" q "github.com/ihexxa/quickshare/src/handlers" "github.com/ihexxa/quickshare/src/userstore" - "github.com/ihexxa/quickshare/src/worker" "github.com/ihexxa/quickshare/src/worker/localworker" ) @@ -41,16 +41,17 @@ type FileHandlers struct { cfg gocfg.ICfg deps *depidx.Deps uploadMgr *UploadMgr - workers worker.IWorkerPool } -func NewFileHandlers(cfg gocfg.ICfg, deps *depidx.Deps, workers worker.IWorkerPool) (*FileHandlers, error) { - return &FileHandlers{ +func NewFileHandlers(cfg gocfg.ICfg, deps *depidx.Deps) (*FileHandlers, error) { + handlers := &FileHandlers{ cfg: cfg, deps: deps, uploadMgr: NewUploadMgr(deps.KV()), - workers: workers, - }, nil + } + deps.Workers().AddHandler(MsgTypeSha1, handlers.genSha1) + + return handlers, nil } type AutoLocker struct { @@ -232,7 +233,7 @@ type MetadataResp struct { Size int64 `json:"size"` ModTime time.Time `json:"modTime"` IsDir bool `json:"isDir"` - Sha1 string `json"sha1"` + Sha1 string `json:"sha1"` } func (h *FileHandlers) Metadata(c *gin.Context) { @@ -416,11 +417,19 @@ func (h *FileHandlers) UploadChunk(c *gin.Context) { return } - err = h.workers.TryPut( + msg, err := json.Marshal(Sha1Params{ + FilePath: fsFilePath, + }) + if err != nil { + c.JSON(q.ErrResp(c, 500, err)) + return + } + + err = h.deps.Workers().TryPut( localworker.NewMsg( h.deps.ID().Gen(), - map[string]string{localworker.MsgTypeKey: "sha1"}, - fsFilePath, + map[string]string{localworker.MsgTypeKey: MsgTypeSha1}, + string(msg), ), ) if err != nil { @@ -877,11 +886,19 @@ func (h *FileHandlers) GenerateHash(c *gin.Context) { return } - err := h.workers.TryPut( + msg, err := json.Marshal(Sha1Params{ + FilePath: req.FilePath, + }) + if err != nil { + c.JSON(q.ErrResp(c, 500, err)) + return + } + + err = h.deps.Workers().TryPut( localworker.NewMsg( h.deps.ID().Gen(), - map[string]string{localworker.MsgTypeKey: "sha1"}, - req.FilePath, + map[string]string{localworker.MsgTypeKey: MsgTypeSha1}, + string(msg), ), ) if err != nil { diff --git a/src/server/config.go b/src/server/config.go index 084d0b1..88395e0 100644 --- a/src/server/config.go +++ b/src/server/config.go @@ -41,11 +41,18 @@ type ServerCfg struct { PublicPath string `json:"publicPath" yaml:"publicPath"` } +type WorkerPoolCfg struct { + QueueSize int `json:"queueSize" yaml:"queueSize"` + SleepCyc int `json:"sleepCyc" yaml:"sleepCyc"` + WorkerCount int `json:"workerCount" yaml:"workerCount"` +} + type Config struct { - Fs *FSConfig `json:"fs" yaml:"fs"` - Secrets *Secrets `json:"secrets" yaml:"secrets"` - Server *ServerCfg `json:"server" yaml:"server"` - Users *UsersCfg `json:"users" yaml:"users"` + Fs *FSConfig `json:"fs" yaml:"fs"` + Secrets *Secrets `json:"secrets" yaml:"secrets"` + Server *ServerCfg `json:"server" yaml:"server"` + Users *UsersCfg `json:"users" yaml:"users"` + Workers *WorkerPoolCfg `json:"workers" yaml:"workers"` } func NewConfig() *Config { @@ -89,6 +96,11 @@ func DefaultConfig() (string, error) { MaxHeaderBytes: 512, PublicPath: "public", }, + Workers: &WorkerPoolCfg{ + QueueSize: 1024, + SleepCyc: 1, + WorkerCount: 2, + }, } cfgBytes, err := json.Marshal(defaultCfg) diff --git a/src/server/server.go b/src/server/server.go index 77b6dfb..c8f0301 100644 --- a/src/server/server.go +++ b/src/server/server.go @@ -34,7 +34,6 @@ import ( "github.com/ihexxa/quickshare/src/kvstore" "github.com/ihexxa/quickshare/src/kvstore/boltdbpvd" "github.com/ihexxa/quickshare/src/userstore" - "github.com/ihexxa/quickshare/src/worker" "github.com/ihexxa/quickshare/src/worker/localworker" ) @@ -42,7 +41,6 @@ type Server struct { server *http.Server cfg gocfg.ICfg deps *depidx.Deps - workers worker.IWorkerPool signalChan chan os.Signal } @@ -52,9 +50,8 @@ func NewServer(cfg gocfg.ICfg) (*Server, error) { } deps := initDeps(cfg) - workers := localworker.NewWorkerPool(1024, 5000, 2, deps) router := gin.Default() - router, err := initHandlers(router, cfg, deps, workers) + router, err := initHandlers(router, cfg, deps) if err != nil { return nil, err } @@ -71,7 +68,6 @@ func NewServer(cfg gocfg.ICfg) (*Server, error) { server: srv, deps: deps, cfg: cfg, - workers: workers, }, nil } @@ -132,10 +128,19 @@ func initDeps(cfg gocfg.ICfg) *depidx.Deps { deps.SetLog(logger) deps.SetLimiter(limiter) + queueSize := cfg.GrabInt("Workers.QueueSize") + sleepCyc := cfg.GrabInt("Workers.SleepCyc") + workerCount := cfg.GrabInt("Workers.WorkerCount") + fmt.Println(queueSize, sleepCyc, workerCount) + + workers := localworker.NewWorkerPool(queueSize, sleepCyc, workerCount, logger) + workers.Start() + deps.SetWorkers(workers) + return deps } -func initHandlers(router *gin.Engine, cfg gocfg.ICfg, deps *depidx.Deps, workers worker.IWorkerPool) (*gin.Engine, error) { +func initHandlers(router *gin.Engine, cfg gocfg.ICfg, deps *depidx.Deps) (*gin.Engine, error) { // handlers userHdrs, err := multiusers.NewMultiUsersSvc(cfg, deps) if err != nil { @@ -170,7 +175,7 @@ func initHandlers(router *gin.Engine, cfg gocfg.ICfg, deps *depidx.Deps, workers deps.Log().Infof("user (%s) is created\n", adminName) } - fileHdrs, err := fileshdr.NewFileHandlers(cfg, deps, workers) + fileHdrs, err := fileshdr.NewFileHandlers(cfg, deps) if err != nil { return nil, err } @@ -298,7 +303,7 @@ func (s *Server) Start() error { func (s *Server) Shutdown() error { // TODO: add timeout - s.workers.Stop() + s.deps.Workers().Stop() s.deps.Log().Sync() return s.server.Shutdown(context.Background()) } diff --git a/src/worker/interface.go b/src/worker/interface.go index b0ff930..5f0826b 100644 --- a/src/worker/interface.go +++ b/src/worker/interface.go @@ -14,8 +14,12 @@ type IMsg interface { Body() string } +type MsgHandler = func(msg IMsg) error + type IWorkerPool interface { TryPut(task IMsg) error Start() Stop() + AddHandler(msgType string, handler MsgHandler) + DelHandler(msgType string) } diff --git a/src/worker/localworker/worker.go b/src/worker/localworker/worker.go index 6e1a2fe..05f69f2 100644 --- a/src/worker/localworker/worker.go +++ b/src/worker/localworker/worker.go @@ -1,14 +1,12 @@ package localworker import ( - "crypto/sha1" - "encoding/json" "fmt" - "io" "sync" "time" - "github.com/ihexxa/quickshare/src/depidx" + "go.uber.org/zap" + "github.com/ihexxa/quickshare/src/worker" ) @@ -16,16 +14,6 @@ const ( MsgTypeKey = "msg-type" ) -type WorkerPool struct { - on bool - queue chan worker.IMsg - sleep int - workerCount int - started int - mtx *sync.RWMutex - deps *depidx.Deps -} - type Msg struct { id uint64 headers map[string]string @@ -52,19 +40,31 @@ func (m *Msg) Body() string { return m.body } -func NewWorkerPool(queueSize, sleep, workerCount int, deps *depidx.Deps) *WorkerPool { +type WorkerPool struct { + on bool + queue chan worker.IMsg + sleep int + workerCount int + started int + mtx *sync.RWMutex + logger *zap.SugaredLogger + msgHandlers map[string]worker.MsgHandler +} + +func NewWorkerPool(queueSize, sleep, workerCount int, logger *zap.SugaredLogger) *WorkerPool { return &WorkerPool{ on: true, - deps: deps, + logger: logger, mtx: &sync.RWMutex{}, sleep: sleep, workerCount: workerCount, queue: make(chan worker.IMsg, queueSize), + msgHandlers: map[string]worker.MsgHandler{}, } } func (wp *WorkerPool) TryPut(task worker.IMsg) error { - // this close the window that queue can be full after checking + // this closes the window that queue can be full after checking wp.mtx.Lock() defer wp.mtx.Unlock() @@ -75,11 +75,10 @@ func (wp *WorkerPool) TryPut(task worker.IMsg) error { return nil } -type Sha1Params struct { - FilePath string -} - func (wp *WorkerPool) Start() { + wp.mtx.Lock() + defer wp.mtx.Unlock() + wp.on = true for wp.started < wp.workerCount { go wp.startWorker() @@ -88,10 +87,20 @@ func (wp *WorkerPool) Start() { } func (wp *WorkerPool) Stop() { - defer close(wp.queue) + wp.mtx.Lock() + defer wp.mtx.Unlock() + + // TODO: avoid sending and panic + close(wp.queue) wp.on = false for wp.started > 0 { - wp.deps.Log().Errorf(fmt.Sprintf("%d workers still in working", wp.started)) + wp.logger.Errorf( + fmt.Sprintf( + "%d workers (sleep %d second) still in working/sleeping", + wp.sleep, + wp.started, + ), + ) time.Sleep(time.Duration(1) * time.Second) } } @@ -104,30 +113,31 @@ func (wp *WorkerPool) startWorker() { func() { defer func() { if p := recover(); p != nil { - wp.deps.Log().Errorf("worker panic: %s", p) + wp.logger.Errorf("worker panic: %s", p) } }() - msg := <-wp.queue + + msg, ok := <-wp.queue + if !ok { + return + } + headers := msg.Headers() msgType, ok := headers[MsgTypeKey] if !ok { - wp.deps.Log().Errorf("msg type not found: %v", headers) + wp.logger.Errorf("msg type not found: %v", headers) + return } - switch msgType { - case "sha1": - sha1Params := &Sha1Params{} - err = json.Unmarshal([]byte(msg.Body()), sha1Params) - if err != nil { - wp.deps.Log().Errorf("fail to unmarshal sha1 msg: %s", err) - } - err = wp.sha1Task(sha1Params.FilePath) - if err != nil { - wp.deps.Log().Errorf("fail to do sha1: %s", err) - } - default: - wp.deps.Log().Errorf("unknown message tyope: %s", msgType) + handler, ok := wp.msgHandlers[msgType] + if !ok { + wp.logger.Errorf("no handler for the message type: %s", msgType) + return + } + + if err = handler(msg); err != nil { + wp.logger.Errorf("async task(%s) failed: %s", msgType, err) } }() @@ -137,23 +147,11 @@ func (wp *WorkerPool) startWorker() { wp.started-- } -func (wp *WorkerPool) sha1Task(filePath string) error { - f, err := wp.deps.FS().GetFileReader(filePath) - if err != nil { - return fmt.Errorf("fail to get reader: %s", err) - } - - h := sha1.New() - buf := make([]byte, 4096) - _, err = io.CopyBuffer(h, f, buf) - if err != nil { - return err - } - - sha1Sign := fmt.Sprintf("% x", h.Sum(nil)) - err = wp.deps.FileInfos().SetSha1(filePath, sha1Sign) - if err != nil { - return fmt.Errorf("fail to set sha1: %s", err) - } - return nil +func (wp *WorkerPool) AddHandler(msgType string, handler worker.MsgHandler) { + // existing task type will be overwritten + wp.msgHandlers[msgType] = handler +} + +func (wp *WorkerPool) DelHandler(msgType string) { + delete(wp.msgHandlers, msgType) }