diff --git a/src/fs/fs_interface.go b/src/fs/fs_interface.go index a981c4a..9bf8c86 100644 --- a/src/fs/fs_interface.go +++ b/src/fs/fs_interface.go @@ -22,8 +22,8 @@ type ISimpleFS interface { Stat(path string) (os.FileInfo, error) Close() error Sync() error - GetFileReader(path string) (ReadCloseSeeker, error) - CloseReader(path string) error + GetFileReader(path string) (ReadCloseSeeker, uint64, error) + CloseReader(id string) error Root() string ListDir(path string) ([]os.FileInfo, error) } diff --git a/src/fs/local/fs.go b/src/fs/local/fs.go index 0d57b8f..b7294a3 100644 --- a/src/fs/local/fs.go +++ b/src/fs/local/fs.go @@ -12,6 +12,7 @@ import ( "time" "github.com/ihexxa/quickshare/src/fs" + "github.com/ihexxa/quickshare/src/idgen" ) var ErrTooManyOpens = errors.New("too many opened files") @@ -25,8 +26,9 @@ type LocalFS struct { opensMtx *sync.RWMutex opensCleanSize int openTTL time.Duration - readersMtx *sync.RWMutex + readerTTL time.Duration readers map[string]*fileInfo + ider idgen.IIDGen } type fileInfo struct { @@ -34,21 +36,22 @@ type fileInfo struct { fd *os.File } -func NewLocalFS(root string, defaultPerm uint32, opensLimit, openTTL int) *LocalFS { +func NewLocalFS(root string, defaultPerm uint32, opensLimit, openTTL, readerTTL int, ider idgen.IIDGen) *LocalFS { if root == "" { root = "." } return &LocalFS{ + ider: ider, root: root, defaultPerm: os.FileMode(defaultPerm), defaultDirPerm: os.FileMode(0775), opens: map[string]*fileInfo{}, opensLimit: opensLimit, openTTL: time.Duration(openTTL) * time.Second, + readerTTL: time.Duration(readerTTL) * time.Second, opensMtx: &sync.RWMutex{}, opensCleanSize: 3, - readersMtx: &sync.RWMutex{}, readers: map[string]*fileInfo{}, // TODO: track readers and close idles } } @@ -63,29 +66,49 @@ func (fs *LocalFS) isTooManyOpens() bool { } // closeOpens assumes that it is called after opensMtx.Lock() -func (fs *LocalFS) closeOpens(closeAll, forced bool, exclude map[string]bool) (int, error) { +func (fs *LocalFS) closeOpens(iterateAll, forced bool, exclude map[string]bool) (int, error) { batch := fs.opensCleanSize var err error closed := 0 - for key, info := range fs.opens { - if exclude[key] { + for filePath, info := range fs.opens { + if !iterateAll && exclude[filePath] { continue } - if !closeAll && batch <= 0 { + if !iterateAll && batch <= 0 { break } batch-- if forced || info.lastAccess.Add(fs.openTTL).Before(time.Now()) { + if err = fs.closeInfo(filePath, info); err != nil { + return closed, err + } + closed++ + } + } + + batch = fs.opensCleanSize + for id, info := range fs.readers { + if !iterateAll && exclude[id] { + continue + } + + if !iterateAll && batch <= 0 { + break + } + batch-- + + if forced || info.lastAccess.Add(fs.readerTTL).Before(time.Now()) { + var err error if err = info.fd.Sync(); err != nil { - return 0, err + return closed, err } if err := info.fd.Close(); err != nil { - return 0, err + return closed, err } - delete(fs.opens, key) + delete(fs.readers, id) closed++ } } @@ -93,13 +116,30 @@ func (fs *LocalFS) closeOpens(closeAll, forced bool, exclude map[string]bool) (i return closed, nil } +func (fs *LocalFS) closeInfo(key string, info *fileInfo) error { + var err error + if err = info.fd.Sync(); err != nil { + return err + } + if err := info.fd.Close(); err != nil { + return err + } + delete(fs.opens, key) + return nil +} + func (fs *LocalFS) Sync() error { fs.opensMtx.Lock() defer fs.opensMtx.Unlock() + _, err := fs.closeOpens(true, true, map[string]bool{}) return err } +func (fs *LocalFS) Close() error { + return fs.Sync() +} + // check refers implementation of Dir.Open() in http package func (fs *LocalFS) translate(name string) (string, error) { if filepath.Separator != '/' && strings.ContainsRune(name, filepath.Separator) { @@ -113,9 +153,9 @@ func (fs *LocalFS) Create(path string) error { defer fs.opensMtx.Unlock() if fs.isTooManyOpens() { - _, err := fs.closeOpens(false, true, map[string]bool{}) - if err != nil { - return fmt.Errorf("too many opens and fail to clean: %w", err) + closed, err := fs.closeOpens(false, false, map[string]bool{}) + if err != nil || closed == 0 { + return fmt.Errorf("too many opens and fail to clean(%d): %w", closed, err) } } @@ -124,11 +164,15 @@ func (fs *LocalFS) Create(path string) error { return err } + _, ok := fs.opens[fullpath] + if ok { + return os.ErrExist + } + fd, err := os.OpenFile(fullpath, os.O_CREATE|os.O_RDWR|os.O_EXCL, fs.defaultPerm) if err != nil { return err } - fs.opens[fullpath] = &fileInfo{ lastAccess: time.Now(), fd: fd, @@ -153,6 +197,8 @@ func (fs *LocalFS) Remove(entryPath string) error { } func (fs *LocalFS) Rename(oldpath, newpath string) error { + // TODO: if the rename will be implemented without Rename + // we must check if the files are in reading/writing fullOldPath, err := fs.translate(oldpath) if err != nil { return err @@ -161,17 +207,8 @@ func (fs *LocalFS) Rename(oldpath, newpath string) error { if err != nil { return err } - - if fs.isTooManyOpens() { - _, err := fs.closeOpens(false, true, map[string]bool{}) - if err != nil { - return fmt.Errorf("too many opens and fail to clean: %w", err) - } - } - - _, err = os.Stat(fullOldPath) - if err != nil { - return err + if fullOldPath == fullNewPath { + return nil } // avoid replacing existing file/folder @@ -198,9 +235,9 @@ func (fs *LocalFS) ReadAt(path string, b []byte, off int64) (int, error) { info, ok := fs.opens[fullpath] if !ok { if fs.isTooManyOpens() { - _, err := fs.closeOpens(false, true, map[string]bool{}) - if err != nil { - return nil, fmt.Errorf("too many opens and fail to clean: %w", err) + closed, err := fs.closeOpens(false, false, map[string]bool{}) + if err != nil || closed == 0 { + return nil, fmt.Errorf("too many opens and fail to clean (%d): %w", closed, err) } } @@ -214,7 +251,8 @@ func (fs *LocalFS) ReadAt(path string, b []byte, off int64) (int, error) { lastAccess: time.Now(), } fs.opens[fullpath] = info - // fs.closeOpens(false, true, map[string]bool{fullpath: true}) + } else { + info.lastAccess = time.Now() } return info, nil @@ -247,9 +285,10 @@ func (fs *LocalFS) WriteAt(path string, b []byte, off int64) (int, error) { info, ok := fs.opens[fullpath] if !ok { if fs.isTooManyOpens() { - _, err := fs.closeOpens(false, true, map[string]bool{}) - if err != nil { - return nil, fmt.Errorf("too many opens and fail to clean: %w", err) + closed, err := fs.closeOpens(false, false, map[string]bool{}) + if err != nil || closed == 0 { + // TODO: return Eagain and make client retry later + return nil, fmt.Errorf("too many opens and fail to clean (%d): %w", closed, err) } } @@ -263,6 +302,8 @@ func (fs *LocalFS) WriteAt(path string, b []byte, off int64) (int, error) { lastAccess: time.Now(), } fs.opens[fullpath] = info + } else { + info.lastAccess = time.Now() } return info, nil @@ -297,71 +338,53 @@ func (fs *LocalFS) Stat(path string) (os.FileInfo, error) { return os.Stat(fullpath) } -func (fs *LocalFS) Close() error { +// readers are not tracked by opens +func (fs *LocalFS) GetFileReader(path string) (fs.ReadCloseSeeker, uint64, error) { + fullpath, err := fs.translate(path) + if err != nil { + return nil, 0, err + } + fs.opensMtx.Lock() defer fs.opensMtx.Unlock() - var err error - for filePath, info := range fs.opens { - err = info.fd.Sync() - if err != nil { - return err - } - err = info.fd.Close() - if err != nil { - return err - } - delete(fs.opens, filePath) - } - - return nil -} - -// readers are not tracked by opens -func (fs *LocalFS) GetFileReader(path string) (fs.ReadCloseSeeker, error) { - fullpath, err := fs.translate(path) - if err != nil { - return nil, err - } - if fs.isTooManyOpens() { - return nil, ErrTooManyOpens + closed, err := fs.closeOpens(false, false, map[string]bool{}) + if err != nil || closed == 0 { + return nil, 0, fmt.Errorf("too many opens and fail to clean (%d): %w", closed, err) + } } fd, err := os.OpenFile(fullpath, os.O_RDONLY, fs.defaultPerm) if err != nil { - return nil, err + return nil, 0, err } - fs.readersMtx.Lock() - defer fs.readersMtx.Unlock() - - fs.readers[fullpath] = &fileInfo{ + id := fs.ider.Gen() + fs.readers[fmt.Sprint(id)] = &fileInfo{ fd: fd, lastAccess: time.Now(), } - return fd, nil + return fd, id, nil } -func (fs *LocalFS) CloseReader(path string) error { - fullpath, err := fs.translate(path) - if err != nil { +func (fs *LocalFS) CloseReader(id string) error { + fs.opensMtx.Lock() + defer fs.opensMtx.Unlock() + + info, ok := fs.readers[id] + if !ok { + return fmt.Errorf("reader not found: %s", id) + } + + var err error + if err = info.fd.Sync(); err != nil { return err } - - fs.readersMtx.Lock() - defer fs.readersMtx.Unlock() - info, ok := fs.readers[fullpath] - if !ok { - return fmt.Errorf("reader not found: %s", path) + if err := info.fd.Close(); err != nil { + return err } - - err = info.fd.Close() - if err != nil { - return fmt.Errorf("close reader failed: %w", err) - } - - delete(fs.readers, fullpath) + delete(fs.readers, id) return nil } diff --git a/src/handlers/fileshdr/async_handlers.go b/src/handlers/fileshdr/async_handlers.go index 453cd10..a000640 100644 --- a/src/handlers/fileshdr/async_handlers.go +++ b/src/handlers/fileshdr/async_handlers.go @@ -22,12 +22,12 @@ func (h *FileHandlers) genSha1(msg worker.IMsg) error { return fmt.Errorf("fail to unmarshal sha1 msg: %w", err) } - f, err := h.deps.FS().GetFileReader(taskInputs.FilePath) + f, id, err := h.deps.FS().GetFileReader(taskInputs.FilePath) if err != nil { return fmt.Errorf("fail to get reader: %s", err) } defer func() { - err := h.deps.FS().CloseReader(taskInputs.FilePath) + err := h.deps.FS().CloseReader(fmt.Sprint(id)) if err != nil { h.deps.Log().Errorf("failed to close file: %s", err) } diff --git a/src/handlers/fileshdr/handlers.go b/src/handlers/fileshdr/handlers.go index 16282bb..7479254 100644 --- a/src/handlers/fileshdr/handlers.go +++ b/src/handlers/fileshdr/handlers.go @@ -568,13 +568,13 @@ func (h *FileHandlers) Download(c *gin.Context) { } contentType := http.DetectContentType(fileHeadBuf[:read]) - fd, err := h.deps.FS().GetFileReader(filePath) + fd, id, err := h.deps.FS().GetFileReader(filePath) if err != nil { c.JSON(q.ErrResp(c, 500, err)) return } defer func() { - err := h.deps.FS().CloseReader(filePath) + err := h.deps.FS().CloseReader(fmt.Sprint(id)) if err != nil { h.deps.Log().Errorf("failed to close: %s", err) } diff --git a/src/server/server.go b/src/server/server.go index 9aeef20..67c98f5 100644 --- a/src/server/server.go +++ b/src/server/server.go @@ -110,9 +110,10 @@ func initDeps(cfg gocfg.ICfg) *depidx.Deps { mkRoot(rootPath) opensLimit := cfg.GrabInt("Fs.OpensLimit") openTTL := cfg.GrabInt("Fs.OpenTTL") + readerTTL := cfg.GrabInt("Server.WriteTimeout") / 1000 // millisecond -> second ider := simpleidgen.New() - filesystem := local.NewLocalFS(rootPath, 0660, opensLimit, openTTL) + filesystem := local.NewLocalFS(rootPath, 0660, opensLimit, openTTL, readerTTL, ider) jwtEncDec := jwt.NewJWTEncDec(secret) kv := boltdbpvd.New(rootPath, 1024) users, err := userstore.NewKVUserStore(kv) @@ -313,6 +314,7 @@ func (s *Server) Start() error { func (s *Server) Shutdown() error { // TODO: add timeout s.deps.Workers().Stop() + s.deps.FS().Close() s.deps.Log().Sync() return s.server.Shutdown(context.Background()) } diff --git a/src/server/server_files_test.go b/src/server/server_files_test.go index 49d32ed..269491e 100644 --- a/src/server/server_files_test.go +++ b/src/server/server_files_test.go @@ -575,7 +575,7 @@ func TestFileHandlers(t *testing.T) { // cl := client.NewFilesClient(addr) files := map[string]string{ - "qs/files/uploadings/path1/f1": "12345678", + "qs/files/uploadings1/path1/f1": "12345678", } for filePath, content := range files { @@ -594,18 +594,13 @@ func TestFileHandlers(t *testing.T) { offset := int64(0) for _, chunk := range chunks { base64Content := base64.StdEncoding.EncodeToString(chunk) - res, _, errs = cl.UploadChunk(filePath, base64Content, offset) + res, bodyStr, errs := cl.UploadChunk(filePath, base64Content, offset) offset += int64(len(chunk)) if len(errs) > 0 { t.Fatal(errs) } else if res.StatusCode != 200 { - t.Fatal(res.StatusCode) - } - - err = fs.Close() - if err != nil { - t.Fatal(err) + t.Fatal(fmt.Sprintln(res.StatusCode, bodyStr)) } } @@ -663,11 +658,6 @@ func TestFileHandlers(t *testing.T) { } else if res.StatusCode != 200 { t.Fatal(res.StatusCode) } - - err = fs.Close() - if err != nil { - t.Fatal(err) - } } err = fs.Sync() diff --git a/src/server/server_space_limit_test.go b/src/server/server_space_limit_test.go index 69ff29b..b78d068 100644 --- a/src/server/server_space_limit_test.go +++ b/src/server/server_space_limit_test.go @@ -89,7 +89,7 @@ func TestSpaceLimit(t *testing.T) { t.Fatal(resp.StatusCode) } - t.Run("test space limitiong: Upload", func(t *testing.T) { + t.Run("test space limiting: Upload", func(t *testing.T) { usersCl := client.NewSingleUserClient(addr) resp, _, errs := usersCl.Login(getUserName(0), userPwd) if len(errs) > 0 { @@ -105,9 +105,9 @@ func TestSpaceLimit(t *testing.T) { } for i := 0; i < 10; i++ { - ok := assertUploadOK(t, fmt.Sprintf("%s/spacelimit/f_%d", getUserName(0), 0), fileContent, addr, token) + ok := assertUploadOK(t, fmt.Sprintf("%s/files/spacelimit/f_%d", getUserName(0), i), fileContent, addr, token) if !ok { - t.Fatalf("space limit failed at %d", 0) + t.Fatalf("space limit failed at %d", i) } resp, selfResp, errs := usersCl.Self(token) @@ -121,7 +121,7 @@ func TestSpaceLimit(t *testing.T) { } cl := client.NewFilesClient(addr, token) - filePath := fmt.Sprintf("%s/spacelimit/f_%d", getUserName(0), 11) + filePath := fmt.Sprintf("%s/files/spacelimit/f_%d", getUserName(0), 11) res, _, errs := cl.Create(filePath, 1) if len(errs) > 0 { t.Fatal(errs) diff --git a/src/server/test_helpers.go b/src/server/test_helpers.go index 0102e84..f603a1b 100644 --- a/src/server/test_helpers.go +++ b/src/server/test_helpers.go @@ -58,12 +58,12 @@ func waitForReady(addr string) bool { } func compareFileContent(fs fspkg.ISimpleFS, uid, filePath string, expectedContent string) (bool, error) { - reader, err := fs.GetFileReader(filePath) + reader, id, err := fs.GetFileReader(filePath) if err != nil { return false, err } defer func() { - err = fs.CloseReader(filePath) + err = fs.CloseReader(fmt.Sprint(id)) if err != nil { fmt.Println(err) }