diff --git a/configs/lan.yml b/configs/lan.yml new file mode 100644 index 0000000..88eafb5 --- /dev/null +++ b/configs/lan.yml @@ -0,0 +1,31 @@ +fs: + root: "tmp" + opensLimit: 128 + openTTL: 60 # 1 min +secrets: + tokenSecret: "" +server: + debug: true + host: "0.0.0.0" + port: 8686 + readTimeout: 2000 + writerTimeout: 86400000 # 1 day + maxHeaderBytes: 512 + publicPath: "public" + captchaWidth: 256 + captchaHeight: 60 + captchaEnabled: true + uploadSpeedLimit: 409600 # 400KB/limiterCyc + downloadSpeedLimit: 409600 # 400KB/limiterCyc + spaceLimit: 104857600 # 100MB + limiterCapacity: 1000 + limiterCyc: 1000 # 1s +users: + enableAuth: true + defaultAdmin: "" + defaultAdminPwd: "" + cookieTTL: 604800 # 1 week + cookieSecure: false + cookieHttpOnly: true + minUserNameLen: 2 + minPwdLen: 4 diff --git a/src/fileinfostore/file_info_store.go b/src/fileinfostore/file_info_store.go index 093f295..681a546 100644 --- a/src/fileinfostore/file_info_store.go +++ b/src/fileinfostore/file_info_store.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "fmt" + "sync" "time" "github.com/ihexxa/quickshare/src/kvstore" @@ -37,9 +38,11 @@ type IFileInfoStore interface { GetInfo(itemPath string) (*FileInfo, error) SetInfo(itemPath string, info *FileInfo) error DelInfo(itemPath string) error + SetSha1(itemPath, sign string) error } type FileInfoStore struct { + mtx *sync.RWMutex store kvstore.IKVStore } @@ -64,10 +67,14 @@ func NewFileInfoStore(store kvstore.IKVStore) (*FileInfoStore, error) { return &FileInfoStore{ store: store, + mtx: &sync.RWMutex{}, }, nil } func (fi *FileInfoStore) AddSharing(dirPath string) error { + fi.mtx.Lock() + defer fi.mtx.Unlock() + info, err := fi.GetInfo(dirPath) if err != nil { if !IsNotFound(err) { @@ -82,6 +89,9 @@ func (fi *FileInfoStore) AddSharing(dirPath string) error { } func (fi *FileInfoStore) DelSharing(dirPath string) error { + fi.mtx.Lock() + defer fi.mtx.Unlock() + info, err := fi.GetInfo(dirPath) if err != nil { return err @@ -91,6 +101,7 @@ func (fi *FileInfoStore) DelSharing(dirPath string) error { } func (fi *FileInfoStore) GetSharing(dirPath string) (bool, bool) { + // TODO: add lock info, err := fi.GetInfo(dirPath) if err != nil { // TODO: error is ignored @@ -150,3 +161,21 @@ func (fi *FileInfoStore) SetInfo(itemPath string, info *FileInfo) error { func (fi *FileInfoStore) DelInfo(itemPath string) error { return fi.store.DelStringIn(InfoNs, itemPath) } + +func (fi *FileInfoStore) SetSha1(itemPath, sign string) error { + fi.mtx.Lock() + defer fi.mtx.Unlock() + + info, err := fi.GetInfo(itemPath) + if err != nil { + if !IsNotFound(err) { + return err + } + info = &FileInfo{ + IsDir: false, + Shared: false, + } + } + info.Sha1 = sign + return fi.SetInfo(itemPath, info) +} diff --git a/src/handlers/fileshdr/handlers.go b/src/handlers/fileshdr/handlers.go index 4631fd2..97f1c04 100644 --- a/src/handlers/fileshdr/handlers.go +++ b/src/handlers/fileshdr/handlers.go @@ -20,6 +20,8 @@ import ( "github.com/ihexxa/quickshare/src/depidx" q "github.com/ihexxa/quickshare/src/handlers" "github.com/ihexxa/quickshare/src/userstore" + "github.com/ihexxa/quickshare/src/worker" + "github.com/ihexxa/quickshare/src/worker/localworker" ) var ( @@ -39,13 +41,15 @@ type FileHandlers struct { cfg gocfg.ICfg deps *depidx.Deps uploadMgr *UploadMgr + workers worker.IWorkerPool } -func NewFileHandlers(cfg gocfg.ICfg, deps *depidx.Deps) (*FileHandlers, error) { +func NewFileHandlers(cfg gocfg.ICfg, deps *depidx.Deps, workers worker.IWorkerPool) (*FileHandlers, error) { return &FileHandlers{ cfg: cfg, deps: deps, uploadMgr: NewUploadMgr(deps.KV()), + workers: workers, }, nil } @@ -410,6 +414,18 @@ func (h *FileHandlers) UploadChunk(c *gin.Context) { c.JSON(q.ErrResp(c, 500, err)) return } + + err = h.workers.TryPut( + localworker.NewMsg( + h.deps.ID().Gen(), + map[string]string{localworker.MsgTypeKey: "sha1"}, + fsFilePath, + ), + ) + if err != nil { + c.JSON(q.ErrResp(c, 500, err)) + return + } } c.JSON(200, &UploadStatusResp{ diff --git a/src/handlers/multiusers/handlers.go b/src/handlers/multiusers/handlers.go index c077827..ff46dc9 100644 --- a/src/handlers/multiusers/handlers.go +++ b/src/handlers/multiusers/handlers.go @@ -239,13 +239,13 @@ func (h *MultiUsersSvc) SetPwd(c *gin.Context) { } user, err := h.deps.Users().GetUser(uid) if err != nil { - c.JSON(q.ErrResp(c, 401, err)) + c.JSON(q.ErrResp(c, 402, err)) return } err = bcrypt.CompareHashAndPassword([]byte(user.Pwd), []byte(req.OldPwd)) if err != nil { - c.JSON(q.ErrResp(c, 401, ErrInvalidUser)) + c.JSON(q.ErrResp(c, 403, ErrInvalidUser)) return } diff --git a/src/server/server.go b/src/server/server.go index c2c7f3e..f5bc8dd 100644 --- a/src/server/server.go +++ b/src/server/server.go @@ -32,12 +32,15 @@ import ( "github.com/ihexxa/quickshare/src/kvstore" "github.com/ihexxa/quickshare/src/kvstore/boltdbpvd" "github.com/ihexxa/quickshare/src/userstore" + "github.com/ihexxa/quickshare/src/worker" + "github.com/ihexxa/quickshare/src/worker/localworker" ) type Server struct { - server *http.Server - cfg gocfg.ICfg - deps *depidx.Deps + server *http.Server + cfg gocfg.ICfg + deps *depidx.Deps + workers worker.IWorkerPool } func NewServer(cfg gocfg.ICfg) (*Server, error) { @@ -128,6 +131,10 @@ func initDeps(cfg gocfg.ICfg) *depidx.Deps { } func initHandlers(router *gin.Engine, cfg gocfg.ICfg, deps *depidx.Deps) (*gin.Engine, error) { + // workers + workers := localworker.NewWorkerPool(1024, 5000, 2, deps) + + // handlers userHdrs, err := multiusers.NewMultiUsersSvc(cfg, deps) if err != nil { return nil, err @@ -161,7 +168,7 @@ func initHandlers(router *gin.Engine, cfg gocfg.ICfg, deps *depidx.Deps) (*gin.E deps.Log().Infof("user (%s) is created\n", adminName) } - fileHdrs, err := fileshdr.NewFileHandlers(cfg, deps) + fileHdrs, err := fileshdr.NewFileHandlers(cfg, deps, workers) if err != nil { return nil, err } diff --git a/src/worker/interface.go b/src/worker/interface.go new file mode 100644 index 0000000..b0ff930 --- /dev/null +++ b/src/worker/interface.go @@ -0,0 +1,21 @@ +package worker + +import "errors" + +var ErrFull = errors.New("worker queue is full, make it larger in the config.") + +func IsErrFull(err error) bool { + return err == ErrFull +} + +type IMsg interface { + ID() uint64 + Headers() map[string]string + Body() string +} + +type IWorkerPool interface { + TryPut(task IMsg) error + Start() + Stop() +} diff --git a/src/worker/localworker/worker.go b/src/worker/localworker/worker.go new file mode 100644 index 0000000..9ba3594 --- /dev/null +++ b/src/worker/localworker/worker.go @@ -0,0 +1,154 @@ +package localworker + +import ( + "crypto/sha1" + "encoding/json" + "fmt" + "io" + "sync" + "time" + + "github.com/ihexxa/quickshare/src/depidx" + "github.com/ihexxa/quickshare/src/worker" +) + +const ( + MsgTypeKey = "msg-type" +) + +type WorkerPool struct { + on bool + queue chan worker.IMsg + sleep int + workerCount int + started int + mtx *sync.RWMutex + deps *depidx.Deps +} + +type Msg struct { + id uint64 + headers map[string]string + body string +} + +func NewMsg(id uint64, headers map[string]string, body string) *Msg { + return &Msg{ + id: id, + headers: headers, + body: body, + } +} + +func (m *Msg) ID() uint64 { + return m.id +} + +func (m *Msg) Headers() map[string]string { + return m.headers +} + +func (m *Msg) Body() string { + return m.body +} + +func NewWorkerPool(queueSize, sleep, workerCount int, deps *depidx.Deps) *WorkerPool { + return &WorkerPool{ + on: true, + deps: deps, + mtx: &sync.RWMutex{}, + sleep: sleep, + workerCount: workerCount, + queue: make(chan worker.IMsg, queueSize), + } +} + +func (wp *WorkerPool) TryPut(task worker.IMsg) error { + // this close the window that queue can be full after checking + wp.mtx.Lock() + defer wp.mtx.Unlock() + + if len(wp.queue) == cap(wp.queue) { + return worker.ErrFull + } + wp.queue <- task + return nil +} + +type Sha1Params struct { + FilePath string +} + +func (wp *WorkerPool) Start() { + wp.on = true + for wp.started < wp.workerCount { + go wp.startWorker() + wp.started++ + } +} + +func (wp *WorkerPool) Stop() { + wp.on = false +} + +func (wp *WorkerPool) startWorker() { + var err error + + // TODO: make it stateful + for wp.on { + func() { + defer func() { + if p := recover(); p != nil { + wp.deps.Log().Errorf("worker panic: %s", p) + } + }() + + msg := <-wp.queue + headers := msg.Headers() + msgType, ok := headers[MsgTypeKey] + if !ok { + wp.deps.Log().Errorf("msg type not found: %v", headers) + } + + switch msgType { + case "sha1": + sha1Params := &Sha1Params{} + err = json.Unmarshal([]byte(msg.Body()), sha1Params) + if err != nil { + wp.deps.Log().Errorf("fail to unmarshal sha1 msg: %s", err) + } + err = wp.sha1Task(sha1Params.FilePath) + if err != nil { + wp.deps.Log().Errorf("fail to do sha1: %s", err) + } + default: + wp.deps.Log().Errorf("unknown message tyope: %s", msgType) + } + }() + + time.Sleep(time.Duration(wp.sleep) * time.Second) + } + + wp.started-- +} + +func (wp *WorkerPool) sha1Task(filePath string) error { + f, err := wp.deps.FS().GetFileReader(filePath) + if err != nil { + return fmt.Errorf("fail to get reader: %s", err) + } + + h := sha1.New() + buf := make([]byte, 4096) + _, err = io.CopyBuffer(h, f, buf) + if err != nil { + return err + } + + sha1Sign := fmt.Sprintf("% x", h.Sum(nil)) + err = wp.deps.FileInfos().SetSha1(filePath, sha1Sign) + if err != nil { + return fmt.Errorf("fail to set sha1: %s", err) + } + return nil +} diff --git a/src/worker/simple_worker.go b/src/worker/simple_worker.go deleted file mode 100644 index 6560b80..0000000 --- a/src/worker/simple_worker.go +++ /dev/null @@ -1,106 +0,0 @@ -package worker - -import ( - "crypto/sha1" - "encoding/json" - "errors" - "fmt" - "io" - "sync" - - "github.com/ihexxa/quickshare/src/depidx" -) - -const ( - MsgTypeKey = "msg-type" -) - -type Msg interface { - ID() uint64 - Headers() map[string]string - Body() string -} - -var ErrFull = errors.New("worker queue is full, make it larger in the config.") - -func IsErrFull(err error) bool { - return err == ErrFull -} - -type WorkerPool struct { - on bool - queue chan Msg - mtx *sync.RWMutex - deps *depidx.Deps -} - -func NewWorkerPool(queueSize int, deps *depidx.Deps) *WorkerPool { - return &WorkerPool{ - on: true, - deps: deps, - mtx: &sync.RWMutex{}, - queue: make(chan Msg, queueSize), - } -} - -func (wp *WorkerPool) TryPut(task Msg) error { - // this close the window that queue can be full after checking - wp.mtx.Lock() - defer wp.mtx.Unlock() - - if len(wp.queue) == cap(wp.queue) { - return ErrFull - } - wp.queue <- task - return nil -} - -type Sha1Params struct { - FilePath string -} - -func (wp *WorkerPool) startWorker() { - var err error - - for wp.on { - msg := <-wp.queue - headers := msg.Headers() - msgType, ok := headers[MsgTypeKey] - if !ok { - wp.deps.Log().Errorf("msg type not found: %v", headers) - } - - switch msgType { - case "sha1": - sha1Params := &Sha1Params{} - err = json.Unmarshal([]byte(msg.Body()), sha1Params) - if err != nil { - wp.deps.Log().Errorf("fail to unmarshal sha1 msg: %s", err) - } - err = wp.sha1Task(sha1Params.FilePath) - if err != nil { - wp.deps.Log().Errorf("fail to do sha1: %s", err) - } - default: - wp.deps.Log().Errorf("unknown message tyope: %s", msgType) - } - } -} - -func (wp *WorkerPool) sha1Task(filePath string) error { - f, err := wp.deps.FS().GetFileReader(filePath) - if err != nil { - return fmt.Errorf("fail to get reader: %s", err) - } - - h := sha1.New() - buf := make([]byte, 4096) - _, err = io.CopyBuffer(h, f, buf) - if err != nil { - return err - } - - // sha1Sign := fmt.Sprintf("% x", h.Sum(nil)) - // save it to db - return nil -}