From fd5da3db376ba8de170d958329eb24d5da6a3987 Mon Sep 17 00:00:00 2001 From: hexxa Date: Sat, 7 Aug 2021 21:06:43 +0800 Subject: [PATCH] feat(files): enable upload limiter --- src/depidx/deps.go | 10 +++++ src/handlers/fileshdr/handlers.go | 16 ++++++++ src/handlers/util.go | 21 ++++++++++ src/iolimiter/iolimiter.go | 65 +++++++++++++++++++++++++++++++ src/server/server.go | 6 +++ 5 files changed, 118 insertions(+) create mode 100644 src/iolimiter/iolimiter.go diff --git a/src/depidx/deps.go b/src/depidx/deps.go index 176ccb2..b15e2e9 100644 --- a/src/depidx/deps.go +++ b/src/depidx/deps.go @@ -7,6 +7,7 @@ import ( "github.com/ihexxa/quickshare/src/cryptoutil" "github.com/ihexxa/quickshare/src/fs" "github.com/ihexxa/quickshare/src/idgen" + "github.com/ihexxa/quickshare/src/iolimiter" "github.com/ihexxa/quickshare/src/kvstore" "github.com/ihexxa/quickshare/src/userstore" ) @@ -27,6 +28,7 @@ type Deps struct { uploader IUploader id idgen.IIDGen logger *zap.SugaredLogger + limiter iolimiter.ILimiter } func NewDeps(cfg gocfg.ICfg) *Deps { @@ -80,3 +82,11 @@ func (deps *Deps) Users() userstore.IUserStore { func (deps *Deps) SetUsers(users userstore.IUserStore) { deps.users = users } + +func (deps *Deps) Limiter() iolimiter.ILimiter { + return deps.limiter +} + +func (deps *Deps) SetLimiter(limiter iolimiter.ILimiter) { + deps.limiter = limiter +} diff --git a/src/handlers/fileshdr/handlers.go b/src/handlers/fileshdr/handlers.go index d6bad36..dadb650 100644 --- a/src/handlers/fileshdr/handlers.go +++ b/src/handlers/fileshdr/handlers.go @@ -10,6 +10,7 @@ import ( "os" "path" "path/filepath" + "strconv" "strings" "time" @@ -289,6 +290,21 @@ func (h *FileHandlers) UploadChunk(c *gin.Context) { return } + userIDInt, err := strconv.ParseUint(userID, 10, 64) + if err != nil { + c.JSON(q.ErrResp(c, 500, err)) + return + } + + ok, err := h.deps.Limiter().CanUpload(userIDInt, len([]byte(req.Content))) + if err != nil { + c.JSON(q.ErrResp(c, 500, err)) + return + } else if !ok { + c.JSON(q.ErrResp(c, 503, errors.New("retry later"))) + return + } + tmpFilePath := q.UploadPath(userID, req.Path) locker := h.NewAutoLocker(c, lockName(tmpFilePath)) locker.Exec(func() { diff --git a/src/handlers/util.go b/src/handlers/util.go index d2e1c55..d934833 100644 --- a/src/handlers/util.go +++ b/src/handlers/util.go @@ -7,6 +7,8 @@ import ( "path/filepath" "github.com/gin-gonic/gin" + + "github.com/ihexxa/quickshare/src/cryptoutil" ) var ( @@ -137,3 +139,22 @@ func UploadPath(userID, relFilePath string) string { func UploadFolder(userID string) string { return filepath.Join(userID, UploadDir) } + +func GetUserInfo(tokenStr string, tokenEncDec cryptoutil.ITokenEncDec) (map[string]string, error) { + claims, err := tokenEncDec.FromToken( + tokenStr, + map[string]string{ + UserIDParam: "", + UserParam: "", + RoleParam: "", + ExpireParam: "", + }, + ) + if err != nil { + return nil, err + } else if claims[UserIDParam] == "" || claims[UserParam] == "" { + return nil, errors.New("empty user id or name") + } + + return claims, nil +} diff --git a/src/iolimiter/iolimiter.go b/src/iolimiter/iolimiter.go new file mode 100644 index 0000000..3ca0988 --- /dev/null +++ b/src/iolimiter/iolimiter.go @@ -0,0 +1,65 @@ +package iolimiter + +import ( + "fmt" + "sync" + + "github.com/ihexxa/quickshare/src/golimiter" + "github.com/ihexxa/quickshare/src/userstore" +) + +const cacheSizeLimit = 1024 + +type ILimiter interface { + CanUpload(id uint64, chunkSize int) (bool, error) +} + +type IOLimiter struct { + mtx *sync.Mutex + UploadLimiter *golimiter.Limiter + users userstore.IUserStore + quotaCache map[uint64]*userstore.Quota +} + +func NewIOLimiter(upCap, upCyc int, users userstore.IUserStore) *IOLimiter { + return &IOLimiter{ + mtx: &sync.Mutex{}, + UploadLimiter: golimiter.New(upCap, upCyc), + users: users, + quotaCache: map[uint64]*userstore.Quota{}, + } +} + +func (lm *IOLimiter) CanUpload(id uint64, chunkSize int) (bool, error) { + lm.mtx.Lock() + defer lm.mtx.Unlock() + + quota, ok := lm.quotaCache[id] + if !ok { + user, err := lm.users.GetUser(id) + if err != nil { + return false, err + } + quota = user.Quota + lm.quotaCache[id] = quota + } + if len(lm.quotaCache) > cacheSizeLimit { + lm.clean() + } + + return lm.UploadLimiter.Access( + fmt.Sprint(id), + quota.UploadSpeedLimit, + chunkSize, + ), nil +} + +func (lm *IOLimiter) clean() { + count := 0 + for key := range lm.quotaCache { + delete(lm.quotaCache, key) + if count++; count > 5 { + break + } + } +} diff --git a/src/server/server.go b/src/server/server.go index 37975df..a20f39f 100644 --- a/src/server/server.go +++ b/src/server/server.go @@ -27,6 +27,7 @@ import ( "github.com/ihexxa/quickshare/src/handlers/multiusers" "github.com/ihexxa/quickshare/src/handlers/settings" "github.com/ihexxa/quickshare/src/idgen/simpleidgen" + "github.com/ihexxa/quickshare/src/iolimiter" "github.com/ihexxa/quickshare/src/kvstore" "github.com/ihexxa/quickshare/src/kvstore/boltdbpvd" "github.com/ihexxa/quickshare/src/userstore" @@ -104,6 +105,10 @@ func initDeps(cfg gocfg.ICfg) *depidx.Deps { panic(fmt.Sprintf("fail to init user store: %s", err)) } + limiterCap := cfg.IntOr("Users.LimiterCapacity", 4096) + limiterCyc := cfg.IntOr("Users.LimiterCyc", 3000) + limiter := iolimiter.NewIOLimiter(limiterCap, limiterCyc, users) + deps := depidx.NewDeps(cfg) deps.SetFS(filesystem) deps.SetToken(jwtEncDec) @@ -111,6 +116,7 @@ func initDeps(cfg gocfg.ICfg) *depidx.Deps { deps.SetUsers(users) deps.SetID(ider) deps.SetLog(logger) + deps.SetLimiter(limiter) return deps }