feat(users) add async handler for reset used space

This commit is contained in:
hexxa 2022-03-09 15:29:38 +08:00 committed by Hexxa
parent e05ef9b5fd
commit 4dcd2c56ca
8 changed files with 142 additions and 4 deletions

View file

@ -67,6 +67,7 @@ type IUserStore interface {
SetInfo(id uint64, user *db.User) error SetInfo(id uint64, user *db.User) error
CanIncrUsed(id uint64, capacity int64) (bool, error) CanIncrUsed(id uint64, capacity int64) (bool, error)
SetUsed(id uint64, incr bool, capacity int64) error SetUsed(id uint64, incr bool, capacity int64) error
ResetUsed(id uint64, used int64) error
SetPwd(id uint64, pwd string) error SetPwd(id uint64, pwd string) error
SetPreferences(id uint64, settings *db.Preferences) error SetPreferences(id uint64, settings *db.Preferences) error
ListUsers() ([]*db.User, error) ListUsers() ([]*db.User, error)
@ -369,6 +370,32 @@ func (us *KVUserStore) SetUsed(id uint64, incr bool, capacity int64) error {
return us.store.SetStringIn(db.UsersNs, userID, string(infoBytes)) return us.store.SetStringIn(db.UsersNs, userID, string(infoBytes))
} }
func (us *KVUserStore) ResetUsed(id uint64, used int64) error {
us.mtx.Lock()
defer us.mtx.Unlock()
userID := fmt.Sprint(id)
infoStr, ok := us.store.GetStringIn(db.UsersNs, userID)
if !ok {
return fmt.Errorf("user (%d) does not exist", id)
}
gotUser := &db.User{}
err := json.Unmarshal([]byte(infoStr), gotUser)
if err != nil {
return err
} else if gotUser.ID != id {
return fmt.Errorf("user id key(%d) info(%d) does match", id, gotUser.ID)
}
gotUser.UsedSpace = used
infoBytes, err := json.Marshal(gotUser)
if err != nil {
return err
}
return us.store.SetStringIn(db.UsersNs, userID, string(infoBytes))
}
func (us *KVUserStore) SetInfo(id uint64, user *db.User) error { func (us *KVUserStore) SetInfo(id uint64, user *db.User) error {
us.mtx.Lock() us.mtx.Lock()
defer us.mtx.Unlock() defer us.mtx.Unlock()

View file

@ -9,7 +9,9 @@ import (
"github.com/ihexxa/quickshare/src/worker" "github.com/ihexxa/quickshare/src/worker"
) )
const MsgTypeSha1 = "sha1" const (
MsgTypeSha1 = "sha1"
)
type Sha1Params struct { type Sha1Params struct {
FilePath string FilePath string

View file

@ -0,0 +1,48 @@
package multiusers
import (
"encoding/json"
"fmt"
"path/filepath"
"github.com/ihexxa/quickshare/src/worker"
)
const (
MsgTypeResetUsedSpace = "reset-used-space"
)
type UsedSpaceParams struct {
UserID uint64
UserHomePath string
}
func (h *MultiUsersSvc) resetUsedSpace(msg worker.IMsg) error {
params := &UsedSpaceParams{}
err := json.Unmarshal([]byte(msg.Body()), params)
if err != nil {
return fmt.Errorf("fail to unmarshal sha1 msg: %w", err)
}
usedSpace := int64(0)
dirQueue := []string{params.UserHomePath}
for len(dirQueue) > 0 {
dirPath := dirQueue[0]
dirQueue = dirQueue[1:]
infos, err := h.deps.FS().ListDir(dirPath)
if err != nil {
return err
}
for _, info := range infos {
if info.IsDir() {
dirQueue = append(dirQueue, filepath.Join(dirPath, info.Name()))
} else {
usedSpace += info.Size()
}
}
}
return h.deps.Users().ResetUsed(params.UserID, usedSpace)
}

View file

@ -1,6 +1,7 @@
package multiusers package multiusers
import ( import (
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"path/filepath" "path/filepath"
@ -16,6 +17,7 @@ import (
"github.com/ihexxa/quickshare/src/db/userstore" "github.com/ihexxa/quickshare/src/db/userstore"
"github.com/ihexxa/quickshare/src/depidx" "github.com/ihexxa/quickshare/src/depidx"
q "github.com/ihexxa/quickshare/src/handlers" q "github.com/ihexxa/quickshare/src/handlers"
"github.com/ihexxa/quickshare/src/worker/localworker"
) )
var ( var (
@ -48,6 +50,7 @@ func NewMultiUsersSvc(cfg gocfg.ICfg, deps *depidx.Deps) (*MultiUsersSvc, error)
apiRuleCname(userstore.AdminRole, "GET", "/v1/users/list"): true, apiRuleCname(userstore.AdminRole, "GET", "/v1/users/list"): true,
apiRuleCname(userstore.AdminRole, "GET", "/v1/users/self"): true, apiRuleCname(userstore.AdminRole, "GET", "/v1/users/self"): true,
apiRuleCname(userstore.AdminRole, "PATCH", "/v1/users/preferences"): true, apiRuleCname(userstore.AdminRole, "PATCH", "/v1/users/preferences"): true,
apiRuleCname(userstore.AdminRole, "PUT", "/v1/users//used-space"): true,
apiRuleCname(userstore.AdminRole, "POST", "/v1/roles/"): true, apiRuleCname(userstore.AdminRole, "POST", "/v1/roles/"): true,
apiRuleCname(userstore.AdminRole, "DELETE", "/v1/roles/"): true, apiRuleCname(userstore.AdminRole, "DELETE", "/v1/roles/"): true,
apiRuleCname(userstore.AdminRole, "GET", "/v1/roles/list"): true, apiRuleCname(userstore.AdminRole, "GET", "/v1/roles/list"): true,
@ -77,6 +80,7 @@ func NewMultiUsersSvc(cfg gocfg.ICfg, deps *depidx.Deps) (*MultiUsersSvc, error)
apiRuleCname(userstore.AdminRole, "GET", "/v1/fs/sharings/dirs"): true, apiRuleCname(userstore.AdminRole, "GET", "/v1/fs/sharings/dirs"): true,
apiRuleCname(userstore.AdminRole, "GET", "/v1/fs/sharings/ids"): true, apiRuleCname(userstore.AdminRole, "GET", "/v1/fs/sharings/ids"): true,
apiRuleCname(userstore.AdminRole, "POST", "/v1/fs/hashes/sha1"): true, apiRuleCname(userstore.AdminRole, "POST", "/v1/fs/hashes/sha1"): true,
// user rules // user rules
apiRuleCname(userstore.UserRole, "GET", "/"): true, apiRuleCname(userstore.UserRole, "GET", "/"): true,
apiRuleCname(userstore.UserRole, "GET", publicPath): true, apiRuleCname(userstore.UserRole, "GET", publicPath): true,
@ -125,11 +129,14 @@ func NewMultiUsersSvc(cfg gocfg.ICfg, deps *depidx.Deps) (*MultiUsersSvc, error)
apiRuleCname(userstore.VisitorRole, "GET", "/v1/fs/sharings/dirs"): true, apiRuleCname(userstore.VisitorRole, "GET", "/v1/fs/sharings/dirs"): true,
} }
return &MultiUsersSvc{ handlers := &MultiUsersSvc{
cfg: cfg, cfg: cfg,
deps: deps, deps: deps,
apiACRules: apiACRules, apiACRules: apiACRules,
}, nil }
deps.Workers().AddHandler(MsgTypeResetUsedSpace, handlers.resetUsedSpace)
return handlers, nil
} }
func (h *MultiUsersSvc) Init(adminName, adminPwd string) (string, error) { func (h *MultiUsersSvc) Init(adminName, adminPwd string) (string, error) {
@ -707,3 +714,43 @@ func (h *MultiUsersSvc) SetPreferences(c *gin.Context) {
} }
c.JSON(q.Resp(200)) c.JSON(q.Resp(200))
} }
type ResetUsedSpaceReq struct {
UserID uint64 `json:"userID,string"`
}
func (h *MultiUsersSvc) ResetUsedSpace(c *gin.Context) {
req := &ResetUsedSpaceReq{}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(q.ErrResp(c, 400, err))
return
}
userInfo, err := h.deps.Users().GetUser(req.UserID)
if err != nil {
c.JSON(q.ErrResp(c, 500, err))
return
}
msg, err := json.Marshal(UsedSpaceParams{
UserID: req.UserID,
UserHomePath: userInfo.Name,
})
if err != nil {
c.JSON(q.ErrResp(c, 500, err))
return
}
err = h.deps.Workers().TryPut(
localworker.NewMsg(
h.deps.ID().Gen(),
map[string]string{localworker.MsgTypeKey: MsgTypeResetUsedSpace},
string(msg),
),
)
if err != nil {
c.JSON(q.ErrResp(c, 500, err))
return
}
c.JSON(q.Resp(200))
}

View file

@ -275,6 +275,7 @@ func initHandlers(router *gin.Engine, cfg gocfg.ICfg, deps *depidx.Deps) (*gin.E
usersAPI.GET("/self", userHdrs.Self) usersAPI.GET("/self", userHdrs.Self)
usersAPI.PATCH("/", userHdrs.SetUser) usersAPI.PATCH("/", userHdrs.SetUser)
usersAPI.PATCH("/preferences", userHdrs.SetPreferences) usersAPI.PATCH("/preferences", userHdrs.SetPreferences)
usersAPI.PUT("/used-space", userHdrs.ResetUsedSpace)
rolesAPI := v1.Group("/roles") rolesAPI := v1.Group("/roles")
rolesAPI.POST("/", userHdrs.AddRole) rolesAPI.POST("/", userHdrs.AddRole)

Binary file not shown.

View file

@ -2,7 +2,10 @@ package worker
import "errors" import "errors"
var ErrFull = errors.New("worker queue is full, make it larger in the config.") var (
ErrFull = errors.New("worker queue is full, make it larger in the config")
ErrClosed = errors.New("async handlers are closed")
)
func IsErrFull(err error) bool { func IsErrFull(err error) bool {
return err == ErrFull return err == ErrFull

View file

@ -10,6 +10,8 @@ import (
"github.com/ihexxa/quickshare/src/worker" "github.com/ihexxa/quickshare/src/worker"
) )
// TODO: support context
const ( const (
MsgTypeKey = "msg-type" MsgTypeKey = "msg-type"
) )
@ -42,6 +44,7 @@ func (m *Msg) Body() string {
type WorkerPool struct { type WorkerPool struct {
on bool on bool
listening bool
queue chan worker.IMsg queue chan worker.IMsg
sleep int sleep int
workerCount int workerCount int
@ -54,6 +57,7 @@ type WorkerPool struct {
func NewWorkerPool(queueSize, sleep, workerCount int, logger *zap.SugaredLogger) *WorkerPool { func NewWorkerPool(queueSize, sleep, workerCount int, logger *zap.SugaredLogger) *WorkerPool {
return &WorkerPool{ return &WorkerPool{
on: true, on: true,
listening: true,
logger: logger, logger: logger,
mtx: &sync.RWMutex{}, mtx: &sync.RWMutex{},
sleep: sleep, sleep: sleep,
@ -68,6 +72,9 @@ func (wp *WorkerPool) TryPut(task worker.IMsg) error {
wp.mtx.Lock() wp.mtx.Lock()
defer wp.mtx.Unlock() defer wp.mtx.Unlock()
if !wp.listening {
return worker.ErrClosed
}
if len(wp.queue) == cap(wp.queue) { if len(wp.queue) == cap(wp.queue) {
return worker.ErrFull return worker.ErrFull
} }
@ -80,6 +87,7 @@ func (wp *WorkerPool) Start() {
defer wp.mtx.Unlock() defer wp.mtx.Unlock()
wp.on = true wp.on = true
wp.listening = true
for wp.started < wp.workerCount { for wp.started < wp.workerCount {
go wp.startWorker() go wp.startWorker()
wp.started++ wp.started++
@ -90,6 +98,8 @@ func (wp *WorkerPool) Stop() {
wp.mtx.Lock() wp.mtx.Lock()
defer wp.mtx.Unlock() defer wp.mtx.Unlock()
wp.listening = false
// TODO: avoid sending and panic // TODO: avoid sending and panic
for len(wp.queue) > 0 { for len(wp.queue) > 0 {
wp.logger.Infof( wp.logger.Infof(