fix(worker): refactor and fix issues of worker

This commit is contained in:
hexxa 2021-09-12 12:45:03 +08:00 committed by Hexxa
parent 8518072c7e
commit 4a5c68df17
13 changed files with 194 additions and 86 deletions

View file

@ -29,3 +29,7 @@ users:
cookieHttpOnly: true
minUserNameLen: 2
minPwdLen: 4
workers:
queueSize: 1024
sleepCyc: 1 # in second
workerCount: 2

View file

@ -29,3 +29,7 @@ users:
cookieHttpOnly: true
minUserNameLen: 4
minPwdLen: 6
workers:
queueSize: 1024
sleepCyc: 1 # in second
workerCount: 2

View file

@ -29,3 +29,7 @@ users:
cookieHttpOnly: true
minUserNameLen: 2
minPwdLen: 4
workers:
queueSize: 1024
sleepCyc: 1 # in second
workerCount: 2

View file

@ -66,12 +66,14 @@ export const resps = {
size: 5,
modTime: "0",
isDir: false,
sha1: "mock_file_sha1",
},
{
name: "mock_dir",
size: 0,
modTime: "0",
isDir: true,
sha1: "",
},
],
},
@ -87,12 +89,14 @@ export const resps = {
size: 5,
modTime: "0",
isDir: false,
sha1: "mock_file_sha1",
},
{
name: "mock_dir",
size: 0,
modTime: "0",
isDir: true,
sha1: "",
},
],
},

View file

@ -26,6 +26,7 @@ export interface Item {
modTime: string;
isDir: boolean;
selected: boolean;
sha1: string;
}
export interface BrowserProps {
@ -162,8 +163,12 @@ export class Browser extends React.Component<Props, State, {}> {
this.props.browser.items,
this.state.selectedItems
)
.then(() => {
return updater().self();
})
.then(() => {
this.update(updater().updateBrowser);
this.update(updater().updateLogin);
this.setState({
selectedSrc: "",
selectedItems: Map<string, boolean>(),

View file

@ -10,7 +10,7 @@ import { AdminProps } from "./pane_admin";
import { MsgPackage } from "../i18n/msger";
import { Item } from "./browser";
import { UploadInfo, User } from "../client";
import { UploadInfo, User, MetadataResp } from "../client";
import { initUploadMgr, IWorker } from "../worker/upload_mgr";
export interface MsgProps {
@ -53,7 +53,7 @@ export function initState(): ICoreState {
browser: {
isVertical: isVertical(),
dirPath: List<string>(["."]),
items: List<Item>([]),
items: List<MetadataResp>([]),
sharings: List<string>([]),
isSharing: false,
uploadings: List<UploadInfo>([]),

View file

@ -11,6 +11,7 @@ import (
"github.com/ihexxa/quickshare/src/iolimiter"
"github.com/ihexxa/quickshare/src/kvstore"
"github.com/ihexxa/quickshare/src/userstore"
"github.com/ihexxa/quickshare/src/worker"
)
type IUploader interface {
@ -27,10 +28,10 @@ type Deps struct {
kv kvstore.IKVStore
users userstore.IUserStore
fileInfos fileinfostore.IFileInfoStore
uploader IUploader
id idgen.IIDGen
logger *zap.SugaredLogger
limiter iolimiter.ILimiter
workers worker.IWorkerPool
}
func NewDeps(cfg gocfg.ICfg) *Deps {
@ -100,3 +101,11 @@ func (deps *Deps) Limiter() iolimiter.ILimiter {
func (deps *Deps) SetLimiter(limiter iolimiter.ILimiter) {
deps.limiter = limiter
}
func (deps *Deps) Workers() worker.IWorkerPool {
return deps.workers
}
func (deps *Deps) SetWorkers(workers worker.IWorkerPool) {
deps.workers = workers
}

View file

@ -0,0 +1,42 @@
package fileshdr
import (
"crypto/sha1"
"encoding/json"
"fmt"
"io"
"github.com/ihexxa/quickshare/src/worker"
)
const MsgTypeSha1 = "sha1"
type Sha1Params struct {
FilePath string
}
func (h *FileHandlers) genSha1(msg worker.IMsg) error {
taskInputs := &Sha1Params{}
err := json.Unmarshal([]byte(msg.Body()), taskInputs)
if err != nil {
return fmt.Errorf("fail to unmarshal sha1 msg: %w", err)
}
f, err := h.deps.FS().GetFileReader(taskInputs.FilePath)
if err != nil {
return fmt.Errorf("fail to get reader: %s", err)
}
hasher := sha1.New()
buf := make([]byte, 4096)
_, err = io.CopyBuffer(hasher, f, buf)
if err != nil {
return err
}
sha1Sign := fmt.Sprintf("%x", hasher.Sum(nil))
err = h.deps.FileInfos().SetSha1(taskInputs.FilePath, sha1Sign)
if err != nil {
return fmt.Errorf("fail to set sha1: %s", err)
}
return nil
}

View file

@ -2,6 +2,7 @@ package fileshdr
import (
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
@ -20,7 +21,6 @@ import (
"github.com/ihexxa/quickshare/src/depidx"
q "github.com/ihexxa/quickshare/src/handlers"
"github.com/ihexxa/quickshare/src/userstore"
"github.com/ihexxa/quickshare/src/worker"
"github.com/ihexxa/quickshare/src/worker/localworker"
)
@ -41,16 +41,17 @@ type FileHandlers struct {
cfg gocfg.ICfg
deps *depidx.Deps
uploadMgr *UploadMgr
workers worker.IWorkerPool
}
func NewFileHandlers(cfg gocfg.ICfg, deps *depidx.Deps, workers worker.IWorkerPool) (*FileHandlers, error) {
return &FileHandlers{
func NewFileHandlers(cfg gocfg.ICfg, deps *depidx.Deps) (*FileHandlers, error) {
handlers := &FileHandlers{
cfg: cfg,
deps: deps,
uploadMgr: NewUploadMgr(deps.KV()),
workers: workers,
}, nil
}
deps.Workers().AddHandler(MsgTypeSha1, handlers.genSha1)
return handlers, nil
}
type AutoLocker struct {
@ -232,7 +233,7 @@ type MetadataResp struct {
Size int64 `json:"size"`
ModTime time.Time `json:"modTime"`
IsDir bool `json:"isDir"`
Sha1 string `json"sha1"`
Sha1 string `json:"sha1"`
}
func (h *FileHandlers) Metadata(c *gin.Context) {
@ -416,11 +417,19 @@ func (h *FileHandlers) UploadChunk(c *gin.Context) {
return
}
err = h.workers.TryPut(
msg, err := json.Marshal(Sha1Params{
FilePath: fsFilePath,
})
if err != nil {
c.JSON(q.ErrResp(c, 500, err))
return
}
err = h.deps.Workers().TryPut(
localworker.NewMsg(
h.deps.ID().Gen(),
map[string]string{localworker.MsgTypeKey: "sha1"},
fsFilePath,
map[string]string{localworker.MsgTypeKey: MsgTypeSha1},
string(msg),
),
)
if err != nil {
@ -877,11 +886,19 @@ func (h *FileHandlers) GenerateHash(c *gin.Context) {
return
}
err := h.workers.TryPut(
msg, err := json.Marshal(Sha1Params{
FilePath: req.FilePath,
})
if err != nil {
c.JSON(q.ErrResp(c, 500, err))
return
}
err = h.deps.Workers().TryPut(
localworker.NewMsg(
h.deps.ID().Gen(),
map[string]string{localworker.MsgTypeKey: "sha1"},
req.FilePath,
map[string]string{localworker.MsgTypeKey: MsgTypeSha1},
string(msg),
),
)
if err != nil {

View file

@ -41,11 +41,18 @@ type ServerCfg struct {
PublicPath string `json:"publicPath" yaml:"publicPath"`
}
type WorkerPoolCfg struct {
QueueSize int `json:"queueSize" yaml:"queueSize"`
SleepCyc int `json:"sleepCyc" yaml:"sleepCyc"`
WorkerCount int `json:"workerCount" yaml:"workerCount"`
}
type Config struct {
Fs *FSConfig `json:"fs" yaml:"fs"`
Secrets *Secrets `json:"secrets" yaml:"secrets"`
Server *ServerCfg `json:"server" yaml:"server"`
Users *UsersCfg `json:"users" yaml:"users"`
Fs *FSConfig `json:"fs" yaml:"fs"`
Secrets *Secrets `json:"secrets" yaml:"secrets"`
Server *ServerCfg `json:"server" yaml:"server"`
Users *UsersCfg `json:"users" yaml:"users"`
Workers *WorkerPoolCfg `json:"workers" yaml:"workers"`
}
func NewConfig() *Config {
@ -89,6 +96,11 @@ func DefaultConfig() (string, error) {
MaxHeaderBytes: 512,
PublicPath: "public",
},
Workers: &WorkerPoolCfg{
QueueSize: 1024,
SleepCyc: 1,
WorkerCount: 2,
},
}
cfgBytes, err := json.Marshal(defaultCfg)

View file

@ -34,7 +34,6 @@ import (
"github.com/ihexxa/quickshare/src/kvstore"
"github.com/ihexxa/quickshare/src/kvstore/boltdbpvd"
"github.com/ihexxa/quickshare/src/userstore"
"github.com/ihexxa/quickshare/src/worker"
"github.com/ihexxa/quickshare/src/worker/localworker"
)
@ -42,7 +41,6 @@ type Server struct {
server *http.Server
cfg gocfg.ICfg
deps *depidx.Deps
workers worker.IWorkerPool
signalChan chan os.Signal
}
@ -52,9 +50,8 @@ func NewServer(cfg gocfg.ICfg) (*Server, error) {
}
deps := initDeps(cfg)
workers := localworker.NewWorkerPool(1024, 5000, 2, deps)
router := gin.Default()
router, err := initHandlers(router, cfg, deps, workers)
router, err := initHandlers(router, cfg, deps)
if err != nil {
return nil, err
}
@ -71,7 +68,6 @@ func NewServer(cfg gocfg.ICfg) (*Server, error) {
server: srv,
deps: deps,
cfg: cfg,
workers: workers,
}, nil
}
@ -132,10 +128,19 @@ func initDeps(cfg gocfg.ICfg) *depidx.Deps {
deps.SetLog(logger)
deps.SetLimiter(limiter)
queueSize := cfg.GrabInt("Workers.QueueSize")
sleepCyc := cfg.GrabInt("Workers.SleepCyc")
workerCount := cfg.GrabInt("Workers.WorkerCount")
fmt.Println(queueSize, sleepCyc, workerCount)
workers := localworker.NewWorkerPool(queueSize, sleepCyc, workerCount, logger)
workers.Start()
deps.SetWorkers(workers)
return deps
}
func initHandlers(router *gin.Engine, cfg gocfg.ICfg, deps *depidx.Deps, workers worker.IWorkerPool) (*gin.Engine, error) {
func initHandlers(router *gin.Engine, cfg gocfg.ICfg, deps *depidx.Deps) (*gin.Engine, error) {
// handlers
userHdrs, err := multiusers.NewMultiUsersSvc(cfg, deps)
if err != nil {
@ -170,7 +175,7 @@ func initHandlers(router *gin.Engine, cfg gocfg.ICfg, deps *depidx.Deps, workers
deps.Log().Infof("user (%s) is created\n", adminName)
}
fileHdrs, err := fileshdr.NewFileHandlers(cfg, deps, workers)
fileHdrs, err := fileshdr.NewFileHandlers(cfg, deps)
if err != nil {
return nil, err
}
@ -298,7 +303,7 @@ func (s *Server) Start() error {
func (s *Server) Shutdown() error {
// TODO: add timeout
s.workers.Stop()
s.deps.Workers().Stop()
s.deps.Log().Sync()
return s.server.Shutdown(context.Background())
}

View file

@ -14,8 +14,12 @@ type IMsg interface {
Body() string
}
type MsgHandler = func(msg IMsg) error
type IWorkerPool interface {
TryPut(task IMsg) error
Start()
Stop()
AddHandler(msgType string, handler MsgHandler)
DelHandler(msgType string)
}

View file

@ -1,14 +1,12 @@
package localworker
import (
"crypto/sha1"
"encoding/json"
"fmt"
"io"
"sync"
"time"
"github.com/ihexxa/quickshare/src/depidx"
"go.uber.org/zap"
"github.com/ihexxa/quickshare/src/worker"
)
@ -16,16 +14,6 @@ const (
MsgTypeKey = "msg-type"
)
type WorkerPool struct {
on bool
queue chan worker.IMsg
sleep int
workerCount int
started int
mtx *sync.RWMutex
deps *depidx.Deps
}
type Msg struct {
id uint64
headers map[string]string
@ -52,19 +40,31 @@ func (m *Msg) Body() string {
return m.body
}
func NewWorkerPool(queueSize, sleep, workerCount int, deps *depidx.Deps) *WorkerPool {
type WorkerPool struct {
on bool
queue chan worker.IMsg
sleep int
workerCount int
started int
mtx *sync.RWMutex
logger *zap.SugaredLogger
msgHandlers map[string]worker.MsgHandler
}
func NewWorkerPool(queueSize, sleep, workerCount int, logger *zap.SugaredLogger) *WorkerPool {
return &WorkerPool{
on: true,
deps: deps,
logger: logger,
mtx: &sync.RWMutex{},
sleep: sleep,
workerCount: workerCount,
queue: make(chan worker.IMsg, queueSize),
msgHandlers: map[string]worker.MsgHandler{},
}
}
func (wp *WorkerPool) TryPut(task worker.IMsg) error {
// this close the window that queue can be full after checking
// this closes the window that queue can be full after checking
wp.mtx.Lock()
defer wp.mtx.Unlock()
@ -75,11 +75,10 @@ func (wp *WorkerPool) TryPut(task worker.IMsg) error {
return nil
}
type Sha1Params struct {
FilePath string
}
func (wp *WorkerPool) Start() {
wp.mtx.Lock()
defer wp.mtx.Unlock()
wp.on = true
for wp.started < wp.workerCount {
go wp.startWorker()
@ -88,10 +87,20 @@ func (wp *WorkerPool) Start() {
}
func (wp *WorkerPool) Stop() {
defer close(wp.queue)
wp.mtx.Lock()
defer wp.mtx.Unlock()
// TODO: avoid sending and panic
close(wp.queue)
wp.on = false
for wp.started > 0 {
wp.deps.Log().Errorf(fmt.Sprintf("%d workers still in working", wp.started))
wp.logger.Errorf(
fmt.Sprintf(
"%d workers (sleep %d second) still in working/sleeping",
wp.sleep,
wp.started,
),
)
time.Sleep(time.Duration(1) * time.Second)
}
}
@ -104,30 +113,31 @@ func (wp *WorkerPool) startWorker() {
func() {
defer func() {
if p := recover(); p != nil {
wp.deps.Log().Errorf("worker panic: %s", p)
wp.logger.Errorf("worker panic: %s", p)
}
}()
msg := <-wp.queue
msg, ok := <-wp.queue
if !ok {
return
}
headers := msg.Headers()
msgType, ok := headers[MsgTypeKey]
if !ok {
wp.deps.Log().Errorf("msg type not found: %v", headers)
wp.logger.Errorf("msg type not found: %v", headers)
return
}
switch msgType {
case "sha1":
sha1Params := &Sha1Params{}
err = json.Unmarshal([]byte(msg.Body()), sha1Params)
if err != nil {
wp.deps.Log().Errorf("fail to unmarshal sha1 msg: %s", err)
}
err = wp.sha1Task(sha1Params.FilePath)
if err != nil {
wp.deps.Log().Errorf("fail to do sha1: %s", err)
}
default:
wp.deps.Log().Errorf("unknown message tyope: %s", msgType)
handler, ok := wp.msgHandlers[msgType]
if !ok {
wp.logger.Errorf("no handler for the message type: %s", msgType)
return
}
if err = handler(msg); err != nil {
wp.logger.Errorf("async task(%s) failed: %s", msgType, err)
}
}()
@ -137,23 +147,11 @@ func (wp *WorkerPool) startWorker() {
wp.started--
}
func (wp *WorkerPool) sha1Task(filePath string) error {
f, err := wp.deps.FS().GetFileReader(filePath)
if err != nil {
return fmt.Errorf("fail to get reader: %s", err)
}
h := sha1.New()
buf := make([]byte, 4096)
_, err = io.CopyBuffer(h, f, buf)
if err != nil {
return err
}
sha1Sign := fmt.Sprintf("% x", h.Sum(nil))
err = wp.deps.FileInfos().SetSha1(filePath, sha1Sign)
if err != nil {
return fmt.Errorf("fail to set sha1: %s", err)
}
return nil
func (wp *WorkerPool) AddHandler(msgType string, handler worker.MsgHandler) {
// existing task type will be overwritten
wp.msgHandlers[msgType] = handler
}
func (wp *WorkerPool) DelHandler(msgType string) {
delete(wp.msgHandlers, msgType)
}