fix(fs): fix issues related to closing fd

This commit is contained in:
hexxa 2021-09-14 22:47:00 +08:00 committed by Hexxa
parent 70dda37c54
commit 5e8567d470
8 changed files with 120 additions and 105 deletions

View file

@ -22,8 +22,8 @@ type ISimpleFS interface {
Stat(path string) (os.FileInfo, error) Stat(path string) (os.FileInfo, error)
Close() error Close() error
Sync() error Sync() error
GetFileReader(path string) (ReadCloseSeeker, error) GetFileReader(path string) (ReadCloseSeeker, uint64, error)
CloseReader(path string) error CloseReader(id string) error
Root() string Root() string
ListDir(path string) ([]os.FileInfo, error) ListDir(path string) ([]os.FileInfo, error)
} }

View file

@ -12,6 +12,7 @@ import (
"time" "time"
"github.com/ihexxa/quickshare/src/fs" "github.com/ihexxa/quickshare/src/fs"
"github.com/ihexxa/quickshare/src/idgen"
) )
var ErrTooManyOpens = errors.New("too many opened files") var ErrTooManyOpens = errors.New("too many opened files")
@ -25,8 +26,9 @@ type LocalFS struct {
opensMtx *sync.RWMutex opensMtx *sync.RWMutex
opensCleanSize int opensCleanSize int
openTTL time.Duration openTTL time.Duration
readersMtx *sync.RWMutex readerTTL time.Duration
readers map[string]*fileInfo readers map[string]*fileInfo
ider idgen.IIDGen
} }
type fileInfo struct { type fileInfo struct {
@ -34,21 +36,22 @@ type fileInfo struct {
fd *os.File fd *os.File
} }
func NewLocalFS(root string, defaultPerm uint32, opensLimit, openTTL int) *LocalFS { func NewLocalFS(root string, defaultPerm uint32, opensLimit, openTTL, readerTTL int, ider idgen.IIDGen) *LocalFS {
if root == "" { if root == "" {
root = "." root = "."
} }
return &LocalFS{ return &LocalFS{
ider: ider,
root: root, root: root,
defaultPerm: os.FileMode(defaultPerm), defaultPerm: os.FileMode(defaultPerm),
defaultDirPerm: os.FileMode(0775), defaultDirPerm: os.FileMode(0775),
opens: map[string]*fileInfo{}, opens: map[string]*fileInfo{},
opensLimit: opensLimit, opensLimit: opensLimit,
openTTL: time.Duration(openTTL) * time.Second, openTTL: time.Duration(openTTL) * time.Second,
readerTTL: time.Duration(readerTTL) * time.Second,
opensMtx: &sync.RWMutex{}, opensMtx: &sync.RWMutex{},
opensCleanSize: 3, opensCleanSize: 3,
readersMtx: &sync.RWMutex{},
readers: map[string]*fileInfo{}, // TODO: track readers and close idles readers: map[string]*fileInfo{}, // TODO: track readers and close idles
} }
} }
@ -63,29 +66,49 @@ func (fs *LocalFS) isTooManyOpens() bool {
} }
// closeOpens assumes that it is called after opensMtx.Lock() // closeOpens assumes that it is called after opensMtx.Lock()
func (fs *LocalFS) closeOpens(closeAll, forced bool, exclude map[string]bool) (int, error) { func (fs *LocalFS) closeOpens(iterateAll, forced bool, exclude map[string]bool) (int, error) {
batch := fs.opensCleanSize batch := fs.opensCleanSize
var err error var err error
closed := 0 closed := 0
for key, info := range fs.opens { for filePath, info := range fs.opens {
if exclude[key] { if !iterateAll && exclude[filePath] {
continue continue
} }
if !closeAll && batch <= 0 { if !iterateAll && batch <= 0 {
break break
} }
batch-- batch--
if forced || info.lastAccess.Add(fs.openTTL).Before(time.Now()) { if forced || info.lastAccess.Add(fs.openTTL).Before(time.Now()) {
if err = fs.closeInfo(filePath, info); err != nil {
return closed, err
}
closed++
}
}
batch = fs.opensCleanSize
for id, info := range fs.readers {
if !iterateAll && exclude[id] {
continue
}
if !iterateAll && batch <= 0 {
break
}
batch--
if forced || info.lastAccess.Add(fs.readerTTL).Before(time.Now()) {
var err error
if err = info.fd.Sync(); err != nil { if err = info.fd.Sync(); err != nil {
return 0, err return closed, err
} }
if err := info.fd.Close(); err != nil { if err := info.fd.Close(); err != nil {
return 0, err return closed, err
} }
delete(fs.opens, key) delete(fs.readers, id)
closed++ closed++
} }
} }
@ -93,13 +116,30 @@ func (fs *LocalFS) closeOpens(closeAll, forced bool, exclude map[string]bool) (i
return closed, nil return closed, nil
} }
func (fs *LocalFS) closeInfo(key string, info *fileInfo) error {
var err error
if err = info.fd.Sync(); err != nil {
return err
}
if err := info.fd.Close(); err != nil {
return err
}
delete(fs.opens, key)
return nil
}
func (fs *LocalFS) Sync() error { func (fs *LocalFS) Sync() error {
fs.opensMtx.Lock() fs.opensMtx.Lock()
defer fs.opensMtx.Unlock() defer fs.opensMtx.Unlock()
_, err := fs.closeOpens(true, true, map[string]bool{}) _, err := fs.closeOpens(true, true, map[string]bool{})
return err return err
} }
func (fs *LocalFS) Close() error {
return fs.Sync()
}
// check refers implementation of Dir.Open() in http package // check refers implementation of Dir.Open() in http package
func (fs *LocalFS) translate(name string) (string, error) { func (fs *LocalFS) translate(name string) (string, error) {
if filepath.Separator != '/' && strings.ContainsRune(name, filepath.Separator) { if filepath.Separator != '/' && strings.ContainsRune(name, filepath.Separator) {
@ -113,9 +153,9 @@ func (fs *LocalFS) Create(path string) error {
defer fs.opensMtx.Unlock() defer fs.opensMtx.Unlock()
if fs.isTooManyOpens() { if fs.isTooManyOpens() {
_, err := fs.closeOpens(false, true, map[string]bool{}) closed, err := fs.closeOpens(false, false, map[string]bool{})
if err != nil { if err != nil || closed == 0 {
return fmt.Errorf("too many opens and fail to clean: %w", err) return fmt.Errorf("too many opens and fail to clean(%d): %w", closed, err)
} }
} }
@ -124,11 +164,15 @@ func (fs *LocalFS) Create(path string) error {
return err return err
} }
_, ok := fs.opens[fullpath]
if ok {
return os.ErrExist
}
fd, err := os.OpenFile(fullpath, os.O_CREATE|os.O_RDWR|os.O_EXCL, fs.defaultPerm) fd, err := os.OpenFile(fullpath, os.O_CREATE|os.O_RDWR|os.O_EXCL, fs.defaultPerm)
if err != nil { if err != nil {
return err return err
} }
fs.opens[fullpath] = &fileInfo{ fs.opens[fullpath] = &fileInfo{
lastAccess: time.Now(), lastAccess: time.Now(),
fd: fd, fd: fd,
@ -153,6 +197,8 @@ func (fs *LocalFS) Remove(entryPath string) error {
} }
func (fs *LocalFS) Rename(oldpath, newpath string) error { func (fs *LocalFS) Rename(oldpath, newpath string) error {
// TODO: if the rename will be implemented without Rename
// we must check if the files are in reading/writing
fullOldPath, err := fs.translate(oldpath) fullOldPath, err := fs.translate(oldpath)
if err != nil { if err != nil {
return err return err
@ -161,17 +207,8 @@ func (fs *LocalFS) Rename(oldpath, newpath string) error {
if err != nil { if err != nil {
return err return err
} }
if fullOldPath == fullNewPath {
if fs.isTooManyOpens() { return nil
_, err := fs.closeOpens(false, true, map[string]bool{})
if err != nil {
return fmt.Errorf("too many opens and fail to clean: %w", err)
}
}
_, err = os.Stat(fullOldPath)
if err != nil {
return err
} }
// avoid replacing existing file/folder // avoid replacing existing file/folder
@ -198,9 +235,9 @@ func (fs *LocalFS) ReadAt(path string, b []byte, off int64) (int, error) {
info, ok := fs.opens[fullpath] info, ok := fs.opens[fullpath]
if !ok { if !ok {
if fs.isTooManyOpens() { if fs.isTooManyOpens() {
_, err := fs.closeOpens(false, true, map[string]bool{}) closed, err := fs.closeOpens(false, false, map[string]bool{})
if err != nil { if err != nil || closed == 0 {
return nil, fmt.Errorf("too many opens and fail to clean: %w", err) return nil, fmt.Errorf("too many opens and fail to clean (%d): %w", closed, err)
} }
} }
@ -214,7 +251,8 @@ func (fs *LocalFS) ReadAt(path string, b []byte, off int64) (int, error) {
lastAccess: time.Now(), lastAccess: time.Now(),
} }
fs.opens[fullpath] = info fs.opens[fullpath] = info
// fs.closeOpens(false, true, map[string]bool{fullpath: true}) } else {
info.lastAccess = time.Now()
} }
return info, nil return info, nil
@ -247,9 +285,10 @@ func (fs *LocalFS) WriteAt(path string, b []byte, off int64) (int, error) {
info, ok := fs.opens[fullpath] info, ok := fs.opens[fullpath]
if !ok { if !ok {
if fs.isTooManyOpens() { if fs.isTooManyOpens() {
_, err := fs.closeOpens(false, true, map[string]bool{}) closed, err := fs.closeOpens(false, false, map[string]bool{})
if err != nil { if err != nil || closed == 0 {
return nil, fmt.Errorf("too many opens and fail to clean: %w", err) // TODO: return Eagain and make client retry later
return nil, fmt.Errorf("too many opens and fail to clean (%d): %w", closed, err)
} }
} }
@ -263,6 +302,8 @@ func (fs *LocalFS) WriteAt(path string, b []byte, off int64) (int, error) {
lastAccess: time.Now(), lastAccess: time.Now(),
} }
fs.opens[fullpath] = info fs.opens[fullpath] = info
} else {
info.lastAccess = time.Now()
} }
return info, nil return info, nil
@ -297,71 +338,53 @@ func (fs *LocalFS) Stat(path string) (os.FileInfo, error) {
return os.Stat(fullpath) return os.Stat(fullpath)
} }
func (fs *LocalFS) Close() error { // readers are not tracked by opens
func (fs *LocalFS) GetFileReader(path string) (fs.ReadCloseSeeker, uint64, error) {
fullpath, err := fs.translate(path)
if err != nil {
return nil, 0, err
}
fs.opensMtx.Lock() fs.opensMtx.Lock()
defer fs.opensMtx.Unlock() defer fs.opensMtx.Unlock()
var err error
for filePath, info := range fs.opens {
err = info.fd.Sync()
if err != nil {
return err
}
err = info.fd.Close()
if err != nil {
return err
}
delete(fs.opens, filePath)
}
return nil
}
// readers are not tracked by opens
func (fs *LocalFS) GetFileReader(path string) (fs.ReadCloseSeeker, error) {
fullpath, err := fs.translate(path)
if err != nil {
return nil, err
}
if fs.isTooManyOpens() { if fs.isTooManyOpens() {
return nil, ErrTooManyOpens closed, err := fs.closeOpens(false, false, map[string]bool{})
if err != nil || closed == 0 {
return nil, 0, fmt.Errorf("too many opens and fail to clean (%d): %w", closed, err)
}
} }
fd, err := os.OpenFile(fullpath, os.O_RDONLY, fs.defaultPerm) fd, err := os.OpenFile(fullpath, os.O_RDONLY, fs.defaultPerm)
if err != nil { if err != nil {
return nil, err return nil, 0, err
} }
fs.readersMtx.Lock() id := fs.ider.Gen()
defer fs.readersMtx.Unlock() fs.readers[fmt.Sprint(id)] = &fileInfo{
fs.readers[fullpath] = &fileInfo{
fd: fd, fd: fd,
lastAccess: time.Now(), lastAccess: time.Now(),
} }
return fd, nil return fd, id, nil
} }
func (fs *LocalFS) CloseReader(path string) error { func (fs *LocalFS) CloseReader(id string) error {
fullpath, err := fs.translate(path) fs.opensMtx.Lock()
if err != nil { defer fs.opensMtx.Unlock()
info, ok := fs.readers[id]
if !ok {
return fmt.Errorf("reader not found: %s", id)
}
var err error
if err = info.fd.Sync(); err != nil {
return err return err
} }
if err := info.fd.Close(); err != nil {
fs.readersMtx.Lock() return err
defer fs.readersMtx.Unlock()
info, ok := fs.readers[fullpath]
if !ok {
return fmt.Errorf("reader not found: %s", path)
} }
delete(fs.readers, id)
err = info.fd.Close()
if err != nil {
return fmt.Errorf("close reader failed: %w", err)
}
delete(fs.readers, fullpath)
return nil return nil
} }

View file

@ -22,12 +22,12 @@ func (h *FileHandlers) genSha1(msg worker.IMsg) error {
return fmt.Errorf("fail to unmarshal sha1 msg: %w", err) return fmt.Errorf("fail to unmarshal sha1 msg: %w", err)
} }
f, err := h.deps.FS().GetFileReader(taskInputs.FilePath) f, id, err := h.deps.FS().GetFileReader(taskInputs.FilePath)
if err != nil { if err != nil {
return fmt.Errorf("fail to get reader: %s", err) return fmt.Errorf("fail to get reader: %s", err)
} }
defer func() { defer func() {
err := h.deps.FS().CloseReader(taskInputs.FilePath) err := h.deps.FS().CloseReader(fmt.Sprint(id))
if err != nil { if err != nil {
h.deps.Log().Errorf("failed to close file: %s", err) h.deps.Log().Errorf("failed to close file: %s", err)
} }

View file

@ -568,13 +568,13 @@ func (h *FileHandlers) Download(c *gin.Context) {
} }
contentType := http.DetectContentType(fileHeadBuf[:read]) contentType := http.DetectContentType(fileHeadBuf[:read])
fd, err := h.deps.FS().GetFileReader(filePath) fd, id, err := h.deps.FS().GetFileReader(filePath)
if err != nil { if err != nil {
c.JSON(q.ErrResp(c, 500, err)) c.JSON(q.ErrResp(c, 500, err))
return return
} }
defer func() { defer func() {
err := h.deps.FS().CloseReader(filePath) err := h.deps.FS().CloseReader(fmt.Sprint(id))
if err != nil { if err != nil {
h.deps.Log().Errorf("failed to close: %s", err) h.deps.Log().Errorf("failed to close: %s", err)
} }

View file

@ -110,9 +110,10 @@ func initDeps(cfg gocfg.ICfg) *depidx.Deps {
mkRoot(rootPath) mkRoot(rootPath)
opensLimit := cfg.GrabInt("Fs.OpensLimit") opensLimit := cfg.GrabInt("Fs.OpensLimit")
openTTL := cfg.GrabInt("Fs.OpenTTL") openTTL := cfg.GrabInt("Fs.OpenTTL")
readerTTL := cfg.GrabInt("Server.WriteTimeout") / 1000 // millisecond -> second
ider := simpleidgen.New() ider := simpleidgen.New()
filesystem := local.NewLocalFS(rootPath, 0660, opensLimit, openTTL) filesystem := local.NewLocalFS(rootPath, 0660, opensLimit, openTTL, readerTTL, ider)
jwtEncDec := jwt.NewJWTEncDec(secret) jwtEncDec := jwt.NewJWTEncDec(secret)
kv := boltdbpvd.New(rootPath, 1024) kv := boltdbpvd.New(rootPath, 1024)
users, err := userstore.NewKVUserStore(kv) users, err := userstore.NewKVUserStore(kv)
@ -313,6 +314,7 @@ func (s *Server) Start() error {
func (s *Server) Shutdown() error { func (s *Server) Shutdown() error {
// TODO: add timeout // TODO: add timeout
s.deps.Workers().Stop() s.deps.Workers().Stop()
s.deps.FS().Close()
s.deps.Log().Sync() s.deps.Log().Sync()
return s.server.Shutdown(context.Background()) return s.server.Shutdown(context.Background())
} }

View file

@ -575,7 +575,7 @@ func TestFileHandlers(t *testing.T) {
// cl := client.NewFilesClient(addr) // cl := client.NewFilesClient(addr)
files := map[string]string{ files := map[string]string{
"qs/files/uploadings/path1/f1": "12345678", "qs/files/uploadings1/path1/f1": "12345678",
} }
for filePath, content := range files { for filePath, content := range files {
@ -594,18 +594,13 @@ func TestFileHandlers(t *testing.T) {
offset := int64(0) offset := int64(0)
for _, chunk := range chunks { for _, chunk := range chunks {
base64Content := base64.StdEncoding.EncodeToString(chunk) base64Content := base64.StdEncoding.EncodeToString(chunk)
res, _, errs = cl.UploadChunk(filePath, base64Content, offset) res, bodyStr, errs := cl.UploadChunk(filePath, base64Content, offset)
offset += int64(len(chunk)) offset += int64(len(chunk))
if len(errs) > 0 { if len(errs) > 0 {
t.Fatal(errs) t.Fatal(errs)
} else if res.StatusCode != 200 { } else if res.StatusCode != 200 {
t.Fatal(res.StatusCode) t.Fatal(fmt.Sprintln(res.StatusCode, bodyStr))
}
err = fs.Close()
if err != nil {
t.Fatal(err)
} }
} }
@ -663,11 +658,6 @@ func TestFileHandlers(t *testing.T) {
} else if res.StatusCode != 200 { } else if res.StatusCode != 200 {
t.Fatal(res.StatusCode) t.Fatal(res.StatusCode)
} }
err = fs.Close()
if err != nil {
t.Fatal(err)
}
} }
err = fs.Sync() err = fs.Sync()

View file

@ -89,7 +89,7 @@ func TestSpaceLimit(t *testing.T) {
t.Fatal(resp.StatusCode) t.Fatal(resp.StatusCode)
} }
t.Run("test space limitiong: Upload", func(t *testing.T) { t.Run("test space limiting: Upload", func(t *testing.T) {
usersCl := client.NewSingleUserClient(addr) usersCl := client.NewSingleUserClient(addr)
resp, _, errs := usersCl.Login(getUserName(0), userPwd) resp, _, errs := usersCl.Login(getUserName(0), userPwd)
if len(errs) > 0 { if len(errs) > 0 {
@ -105,9 +105,9 @@ func TestSpaceLimit(t *testing.T) {
} }
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
ok := assertUploadOK(t, fmt.Sprintf("%s/spacelimit/f_%d", getUserName(0), 0), fileContent, addr, token) ok := assertUploadOK(t, fmt.Sprintf("%s/files/spacelimit/f_%d", getUserName(0), i), fileContent, addr, token)
if !ok { if !ok {
t.Fatalf("space limit failed at %d", 0) t.Fatalf("space limit failed at %d", i)
} }
resp, selfResp, errs := usersCl.Self(token) resp, selfResp, errs := usersCl.Self(token)
@ -121,7 +121,7 @@ func TestSpaceLimit(t *testing.T) {
} }
cl := client.NewFilesClient(addr, token) cl := client.NewFilesClient(addr, token)
filePath := fmt.Sprintf("%s/spacelimit/f_%d", getUserName(0), 11) filePath := fmt.Sprintf("%s/files/spacelimit/f_%d", getUserName(0), 11)
res, _, errs := cl.Create(filePath, 1) res, _, errs := cl.Create(filePath, 1)
if len(errs) > 0 { if len(errs) > 0 {
t.Fatal(errs) t.Fatal(errs)

View file

@ -58,12 +58,12 @@ func waitForReady(addr string) bool {
} }
func compareFileContent(fs fspkg.ISimpleFS, uid, filePath string, expectedContent string) (bool, error) { func compareFileContent(fs fspkg.ISimpleFS, uid, filePath string, expectedContent string) (bool, error) {
reader, err := fs.GetFileReader(filePath) reader, id, err := fs.GetFileReader(filePath)
if err != nil { if err != nil {
return false, err return false, err
} }
defer func() { defer func() {
err = fs.CloseReader(filePath) err = fs.CloseReader(fmt.Sprint(id))
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
} }