feat(workers): integrate workers to handlers

This commit is contained in:
hexxa 2021-09-05 22:26:55 +08:00 committed by Hexxa
parent 0e7f39b8cc
commit e47ee4aa8c
8 changed files with 265 additions and 113 deletions

31
configs/lan.yml Normal file
View file

@ -0,0 +1,31 @@
fs:
root: "tmp"
opensLimit: 128
openTTL: 60 # 1 min
secrets:
tokenSecret: ""
server:
debug: true
host: "0.0.0.0"
port: 8686
readTimeout: 2000
writerTimeout: 86400000 # 1 day
maxHeaderBytes: 512
publicPath: "public"
captchaWidth: 256
captchaHeight: 60
captchaEnabled: true
uploadSpeedLimit: 409600 # 400KB/limiterCyc
downloadSpeedLimit: 409600 # 400KB/limiterCyc
spaceLimit: 104857600 # 100MB
limiterCapacity: 1000
limiterCyc: 1000 # 1s
users:
enableAuth: true
defaultAdmin: ""
defaultAdminPwd: ""
cookieTTL: 604800 # 1 week
cookieSecure: false
cookieHttpOnly: true
minUserNameLen: 2
minPwdLen: 4

View file

@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"sync"
"time" "time"
"github.com/ihexxa/quickshare/src/kvstore" "github.com/ihexxa/quickshare/src/kvstore"
@ -37,9 +38,11 @@ type IFileInfoStore interface {
GetInfo(itemPath string) (*FileInfo, error) GetInfo(itemPath string) (*FileInfo, error)
SetInfo(itemPath string, info *FileInfo) error SetInfo(itemPath string, info *FileInfo) error
DelInfo(itemPath string) error DelInfo(itemPath string) error
SetSha1(itemPath, sign string) error
} }
type FileInfoStore struct { type FileInfoStore struct {
mtx *sync.RWMutex
store kvstore.IKVStore store kvstore.IKVStore
} }
@ -64,10 +67,14 @@ func NewFileInfoStore(store kvstore.IKVStore) (*FileInfoStore, error) {
return &FileInfoStore{ return &FileInfoStore{
store: store, store: store,
mtx: &sync.RWMutex{},
}, nil }, nil
} }
func (fi *FileInfoStore) AddSharing(dirPath string) error { func (fi *FileInfoStore) AddSharing(dirPath string) error {
fi.mtx.Lock()
defer fi.mtx.Unlock()
info, err := fi.GetInfo(dirPath) info, err := fi.GetInfo(dirPath)
if err != nil { if err != nil {
if !IsNotFound(err) { if !IsNotFound(err) {
@ -82,6 +89,9 @@ func (fi *FileInfoStore) AddSharing(dirPath string) error {
} }
func (fi *FileInfoStore) DelSharing(dirPath string) error { func (fi *FileInfoStore) DelSharing(dirPath string) error {
fi.mtx.Lock()
defer fi.mtx.Unlock()
info, err := fi.GetInfo(dirPath) info, err := fi.GetInfo(dirPath)
if err != nil { if err != nil {
return err return err
@ -91,6 +101,7 @@ func (fi *FileInfoStore) DelSharing(dirPath string) error {
} }
func (fi *FileInfoStore) GetSharing(dirPath string) (bool, bool) { func (fi *FileInfoStore) GetSharing(dirPath string) (bool, bool) {
// TODO: add lock
info, err := fi.GetInfo(dirPath) info, err := fi.GetInfo(dirPath)
if err != nil { if err != nil {
// TODO: error is ignored // TODO: error is ignored
@ -150,3 +161,21 @@ func (fi *FileInfoStore) SetInfo(itemPath string, info *FileInfo) error {
func (fi *FileInfoStore) DelInfo(itemPath string) error { func (fi *FileInfoStore) DelInfo(itemPath string) error {
return fi.store.DelStringIn(InfoNs, itemPath) return fi.store.DelStringIn(InfoNs, itemPath)
} }
func (fi *FileInfoStore) SetSha1(itemPath, sign string) error {
fi.mtx.Lock()
defer fi.mtx.Unlock()
info, err := fi.GetInfo(itemPath)
if err != nil {
if !IsNotFound(err) {
return err
}
info = &FileInfo{
IsDir: false,
Shared: false,
}
}
info.Sha1 = sign
return fi.SetInfo(itemPath, info)
}

View file

@ -20,6 +20,8 @@ import (
"github.com/ihexxa/quickshare/src/depidx" "github.com/ihexxa/quickshare/src/depidx"
q "github.com/ihexxa/quickshare/src/handlers" q "github.com/ihexxa/quickshare/src/handlers"
"github.com/ihexxa/quickshare/src/userstore" "github.com/ihexxa/quickshare/src/userstore"
"github.com/ihexxa/quickshare/src/worker"
"github.com/ihexxa/quickshare/src/worker/localworker"
) )
var ( var (
@ -39,13 +41,15 @@ type FileHandlers struct {
cfg gocfg.ICfg cfg gocfg.ICfg
deps *depidx.Deps deps *depidx.Deps
uploadMgr *UploadMgr uploadMgr *UploadMgr
workers worker.IWorkerPool
} }
func NewFileHandlers(cfg gocfg.ICfg, deps *depidx.Deps) (*FileHandlers, error) { func NewFileHandlers(cfg gocfg.ICfg, deps *depidx.Deps, workers worker.IWorkerPool) (*FileHandlers, error) {
return &FileHandlers{ return &FileHandlers{
cfg: cfg, cfg: cfg,
deps: deps, deps: deps,
uploadMgr: NewUploadMgr(deps.KV()), uploadMgr: NewUploadMgr(deps.KV()),
workers: workers,
}, nil }, nil
} }
@ -410,6 +414,18 @@ func (h *FileHandlers) UploadChunk(c *gin.Context) {
c.JSON(q.ErrResp(c, 500, err)) c.JSON(q.ErrResp(c, 500, err))
return return
} }
err = h.workers.TryPut(
localworker.NewMsg(
h.deps.ID().Gen(),
map[string]string{localworker.MsgTypeKey: "sha1"},
fsFilePath,
),
)
if err != nil {
c.JSON(q.ErrResp(c, 500, err))
return
}
} }
c.JSON(200, &UploadStatusResp{ c.JSON(200, &UploadStatusResp{

View file

@ -239,13 +239,13 @@ func (h *MultiUsersSvc) SetPwd(c *gin.Context) {
} }
user, err := h.deps.Users().GetUser(uid) user, err := h.deps.Users().GetUser(uid)
if err != nil { if err != nil {
c.JSON(q.ErrResp(c, 401, err)) c.JSON(q.ErrResp(c, 402, err))
return return
} }
err = bcrypt.CompareHashAndPassword([]byte(user.Pwd), []byte(req.OldPwd)) err = bcrypt.CompareHashAndPassword([]byte(user.Pwd), []byte(req.OldPwd))
if err != nil { if err != nil {
c.JSON(q.ErrResp(c, 401, ErrInvalidUser)) c.JSON(q.ErrResp(c, 403, ErrInvalidUser))
return return
} }

View file

@ -32,12 +32,15 @@ import (
"github.com/ihexxa/quickshare/src/kvstore" "github.com/ihexxa/quickshare/src/kvstore"
"github.com/ihexxa/quickshare/src/kvstore/boltdbpvd" "github.com/ihexxa/quickshare/src/kvstore/boltdbpvd"
"github.com/ihexxa/quickshare/src/userstore" "github.com/ihexxa/quickshare/src/userstore"
"github.com/ihexxa/quickshare/src/worker"
"github.com/ihexxa/quickshare/src/worker/localworker"
) )
type Server struct { type Server struct {
server *http.Server server *http.Server
cfg gocfg.ICfg cfg gocfg.ICfg
deps *depidx.Deps deps *depidx.Deps
workers worker.IWorkerPool
} }
func NewServer(cfg gocfg.ICfg) (*Server, error) { func NewServer(cfg gocfg.ICfg) (*Server, error) {
@ -128,6 +131,10 @@ func initDeps(cfg gocfg.ICfg) *depidx.Deps {
} }
func initHandlers(router *gin.Engine, cfg gocfg.ICfg, deps *depidx.Deps) (*gin.Engine, error) { func initHandlers(router *gin.Engine, cfg gocfg.ICfg, deps *depidx.Deps) (*gin.Engine, error) {
// workers
workers := localworker.NewWorkerPool(1024, 5000, 2, deps)
// handlers
userHdrs, err := multiusers.NewMultiUsersSvc(cfg, deps) userHdrs, err := multiusers.NewMultiUsersSvc(cfg, deps)
if err != nil { if err != nil {
return nil, err return nil, err
@ -161,7 +168,7 @@ func initHandlers(router *gin.Engine, cfg gocfg.ICfg, deps *depidx.Deps) (*gin.E
deps.Log().Infof("user (%s) is created\n", adminName) deps.Log().Infof("user (%s) is created\n", adminName)
} }
fileHdrs, err := fileshdr.NewFileHandlers(cfg, deps) fileHdrs, err := fileshdr.NewFileHandlers(cfg, deps, workers)
if err != nil { if err != nil {
return nil, err return nil, err
} }

21
src/worker/interface.go Normal file
View file

@ -0,0 +1,21 @@
package worker
import "errors"
var ErrFull = errors.New("worker queue is full, make it larger in the config.")
func IsErrFull(err error) bool {
return err == ErrFull
}
type IMsg interface {
ID() uint64
Headers() map[string]string
Body() string
}
type IWorkerPool interface {
TryPut(task IMsg) error
Start()
Stop()
}

View file

@ -0,0 +1,154 @@
package localworker
import (
"crypto/sha1"
"encoding/json"
"fmt"
"io"
"sync"
"time"
"github.com/ihexxa/quickshare/src/depidx"
"github.com/ihexxa/quickshare/src/worker"
)
const (
MsgTypeKey = "msg-type"
)
type WorkerPool struct {
on bool
queue chan worker.IMsg
sleep int
workerCount int
started int
mtx *sync.RWMutex
deps *depidx.Deps
}
type Msg struct {
id uint64
headers map[string]string
body string
}
func NewMsg(id uint64, headers map[string]string, body string) *Msg {
return &Msg{
id: id,
headers: headers,
body: body,
}
}
func (m *Msg) ID() uint64 {
return m.id
}
func (m *Msg) Headers() map[string]string {
return m.headers
}
func (m *Msg) Body() string {
return m.body
}
func NewWorkerPool(queueSize, sleep, workerCount int, deps *depidx.Deps) *WorkerPool {
return &WorkerPool{
on: true,
deps: deps,
mtx: &sync.RWMutex{},
sleep: sleep,
workerCount: workerCount,
queue: make(chan worker.IMsg, queueSize),
}
}
func (wp *WorkerPool) TryPut(task worker.IMsg) error {
// this close the window that queue can be full after checking
wp.mtx.Lock()
defer wp.mtx.Unlock()
if len(wp.queue) == cap(wp.queue) {
return worker.ErrFull
}
wp.queue <- task
return nil
}
type Sha1Params struct {
FilePath string
}
func (wp *WorkerPool) Start() {
wp.on = true
for wp.started < wp.workerCount {
go wp.startWorker()
wp.started++
}
}
func (wp *WorkerPool) Stop() {
wp.on = false
}
func (wp *WorkerPool) startWorker() {
var err error
// TODO: make it stateful
for wp.on {
func() {
defer func() {
if p := recover(); p != nil {
wp.deps.Log().Errorf("worker panic: %s", p)
}
}()
msg := <-wp.queue
headers := msg.Headers()
msgType, ok := headers[MsgTypeKey]
if !ok {
wp.deps.Log().Errorf("msg type not found: %v", headers)
}
switch msgType {
case "sha1":
sha1Params := &Sha1Params{}
err = json.Unmarshal([]byte(msg.Body()), sha1Params)
if err != nil {
wp.deps.Log().Errorf("fail to unmarshal sha1 msg: %s", err)
}
err = wp.sha1Task(sha1Params.FilePath)
if err != nil {
wp.deps.Log().Errorf("fail to do sha1: %s", err)
}
default:
wp.deps.Log().Errorf("unknown message tyope: %s", msgType)
}
}()
time.Sleep(time.Duration(wp.sleep) * time.Second)
}
wp.started--
}
func (wp *WorkerPool) sha1Task(filePath string) error {
f, err := wp.deps.FS().GetFileReader(filePath)
if err != nil {
return fmt.Errorf("fail to get reader: %s", err)
}
h := sha1.New()
buf := make([]byte, 4096)
_, err = io.CopyBuffer(h, f, buf)
if err != nil {
return err
}
sha1Sign := fmt.Sprintf("% x", h.Sum(nil))
err = wp.deps.FileInfos().SetSha1(filePath, sha1Sign)
if err != nil {
return fmt.Errorf("fail to set sha1: %s", err)
}
return nil
}

View file

@ -1,106 +0,0 @@
package worker
import (
"crypto/sha1"
"encoding/json"
"errors"
"fmt"
"io"
"sync"
"github.com/ihexxa/quickshare/src/depidx"
)
const (
MsgTypeKey = "msg-type"
)
type Msg interface {
ID() uint64
Headers() map[string]string
Body() string
}
var ErrFull = errors.New("worker queue is full, make it larger in the config.")
func IsErrFull(err error) bool {
return err == ErrFull
}
type WorkerPool struct {
on bool
queue chan Msg
mtx *sync.RWMutex
deps *depidx.Deps
}
func NewWorkerPool(queueSize int, deps *depidx.Deps) *WorkerPool {
return &WorkerPool{
on: true,
deps: deps,
mtx: &sync.RWMutex{},
queue: make(chan Msg, queueSize),
}
}
func (wp *WorkerPool) TryPut(task Msg) error {
// this close the window that queue can be full after checking
wp.mtx.Lock()
defer wp.mtx.Unlock()
if len(wp.queue) == cap(wp.queue) {
return ErrFull
}
wp.queue <- task
return nil
}
type Sha1Params struct {
FilePath string
}
func (wp *WorkerPool) startWorker() {
var err error
for wp.on {
msg := <-wp.queue
headers := msg.Headers()
msgType, ok := headers[MsgTypeKey]
if !ok {
wp.deps.Log().Errorf("msg type not found: %v", headers)
}
switch msgType {
case "sha1":
sha1Params := &Sha1Params{}
err = json.Unmarshal([]byte(msg.Body()), sha1Params)
if err != nil {
wp.deps.Log().Errorf("fail to unmarshal sha1 msg: %s", err)
}
err = wp.sha1Task(sha1Params.FilePath)
if err != nil {
wp.deps.Log().Errorf("fail to do sha1: %s", err)
}
default:
wp.deps.Log().Errorf("unknown message tyope: %s", msgType)
}
}
}
func (wp *WorkerPool) sha1Task(filePath string) error {
f, err := wp.deps.FS().GetFileReader(filePath)
if err != nil {
return fmt.Errorf("fail to get reader: %s", err)
}
h := sha1.New()
buf := make([]byte, 4096)
_, err = io.CopyBuffer(h, f, buf)
if err != nil {
return err
}
// sha1Sign := fmt.Sprintf("% x", h.Sum(nil))
// save it to db
return nil
}