diff --git a/src/db/userstore/user_store.go b/src/db/userstore/user_store.go index d2292bc..e33619c 100644 --- a/src/db/userstore/user_store.go +++ b/src/db/userstore/user_store.go @@ -67,6 +67,7 @@ type IUserStore interface { SetInfo(id uint64, user *db.User) error CanIncrUsed(id uint64, capacity int64) (bool, error) SetUsed(id uint64, incr bool, capacity int64) error + ResetUsed(id uint64, used int64) error SetPwd(id uint64, pwd string) error SetPreferences(id uint64, settings *db.Preferences) error ListUsers() ([]*db.User, error) @@ -369,6 +370,32 @@ func (us *KVUserStore) SetUsed(id uint64, incr bool, capacity int64) error { return us.store.SetStringIn(db.UsersNs, userID, string(infoBytes)) } +func (us *KVUserStore) ResetUsed(id uint64, used int64) error { + us.mtx.Lock() + defer us.mtx.Unlock() + + userID := fmt.Sprint(id) + infoStr, ok := us.store.GetStringIn(db.UsersNs, userID) + if !ok { + return fmt.Errorf("user (%d) does not exist", id) + } + + gotUser := &db.User{} + err := json.Unmarshal([]byte(infoStr), gotUser) + if err != nil { + return err + } else if gotUser.ID != id { + return fmt.Errorf("user id key(%d) info(%d) does match", id, gotUser.ID) + } + + gotUser.UsedSpace = used + infoBytes, err := json.Marshal(gotUser) + if err != nil { + return err + } + return us.store.SetStringIn(db.UsersNs, userID, string(infoBytes)) +} + func (us *KVUserStore) SetInfo(id uint64, user *db.User) error { us.mtx.Lock() defer us.mtx.Unlock() diff --git a/src/handlers/fileshdr/async_handlers.go b/src/handlers/fileshdr/async_handlers.go index a000640..6a6ae59 100644 --- a/src/handlers/fileshdr/async_handlers.go +++ b/src/handlers/fileshdr/async_handlers.go @@ -9,7 +9,9 @@ import ( "github.com/ihexxa/quickshare/src/worker" ) -const MsgTypeSha1 = "sha1" +const ( + MsgTypeSha1 = "sha1" +) type Sha1Params struct { FilePath string diff --git a/src/handlers/multiusers/async_handlers.go b/src/handlers/multiusers/async_handlers.go new file mode 100644 index 0000000..9829b3b --- /dev/null +++ b/src/handlers/multiusers/async_handlers.go @@ -0,0 +1,48 @@ +package multiusers + +import ( + "encoding/json" + "fmt" + "path/filepath" + + "github.com/ihexxa/quickshare/src/worker" +) + +const ( + MsgTypeResetUsedSpace = "reset-used-space" +) + +type UsedSpaceParams struct { + UserID uint64 + UserHomePath string +} + +func (h *MultiUsersSvc) resetUsedSpace(msg worker.IMsg) error { + params := &UsedSpaceParams{} + err := json.Unmarshal([]byte(msg.Body()), params) + if err != nil { + return fmt.Errorf("fail to unmarshal sha1 msg: %w", err) + } + + usedSpace := int64(0) + dirQueue := []string{params.UserHomePath} + for len(dirQueue) > 0 { + dirPath := dirQueue[0] + dirQueue = dirQueue[1:] + + infos, err := h.deps.FS().ListDir(dirPath) + if err != nil { + return err + } + + for _, info := range infos { + if info.IsDir() { + dirQueue = append(dirQueue, filepath.Join(dirPath, info.Name())) + } else { + usedSpace += info.Size() + } + } + } + + return h.deps.Users().ResetUsed(params.UserID, usedSpace) +} diff --git a/src/handlers/multiusers/handlers.go b/src/handlers/multiusers/handlers.go index d699d48..61e4a47 100644 --- a/src/handlers/multiusers/handlers.go +++ b/src/handlers/multiusers/handlers.go @@ -1,6 +1,7 @@ package multiusers import ( + "encoding/json" "errors" "fmt" "path/filepath" @@ -16,6 +17,7 @@ import ( "github.com/ihexxa/quickshare/src/db/userstore" "github.com/ihexxa/quickshare/src/depidx" q "github.com/ihexxa/quickshare/src/handlers" + "github.com/ihexxa/quickshare/src/worker/localworker" ) var ( @@ -48,6 +50,7 @@ func NewMultiUsersSvc(cfg gocfg.ICfg, deps *depidx.Deps) (*MultiUsersSvc, error) apiRuleCname(userstore.AdminRole, "GET", "/v1/users/list"): true, apiRuleCname(userstore.AdminRole, "GET", "/v1/users/self"): true, apiRuleCname(userstore.AdminRole, "PATCH", "/v1/users/preferences"): true, + apiRuleCname(userstore.AdminRole, "PUT", "/v1/users//used-space"): true, apiRuleCname(userstore.AdminRole, "POST", "/v1/roles/"): true, apiRuleCname(userstore.AdminRole, "DELETE", "/v1/roles/"): true, apiRuleCname(userstore.AdminRole, "GET", "/v1/roles/list"): true, @@ -77,6 +80,7 @@ func NewMultiUsersSvc(cfg gocfg.ICfg, deps *depidx.Deps) (*MultiUsersSvc, error) apiRuleCname(userstore.AdminRole, "GET", "/v1/fs/sharings/dirs"): true, apiRuleCname(userstore.AdminRole, "GET", "/v1/fs/sharings/ids"): true, apiRuleCname(userstore.AdminRole, "POST", "/v1/fs/hashes/sha1"): true, + // user rules apiRuleCname(userstore.UserRole, "GET", "/"): true, apiRuleCname(userstore.UserRole, "GET", publicPath): true, @@ -125,11 +129,14 @@ func NewMultiUsersSvc(cfg gocfg.ICfg, deps *depidx.Deps) (*MultiUsersSvc, error) apiRuleCname(userstore.VisitorRole, "GET", "/v1/fs/sharings/dirs"): true, } - return &MultiUsersSvc{ + handlers := &MultiUsersSvc{ cfg: cfg, deps: deps, apiACRules: apiACRules, - }, nil + } + deps.Workers().AddHandler(MsgTypeResetUsedSpace, handlers.resetUsedSpace) + + return handlers, nil } func (h *MultiUsersSvc) Init(adminName, adminPwd string) (string, error) { @@ -707,3 +714,43 @@ func (h *MultiUsersSvc) SetPreferences(c *gin.Context) { } c.JSON(q.Resp(200)) } + +type ResetUsedSpaceReq struct { + UserID uint64 `json:"userID,string"` +} + +func (h *MultiUsersSvc) ResetUsedSpace(c *gin.Context) { + req := &ResetUsedSpaceReq{} + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(q.ErrResp(c, 400, err)) + return + } + + userInfo, err := h.deps.Users().GetUser(req.UserID) + if err != nil { + c.JSON(q.ErrResp(c, 500, err)) + return + } + msg, err := json.Marshal(UsedSpaceParams{ + UserID: req.UserID, + UserHomePath: userInfo.Name, + }) + if err != nil { + c.JSON(q.ErrResp(c, 500, err)) + return + } + + err = h.deps.Workers().TryPut( + localworker.NewMsg( + h.deps.ID().Gen(), + map[string]string{localworker.MsgTypeKey: MsgTypeResetUsedSpace}, + string(msg), + ), + ) + if err != nil { + c.JSON(q.ErrResp(c, 500, err)) + return + } + + c.JSON(q.Resp(200)) +} diff --git a/src/server/server.go b/src/server/server.go index 0e52849..98a59b7 100644 --- a/src/server/server.go +++ b/src/server/server.go @@ -275,6 +275,7 @@ func initHandlers(router *gin.Engine, cfg gocfg.ICfg, deps *depidx.Deps) (*gin.E usersAPI.GET("/self", userHdrs.Self) usersAPI.PATCH("/", userHdrs.SetUser) usersAPI.PATCH("/preferences", userHdrs.SetPreferences) + usersAPI.PUT("/used-space", userHdrs.ResetUsedSpace) rolesAPI := v1.Group("/roles") rolesAPI.POST("/", userHdrs.AddRole) diff --git a/src/server/testdata/test_quickshare.db b/src/server/testdata/test_quickshare.db index 32e6082..1d9535b 100644 Binary files a/src/server/testdata/test_quickshare.db and b/src/server/testdata/test_quickshare.db differ diff --git a/src/worker/interface.go b/src/worker/interface.go index 5f0826b..f060b60 100644 --- a/src/worker/interface.go +++ b/src/worker/interface.go @@ -2,7 +2,10 @@ package worker import "errors" -var ErrFull = errors.New("worker queue is full, make it larger in the config.") +var ( + ErrFull = errors.New("worker queue is full, make it larger in the config") + ErrClosed = errors.New("async handlers are closed") +) func IsErrFull(err error) bool { return err == ErrFull diff --git a/src/worker/localworker/worker.go b/src/worker/localworker/worker.go index cdadade..f4237bf 100644 --- a/src/worker/localworker/worker.go +++ b/src/worker/localworker/worker.go @@ -10,6 +10,8 @@ import ( "github.com/ihexxa/quickshare/src/worker" ) +// TODO: support context + const ( MsgTypeKey = "msg-type" ) @@ -42,6 +44,7 @@ func (m *Msg) Body() string { type WorkerPool struct { on bool + listening bool queue chan worker.IMsg sleep int workerCount int @@ -54,6 +57,7 @@ type WorkerPool struct { func NewWorkerPool(queueSize, sleep, workerCount int, logger *zap.SugaredLogger) *WorkerPool { return &WorkerPool{ on: true, + listening: true, logger: logger, mtx: &sync.RWMutex{}, sleep: sleep, @@ -68,6 +72,9 @@ func (wp *WorkerPool) TryPut(task worker.IMsg) error { wp.mtx.Lock() defer wp.mtx.Unlock() + if !wp.listening { + return worker.ErrClosed + } if len(wp.queue) == cap(wp.queue) { return worker.ErrFull } @@ -80,6 +87,7 @@ func (wp *WorkerPool) Start() { defer wp.mtx.Unlock() wp.on = true + wp.listening = true for wp.started < wp.workerCount { go wp.startWorker() wp.started++ @@ -90,6 +98,8 @@ func (wp *WorkerPool) Stop() { wp.mtx.Lock() defer wp.mtx.Unlock() + wp.listening = false + // TODO: avoid sending and panic for len(wp.queue) > 0 { wp.logger.Infof(