feat(files): enable upload limiter
This commit is contained in:
parent
e01f5f8351
commit
fd5da3db37
5 changed files with 118 additions and 0 deletions
|
@ -7,6 +7,7 @@ import (
|
|||
"github.com/ihexxa/quickshare/src/cryptoutil"
|
||||
"github.com/ihexxa/quickshare/src/fs"
|
||||
"github.com/ihexxa/quickshare/src/idgen"
|
||||
"github.com/ihexxa/quickshare/src/iolimiter"
|
||||
"github.com/ihexxa/quickshare/src/kvstore"
|
||||
"github.com/ihexxa/quickshare/src/userstore"
|
||||
)
|
||||
|
@ -27,6 +28,7 @@ type Deps struct {
|
|||
uploader IUploader
|
||||
id idgen.IIDGen
|
||||
logger *zap.SugaredLogger
|
||||
limiter iolimiter.ILimiter
|
||||
}
|
||||
|
||||
func NewDeps(cfg gocfg.ICfg) *Deps {
|
||||
|
@ -80,3 +82,11 @@ func (deps *Deps) Users() userstore.IUserStore {
|
|||
func (deps *Deps) SetUsers(users userstore.IUserStore) {
|
||||
deps.users = users
|
||||
}
|
||||
|
||||
func (deps *Deps) Limiter() iolimiter.ILimiter {
|
||||
return deps.limiter
|
||||
}
|
||||
|
||||
func (deps *Deps) SetLimiter(limiter iolimiter.ILimiter) {
|
||||
deps.limiter = limiter
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
@ -289,6 +290,21 @@ func (h *FileHandlers) UploadChunk(c *gin.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
userIDInt, err := strconv.ParseUint(userID, 10, 64)
|
||||
if err != nil {
|
||||
c.JSON(q.ErrResp(c, 500, err))
|
||||
return
|
||||
}
|
||||
|
||||
ok, err := h.deps.Limiter().CanUpload(userIDInt, len([]byte(req.Content)))
|
||||
if err != nil {
|
||||
c.JSON(q.ErrResp(c, 500, err))
|
||||
return
|
||||
} else if !ok {
|
||||
c.JSON(q.ErrResp(c, 503, errors.New("retry later")))
|
||||
return
|
||||
}
|
||||
|
||||
tmpFilePath := q.UploadPath(userID, req.Path)
|
||||
locker := h.NewAutoLocker(c, lockName(tmpFilePath))
|
||||
locker.Exec(func() {
|
||||
|
|
|
@ -7,6 +7,8 @@ import (
|
|||
"path/filepath"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
|
||||
"github.com/ihexxa/quickshare/src/cryptoutil"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -137,3 +139,22 @@ func UploadPath(userID, relFilePath string) string {
|
|||
func UploadFolder(userID string) string {
|
||||
return filepath.Join(userID, UploadDir)
|
||||
}
|
||||
|
||||
func GetUserInfo(tokenStr string, tokenEncDec cryptoutil.ITokenEncDec) (map[string]string, error) {
|
||||
claims, err := tokenEncDec.FromToken(
|
||||
tokenStr,
|
||||
map[string]string{
|
||||
UserIDParam: "",
|
||||
UserParam: "",
|
||||
RoleParam: "",
|
||||
ExpireParam: "",
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if claims[UserIDParam] == "" || claims[UserParam] == "" {
|
||||
return nil, errors.New("empty user id or name")
|
||||
}
|
||||
|
||||
return claims, nil
|
||||
}
|
||||
|
|
65
src/iolimiter/iolimiter.go
Normal file
65
src/iolimiter/iolimiter.go
Normal file
|
@ -0,0 +1,65 @@
|
|||
package iolimiter
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/ihexxa/quickshare/src/golimiter"
|
||||
"github.com/ihexxa/quickshare/src/userstore"
|
||||
)
|
||||
|
||||
const cacheSizeLimit = 1024
|
||||
|
||||
type ILimiter interface {
|
||||
CanUpload(id uint64, chunkSize int) (bool, error)
|
||||
}
|
||||
|
||||
type IOLimiter struct {
|
||||
mtx *sync.Mutex
|
||||
UploadLimiter *golimiter.Limiter
|
||||
users userstore.IUserStore
|
||||
quotaCache map[uint64]*userstore.Quota
|
||||
}
|
||||
|
||||
func NewIOLimiter(upCap, upCyc int, users userstore.IUserStore) *IOLimiter {
|
||||
return &IOLimiter{
|
||||
mtx: &sync.Mutex{},
|
||||
UploadLimiter: golimiter.New(upCap, upCyc),
|
||||
users: users,
|
||||
quotaCache: map[uint64]*userstore.Quota{},
|
||||
}
|
||||
}
|
||||
|
||||
func (lm *IOLimiter) CanUpload(id uint64, chunkSize int) (bool, error) {
|
||||
lm.mtx.Lock()
|
||||
defer lm.mtx.Unlock()
|
||||
|
||||
quota, ok := lm.quotaCache[id]
|
||||
if !ok {
|
||||
user, err := lm.users.GetUser(id)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
quota = user.Quota
|
||||
lm.quotaCache[id] = quota
|
||||
}
|
||||
if len(lm.quotaCache) > cacheSizeLimit {
|
||||
lm.clean()
|
||||
}
|
||||
|
||||
return lm.UploadLimiter.Access(
|
||||
fmt.Sprint(id),
|
||||
quota.UploadSpeedLimit,
|
||||
chunkSize,
|
||||
), nil
|
||||
}
|
||||
|
||||
func (lm *IOLimiter) clean() {
|
||||
count := 0
|
||||
for key := range lm.quotaCache {
|
||||
delete(lm.quotaCache, key)
|
||||
if count++; count > 5 {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
|
@ -27,6 +27,7 @@ import (
|
|||
"github.com/ihexxa/quickshare/src/handlers/multiusers"
|
||||
"github.com/ihexxa/quickshare/src/handlers/settings"
|
||||
"github.com/ihexxa/quickshare/src/idgen/simpleidgen"
|
||||
"github.com/ihexxa/quickshare/src/iolimiter"
|
||||
"github.com/ihexxa/quickshare/src/kvstore"
|
||||
"github.com/ihexxa/quickshare/src/kvstore/boltdbpvd"
|
||||
"github.com/ihexxa/quickshare/src/userstore"
|
||||
|
@ -104,6 +105,10 @@ func initDeps(cfg gocfg.ICfg) *depidx.Deps {
|
|||
panic(fmt.Sprintf("fail to init user store: %s", err))
|
||||
}
|
||||
|
||||
limiterCap := cfg.IntOr("Users.LimiterCapacity", 4096)
|
||||
limiterCyc := cfg.IntOr("Users.LimiterCyc", 3000)
|
||||
limiter := iolimiter.NewIOLimiter(limiterCap, limiterCyc, users)
|
||||
|
||||
deps := depidx.NewDeps(cfg)
|
||||
deps.SetFS(filesystem)
|
||||
deps.SetToken(jwtEncDec)
|
||||
|
@ -111,6 +116,7 @@ func initDeps(cfg gocfg.ICfg) *depidx.Deps {
|
|||
deps.SetUsers(users)
|
||||
deps.SetID(ider)
|
||||
deps.SetLog(logger)
|
||||
deps.SetLimiter(limiter)
|
||||
|
||||
return deps
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue