feat(files): add uploadings api (#30)
* fix(uploader, files/handlers): fix incorrect unlock, catch and check after calling api * fix(uploader): fix uploader test * feat(files): add uploadings api * fix(files): register uploading handlers to api Co-authored-by: Jia He <jiah@nvidia.com>
This commit is contained in:
parent
a909df384d
commit
67c07cc81f
11 changed files with 398 additions and 83 deletions
|
@ -18,6 +18,7 @@ import (
|
|||
|
||||
"github.com/ihexxa/quickshare/src/depidx"
|
||||
q "github.com/ihexxa/quickshare/src/handlers"
|
||||
"github.com/ihexxa/quickshare/src/handlers/singleuserhdr"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -93,6 +94,7 @@ func (lk *AutoLocker) Exec(handler func()) {
|
|||
lk.c.JSON(q.ErrResp(lk.c, 500, errors.New("fail to lock the file")))
|
||||
return
|
||||
}
|
||||
locked = true
|
||||
|
||||
locked = true
|
||||
handler()
|
||||
|
@ -109,16 +111,17 @@ func (h *FileHandlers) Create(c *gin.Context) {
|
|||
c.JSON(q.ErrResp(c, 500, err))
|
||||
return
|
||||
}
|
||||
userName := c.MustGet(singleuserhdr.UserParam).(string)
|
||||
|
||||
tmpFilePath := h.GetTmpPath(req.Path)
|
||||
locker := h.NewAutoLocker(c, tmpFilePath)
|
||||
tmpFilePath := getTmpPath(req.Path)
|
||||
locker := h.NewAutoLocker(c, lockName(userName, tmpFilePath))
|
||||
locker.Exec(func() {
|
||||
err := h.deps.FS().Create(tmpFilePath)
|
||||
if err != nil {
|
||||
c.JSON(q.ErrResp(c, 500, err))
|
||||
return
|
||||
}
|
||||
err = h.uploadMgr.AddInfo(req.Path, tmpFilePath, req.FileSize, false)
|
||||
err = h.uploadMgr.AddInfo(userName, req.Path, tmpFilePath, req.FileSize)
|
||||
if err != nil {
|
||||
c.JSON(q.ErrResp(c, 500, err))
|
||||
return
|
||||
|
@ -252,13 +255,14 @@ func (h *FileHandlers) UploadChunk(c *gin.Context) {
|
|||
c.JSON(q.ErrResp(c, 500, err))
|
||||
return
|
||||
}
|
||||
userName := c.MustGet(singleuserhdr.UserParam).(string)
|
||||
|
||||
tmpFilePath := h.GetTmpPath(req.Path)
|
||||
locker := h.NewAutoLocker(c, tmpFilePath)
|
||||
tmpFilePath := getTmpPath(req.Path)
|
||||
locker := h.NewAutoLocker(c, lockName(userName, tmpFilePath))
|
||||
locker.Exec(func() {
|
||||
var err error
|
||||
|
||||
_, fileSize, uploaded, err := h.uploadMgr.GetInfo(tmpFilePath)
|
||||
_, fileSize, uploaded, err := h.uploadMgr.GetInfo(userName, tmpFilePath)
|
||||
if err != nil {
|
||||
c.JSON(q.ErrResp(c, 500, err))
|
||||
return
|
||||
|
@ -273,15 +277,13 @@ func (h *FileHandlers) UploadChunk(c *gin.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
fmt.Println("length", len([]byte(content)))
|
||||
|
||||
wrote, err := h.deps.FS().WriteAt(tmpFilePath, []byte(content), req.Offset)
|
||||
if err != nil {
|
||||
c.JSON(q.ErrResp(c, 500, err))
|
||||
return
|
||||
}
|
||||
|
||||
err = h.uploadMgr.SetUploaded(tmpFilePath, req.Offset+int64(wrote))
|
||||
err = h.uploadMgr.SetInfo(userName, tmpFilePath, req.Offset+int64(wrote))
|
||||
if err != nil {
|
||||
c.JSON(q.ErrResp(c, 500, err))
|
||||
return
|
||||
|
@ -295,7 +297,7 @@ func (h *FileHandlers) UploadChunk(c *gin.Context) {
|
|||
c.JSON(q.ErrResp(c, 500, fmt.Errorf("%s error: %w", req.Path, err)))
|
||||
return
|
||||
}
|
||||
err = h.uploadMgr.DelInfo(tmpFilePath)
|
||||
err = h.uploadMgr.DelInfo(userName, tmpFilePath)
|
||||
if err != nil {
|
||||
c.JSON(q.ErrResp(c, 500, err))
|
||||
return
|
||||
|
@ -323,11 +325,12 @@ func (h *FileHandlers) UploadStatus(c *gin.Context) {
|
|||
if filePath == "" {
|
||||
c.JSON(q.ErrResp(c, 400, errors.New("invalid file name")))
|
||||
}
|
||||
userName := c.MustGet(singleuserhdr.UserParam).(string)
|
||||
|
||||
tmpFilePath := h.GetTmpPath(filePath)
|
||||
locker := h.NewAutoLocker(c, tmpFilePath)
|
||||
tmpFilePath := getTmpPath(filePath)
|
||||
locker := h.NewAutoLocker(c, lockName(userName, tmpFilePath))
|
||||
locker.Exec(func() {
|
||||
_, fileSize, uploaded, err := h.uploadMgr.GetInfo(tmpFilePath)
|
||||
_, fileSize, uploaded, err := h.uploadMgr.GetInfo(userName, tmpFilePath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
c.JSON(q.ErrResp(c, 404, err))
|
||||
|
@ -347,7 +350,6 @@ func (h *FileHandlers) UploadStatus(c *gin.Context) {
|
|||
}
|
||||
|
||||
// TODO: support ETag
|
||||
// TODO: use correct content type
|
||||
func (h *FileHandlers) Download(c *gin.Context) {
|
||||
rangeVal := c.GetHeader(rangeHeader)
|
||||
ifRangeVal := c.GetHeader(ifRangeHeader)
|
||||
|
@ -391,7 +393,6 @@ func (h *FileHandlers) Download(c *gin.Context) {
|
|||
// := r.(*os.File)
|
||||
|
||||
// respond to normal requests
|
||||
fmt.Println(ifRangeVal, rangeVal)
|
||||
if ifRangeVal != "" || rangeVal == "" {
|
||||
c.DataFromReader(200, info.Size(), contentType, r, map[string]string{})
|
||||
return
|
||||
|
@ -463,10 +464,56 @@ func (h *FileHandlers) CopyDir(c *gin.Context) {
|
|||
c.JSON(q.NewMsgResp(501, "Not Implemented"))
|
||||
}
|
||||
|
||||
func (h *FileHandlers) GetTmpPath(filePath string) string {
|
||||
func getTmpPath(filePath string) string {
|
||||
return path.Join(UploadDir, fmt.Sprintf("%x", sha1.Sum([]byte(filePath))))
|
||||
}
|
||||
|
||||
func lockName(user, filePath string) string {
|
||||
return fmt.Sprintf("%s/%s", user, filePath)
|
||||
}
|
||||
|
||||
func (h *FileHandlers) FsPath(filePath string) string {
|
||||
return path.Join(FsDir, filePath)
|
||||
}
|
||||
|
||||
type ListUploadingsResp struct {
|
||||
UploadInfos []*UploadInfo `json:"uploadInfos"`
|
||||
}
|
||||
|
||||
func (h *FileHandlers) ListUploadings(c *gin.Context) {
|
||||
userName := c.MustGet(singleuserhdr.UserParam).(string)
|
||||
|
||||
infos, err := h.uploadMgr.ListInfo(userName)
|
||||
if err != nil {
|
||||
c.JSON(q.ErrResp(c, 500, err))
|
||||
return
|
||||
}
|
||||
c.JSON(200, &ListUploadingsResp{UploadInfos: infos})
|
||||
}
|
||||
|
||||
func (h *FileHandlers) DelUploading(c *gin.Context) {
|
||||
filePath := c.Query(FilePathQuery)
|
||||
if filePath == "" {
|
||||
c.JSON(q.ErrResp(c, 400, errors.New("invalid file path")))
|
||||
return
|
||||
}
|
||||
userName := c.MustGet(singleuserhdr.UserParam).(string)
|
||||
|
||||
var err error
|
||||
tmpFilePath := getTmpPath(filePath)
|
||||
locker := h.NewAutoLocker(c, lockName(userName, tmpFilePath))
|
||||
locker.Exec(func() {
|
||||
err = h.deps.FS().Remove(tmpFilePath)
|
||||
if err != nil {
|
||||
c.JSON(q.ErrResp(c, 500, err))
|
||||
return
|
||||
}
|
||||
|
||||
err = h.uploadMgr.DelInfo(userName, tmpFilePath)
|
||||
if err != nil {
|
||||
c.JSON(q.ErrResp(c, 500, err))
|
||||
return
|
||||
}
|
||||
c.JSON(q.Resp(200))
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1,81 +1,121 @@
|
|||
package fileshdr
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/ihexxa/quickshare/src/kvstore"
|
||||
)
|
||||
|
||||
var (
|
||||
isDirKey = "isDir"
|
||||
fileSizeKey = "fileSize"
|
||||
uploadedKey = "uploaded"
|
||||
filePathKey = "fileName"
|
||||
ErrCreateExisting = errors.New("create upload info which already exists")
|
||||
ErrGreaterThanSize = errors.New("uploaded is greater than file size")
|
||||
ErrNotFound = errors.New("upload info not found")
|
||||
|
||||
uploadsPrefix = "uploads"
|
||||
)
|
||||
|
||||
type UploadInfo struct {
|
||||
RealFilePath string `json:"realFilePath"`
|
||||
Size int64 `json:"size"`
|
||||
Uploaded int64 `json:"uploaded"`
|
||||
}
|
||||
|
||||
type UploadMgr struct {
|
||||
kv kvstore.IKVStore
|
||||
}
|
||||
|
||||
func UploadNS(user string) string {
|
||||
return fmt.Sprintf("%s/%s", uploadsPrefix, user)
|
||||
}
|
||||
|
||||
func NewUploadMgr(kv kvstore.IKVStore) *UploadMgr {
|
||||
return &UploadMgr{
|
||||
kv: kv,
|
||||
}
|
||||
}
|
||||
|
||||
func (um *UploadMgr) AddInfo(fileName, tmpName string, fileSize int64, isDir bool) error {
|
||||
err := um.kv.SetInt64(infoKey(tmpName, fileSizeKey), fileSize)
|
||||
func (um *UploadMgr) AddInfo(user, filePath, tmpPath string, fileSize int64) error {
|
||||
ns := UploadNS(user)
|
||||
err := um.kv.AddNamespace(ns)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = um.kv.SetInt64(infoKey(tmpName, uploadedKey), 0)
|
||||
|
||||
_, ok := um.kv.GetStringIn(ns, tmpPath)
|
||||
if ok {
|
||||
return ErrCreateExisting
|
||||
}
|
||||
|
||||
info := &UploadInfo{
|
||||
RealFilePath: filePath,
|
||||
Size: fileSize,
|
||||
Uploaded: 0,
|
||||
}
|
||||
infoBytes, err := json.Marshal(info)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return um.kv.SetString(infoKey(tmpName, filePathKey), fileName)
|
||||
|
||||
return um.kv.SetStringIn(ns, tmpPath, string(infoBytes))
|
||||
}
|
||||
|
||||
func (um *UploadMgr) SetUploaded(fileName string, newUploaded int64) error {
|
||||
fileSize, ok := um.kv.GetInt64(infoKey(fileName, fileSizeKey))
|
||||
if !ok {
|
||||
return fmt.Errorf("file size %s not found", fileName)
|
||||
}
|
||||
if newUploaded <= fileSize {
|
||||
um.kv.SetInt64(infoKey(fileName, uploadedKey), newUploaded)
|
||||
return nil
|
||||
}
|
||||
return errors.New("uploaded is greater than file size")
|
||||
}
|
||||
|
||||
func (um *UploadMgr) GetInfo(fileName string) (string, int64, int64, error) {
|
||||
realFilePath, ok := um.kv.GetString(infoKey(fileName, filePathKey))
|
||||
if !ok {
|
||||
return "", 0, 0, os.ErrNotExist
|
||||
}
|
||||
fileSize, ok := um.kv.GetInt64(infoKey(fileName, fileSizeKey))
|
||||
if !ok {
|
||||
return "", 0, 0, os.ErrNotExist
|
||||
}
|
||||
uploaded, ok := um.kv.GetInt64(infoKey(fileName, uploadedKey))
|
||||
if !ok {
|
||||
return "", 0, 0, os.ErrNotExist
|
||||
func (um *UploadMgr) SetInfo(user, filePath string, newUploaded int64) error {
|
||||
realFilePath, fileSize, _, err := um.GetInfo(user, filePath)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if newUploaded > fileSize {
|
||||
return ErrGreaterThanSize
|
||||
}
|
||||
|
||||
return realFilePath, fileSize, uploaded, nil
|
||||
}
|
||||
|
||||
func (um *UploadMgr) DelInfo(fileName string) error {
|
||||
if err := um.kv.DelInt64(infoKey(fileName, fileSizeKey)); err != nil {
|
||||
newInfo := &UploadInfo{
|
||||
RealFilePath: realFilePath,
|
||||
Size: fileSize,
|
||||
Uploaded: newUploaded,
|
||||
}
|
||||
newInfoBytes, err := json.Marshal(newInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := um.kv.DelInt64(infoKey(fileName, uploadedKey)); err != nil {
|
||||
return err
|
||||
}
|
||||
return um.kv.DelString(infoKey(fileName, filePathKey))
|
||||
return um.kv.SetStringIn(UploadNS(user), filePath, string(newInfoBytes))
|
||||
}
|
||||
|
||||
func infoKey(fileName, key string) string {
|
||||
return fmt.Sprintf("%s:%s", fileName, key)
|
||||
func (um *UploadMgr) GetInfo(user, filePath string) (string, int64, int64, error) {
|
||||
ns := UploadNS(user)
|
||||
infoBytes, ok := um.kv.GetStringIn(ns, filePath)
|
||||
if !ok {
|
||||
return "", 0, 0, ErrNotFound
|
||||
}
|
||||
|
||||
info := &UploadInfo{}
|
||||
err := json.Unmarshal([]byte(infoBytes), info)
|
||||
if err != nil {
|
||||
return "", 0, 0, err
|
||||
}
|
||||
|
||||
return info.RealFilePath, info.Size, info.Uploaded, nil
|
||||
}
|
||||
|
||||
func (um *UploadMgr) DelInfo(user, filePath string) error {
|
||||
return um.kv.DelInt64In(UploadNS(user), filePath)
|
||||
}
|
||||
|
||||
func (um *UploadMgr) ListInfo(user string) ([]*UploadInfo, error) {
|
||||
infoMap, err := um.kv.ListStringsIn(UploadNS(user))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
infos := []*UploadInfo{}
|
||||
for _, infoStr := range infoMap {
|
||||
info := &UploadInfo{}
|
||||
err = json.Unmarshal([]byte(infoStr), info)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
infos = append(infos, info)
|
||||
}
|
||||
|
||||
return infos, nil
|
||||
}
|
||||
|
|
|
@ -53,11 +53,14 @@ func (h *SimpleUserHandlers) Auth() gin.HandlerFunc {
|
|||
ExpireParam: "",
|
||||
}
|
||||
|
||||
_, err = h.deps.Token().FromToken(token, claims)
|
||||
claims, err = h.deps.Token().FromToken(token, claims)
|
||||
if err != nil {
|
||||
c.AbortWithStatusJSON(q.ErrResp(c, 401, err))
|
||||
return
|
||||
}
|
||||
for key, val := range claims {
|
||||
c.Set(key, val)
|
||||
}
|
||||
|
||||
now := time.Now().Unix()
|
||||
expire, err := strconv.ParseInt(claims[ExpireParam], 10, 64)
|
||||
|
@ -71,6 +74,9 @@ func (h *SimpleUserHandlers) Auth() gin.HandlerFunc {
|
|||
c.AbortWithStatusJSON(q.ErrResp(c, 401, errors.New("not allowed")))
|
||||
return
|
||||
}
|
||||
} else {
|
||||
// this is for UploadMgr to get user info to get related namespace
|
||||
c.Set(UserParam, "quickshare_anonymous")
|
||||
}
|
||||
|
||||
c.Next()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue