diff --git a/src/client/web/src/worker/chunk_uploader.ts b/src/client/web/src/worker/chunk_uploader.ts index eb7beb4..6669d82 100644 --- a/src/client/web/src/worker/chunk_uploader.ts +++ b/src/client/web/src/worker/chunk_uploader.ts @@ -8,7 +8,7 @@ const defaultChunkLen = 1024 * 1024 * 1; const speedDownRatio = 0.5; const speedUpRatio = 1.1; const speedLimit = 1024 * 1024 * 10; // 10MB -const createRetryLimit = 2; +const createRetryLimit = 3; const uploadRetryLimit = 1024; const backoffMax = 2000; @@ -143,7 +143,7 @@ export class ChunkUploader { filePath, uploaded, state: UploadState.Error, - err: `failed to upload chunk: ${uploadResp.statusText}`, + err: `failed to upload chunk: ${uploadResp.statusText}, ${JSON.stringify(uploadResp.data)}`, }; } @@ -156,7 +156,7 @@ export class ChunkUploader { filePath, uploaded: uploadStatusResp.data.uploaded, state: UploadState.Ready, - err: "", + err: `retrying, error: ${JSON.stringify(uploadResp.data)}`, } : { filePath, diff --git a/src/fs/fs_interface.go b/src/fs/fs_interface.go index 41754d9..a981c4a 100644 --- a/src/fs/fs_interface.go +++ b/src/fs/fs_interface.go @@ -23,6 +23,7 @@ type ISimpleFS interface { Close() error Sync() error GetFileReader(path string) (ReadCloseSeeker, error) + CloseReader(path string) error Root() string ListDir(path string) ([]os.FileInfo, error) } diff --git a/src/fs/local/fs.go b/src/fs/local/fs.go index 783d231..0d57b8f 100644 --- a/src/fs/local/fs.go +++ b/src/fs/local/fs.go @@ -47,7 +47,7 @@ func NewLocalFS(root string, defaultPerm uint32, opensLimit, openTTL int) *Local opensLimit: opensLimit, openTTL: time.Duration(openTTL) * time.Second, opensMtx: &sync.RWMutex{}, - opensCleanSize: 10, + opensCleanSize: 3, readersMtx: &sync.RWMutex{}, readers: map[string]*fileInfo{}, // TODO: track readers and close idles } @@ -57,11 +57,17 @@ func (fs *LocalFS) Root() string { return fs.root } +// should be protected by opensMtx +func (fs *LocalFS) isTooManyOpens() bool { + return len(fs.opens)+len(fs.readers) > fs.opensLimit +} + // closeOpens assumes that it is called after opensMtx.Lock() -func (fs *LocalFS) closeOpens(closeAll bool, exclude map[string]bool) error { +func (fs *LocalFS) closeOpens(closeAll, forced bool, exclude map[string]bool) (int, error) { batch := fs.opensCleanSize var err error + closed := 0 for key, info := range fs.opens { if exclude[key] { continue @@ -72,24 +78,26 @@ func (fs *LocalFS) closeOpens(closeAll bool, exclude map[string]bool) error { } batch-- - if info.lastAccess.Add(fs.openTTL).Before(time.Now()) { - delete(fs.opens, key) + if forced || info.lastAccess.Add(fs.openTTL).Before(time.Now()) { if err = info.fd.Sync(); err != nil { - return err + return 0, err } if err := info.fd.Close(); err != nil { - return err + return 0, err } + delete(fs.opens, key) + closed++ } } - return nil + return closed, nil } func (fs *LocalFS) Sync() error { fs.opensMtx.Lock() defer fs.opensMtx.Unlock() - return fs.closeOpens(true, map[string]bool{}) + _, err := fs.closeOpens(true, true, map[string]bool{}) + return err } // check refers implementation of Dir.Open() in http package @@ -103,12 +111,12 @@ func (fs *LocalFS) translate(name string) (string, error) { func (fs *LocalFS) Create(path string) error { fs.opensMtx.Lock() defer fs.opensMtx.Unlock() - if len(fs.opens) > fs.opensLimit { - err := fs.closeOpens(true, map[string]bool{}) + + if fs.isTooManyOpens() { + _, err := fs.closeOpens(false, true, map[string]bool{}) if err != nil { return fmt.Errorf("too many opens and fail to clean: %w", err) } - return ErrTooManyOpens } fullpath, err := fs.translate(path) @@ -149,12 +157,19 @@ func (fs *LocalFS) Rename(oldpath, newpath string) error { if err != nil { return err } - _, err = os.Stat(fullOldPath) + fullNewPath, err := fs.translate(newpath) if err != nil { return err } - fullNewPath, err := fs.translate(newpath) + if fs.isTooManyOpens() { + _, err := fs.closeOpens(false, true, map[string]bool{}) + if err != nil { + return fmt.Errorf("too many opens and fail to clean: %w", err) + } + } + + _, err = os.Stat(fullOldPath) if err != nil { return err } @@ -182,8 +197,11 @@ func (fs *LocalFS) ReadAt(path string, b []byte, off int64) (int, error) { info, ok := fs.opens[fullpath] if !ok { - if len(fs.opens) > fs.opensLimit { - return nil, ErrTooManyOpens + if fs.isTooManyOpens() { + _, err := fs.closeOpens(false, true, map[string]bool{}) + if err != nil { + return nil, fmt.Errorf("too many opens and fail to clean: %w", err) + } } // because the fd may be for other usage, its flag is not set as os.O_RDONLY @@ -196,7 +214,7 @@ func (fs *LocalFS) ReadAt(path string, b []byte, off int64) (int, error) { lastAccess: time.Now(), } fs.opens[fullpath] = info - fs.closeOpens(false, map[string]bool{fullpath: true}) + // fs.closeOpens(false, true, map[string]bool{fullpath: true}) } return info, nil @@ -228,8 +246,11 @@ func (fs *LocalFS) WriteAt(path string, b []byte, off int64) (int, error) { info, ok := fs.opens[fullpath] if !ok { - if len(fs.opens) > fs.opensLimit { - return nil, ErrTooManyOpens + if fs.isTooManyOpens() { + _, err := fs.closeOpens(false, true, map[string]bool{}) + if err != nil { + return nil, fmt.Errorf("too many opens and fail to clean: %w", err) + } } // it does NOT create file for writing @@ -242,7 +263,6 @@ func (fs *LocalFS) WriteAt(path string, b []byte, off int64) (int, error) { lastAccess: time.Now(), } fs.opens[fullpath] = info - fs.closeOpens(false, map[string]bool{fullpath: true}) } return info, nil @@ -304,6 +324,10 @@ func (fs *LocalFS) GetFileReader(path string) (fs.ReadCloseSeeker, error) { return nil, err } + if fs.isTooManyOpens() { + return nil, ErrTooManyOpens + } + fd, err := os.OpenFile(fullpath, os.O_RDONLY, fs.defaultPerm) if err != nil { return nil, err @@ -319,6 +343,28 @@ func (fs *LocalFS) GetFileReader(path string) (fs.ReadCloseSeeker, error) { return fd, nil } +func (fs *LocalFS) CloseReader(path string) error { + fullpath, err := fs.translate(path) + if err != nil { + return err + } + + fs.readersMtx.Lock() + defer fs.readersMtx.Unlock() + info, ok := fs.readers[fullpath] + if !ok { + return fmt.Errorf("reader not found: %s", path) + } + + err = info.fd.Close() + if err != nil { + return fmt.Errorf("close reader failed: %w", err) + } + + delete(fs.readers, fullpath) + return nil +} + func (fs *LocalFS) ListDir(path string) ([]os.FileInfo, error) { fullpath, err := fs.translate(path) if err != nil { diff --git a/src/handlers/fileshdr/async_handlers.go b/src/handlers/fileshdr/async_handlers.go index b28906c..453cd10 100644 --- a/src/handlers/fileshdr/async_handlers.go +++ b/src/handlers/fileshdr/async_handlers.go @@ -8,6 +8,7 @@ import ( "github.com/ihexxa/quickshare/src/worker" ) + const MsgTypeSha1 = "sha1" type Sha1Params struct { @@ -25,6 +26,12 @@ func (h *FileHandlers) genSha1(msg worker.IMsg) error { if err != nil { return fmt.Errorf("fail to get reader: %s", err) } + defer func() { + err := h.deps.FS().CloseReader(taskInputs.FilePath) + if err != nil { + h.deps.Log().Errorf("failed to close file: %s", err) + } + }() hasher := sha1.New() buf := make([]byte, 4096) diff --git a/src/handlers/fileshdr/handlers.go b/src/handlers/fileshdr/handlers.go index 9e83e16..16282bb 100644 --- a/src/handlers/fileshdr/handlers.go +++ b/src/handlers/fileshdr/handlers.go @@ -214,22 +214,17 @@ func (h *FileHandlers) Delete(c *gin.Context) { return } - err = h.deps.Users().SetUsed(userIDInt, false, info.Size()) - if err != nil { - if userstore.IsReachedLimitErr(err) { - c.JSON(q.ErrResp(c, 429, err)) - } else { - c.JSON(q.ErrResp(c, 500, err)) - } - return - } - err = h.deps.FS().Remove(filePath) if err != nil { c.JSON(q.ErrResp(c, 500, err)) return } + err = h.deps.Users().SetUsed(userIDInt, false, info.Size()) + if err != nil { + c.JSON(q.ErrResp(c, 500, err)) + return + } c.JSON(q.Resp(200)) }) } @@ -578,6 +573,12 @@ func (h *FileHandlers) Download(c *gin.Context) { c.JSON(q.ErrResp(c, 500, err)) return } + defer func() { + err := h.deps.FS().CloseReader(filePath) + if err != nil { + h.deps.Log().Errorf("failed to close: %s", err) + } + }() extraHeaders := map[string]string{ "Content-Disposition": fmt.Sprintf(`attachment; filename="%s"`, info.Name()), @@ -764,17 +765,13 @@ func (h *FileHandlers) DelUploading(c *gin.Context) { return } - err = h.deps.Users().SetUsed(userIDInt, false, size) + err = h.deps.FS().Remove(tmpFilePath) if err != nil { - if userstore.IsReachedLimitErr(err) { - c.JSON(q.ErrResp(c, 429, err)) - } else { - c.JSON(q.ErrResp(c, 500, err)) - } + c.JSON(q.ErrResp(c, 500, err)) return } - err = h.deps.FS().Remove(tmpFilePath) + err = h.deps.Users().SetUsed(userIDInt, false, size) if err != nil { c.JSON(q.ErrResp(c, 500, err)) return diff --git a/src/server/test_helpers.go b/src/server/test_helpers.go index f44dfcc..0102e84 100644 --- a/src/server/test_helpers.go +++ b/src/server/test_helpers.go @@ -62,6 +62,12 @@ func compareFileContent(fs fspkg.ISimpleFS, uid, filePath string, expectedConten if err != nil { return false, err } + defer func() { + err = fs.CloseReader(filePath) + if err != nil { + fmt.Println(err) + } + }() gotContent, err := ioutil.ReadAll(reader) if err != nil {