fix(fs): fix fd leaking issue

This commit is contained in:
hexxa 2021-09-13 15:15:03 +08:00 committed by Hexxa
parent fb9de35e81
commit 50f66704b1
6 changed files with 96 additions and 39 deletions

View file

@ -23,6 +23,7 @@ type ISimpleFS interface {
Close() error
Sync() error
GetFileReader(path string) (ReadCloseSeeker, error)
CloseReader(path string) error
Root() string
ListDir(path string) ([]os.FileInfo, error)
}

View file

@ -47,7 +47,7 @@ func NewLocalFS(root string, defaultPerm uint32, opensLimit, openTTL int) *Local
opensLimit: opensLimit,
openTTL: time.Duration(openTTL) * time.Second,
opensMtx: &sync.RWMutex{},
opensCleanSize: 10,
opensCleanSize: 3,
readersMtx: &sync.RWMutex{},
readers: map[string]*fileInfo{}, // TODO: track readers and close idles
}
@ -57,11 +57,17 @@ func (fs *LocalFS) Root() string {
return fs.root
}
// should be protected by opensMtx
func (fs *LocalFS) isTooManyOpens() bool {
return len(fs.opens)+len(fs.readers) > fs.opensLimit
}
// closeOpens assumes that it is called after opensMtx.Lock()
func (fs *LocalFS) closeOpens(closeAll bool, exclude map[string]bool) error {
func (fs *LocalFS) closeOpens(closeAll, forced bool, exclude map[string]bool) (int, error) {
batch := fs.opensCleanSize
var err error
closed := 0
for key, info := range fs.opens {
if exclude[key] {
continue
@ -72,24 +78,26 @@ func (fs *LocalFS) closeOpens(closeAll bool, exclude map[string]bool) error {
}
batch--
if info.lastAccess.Add(fs.openTTL).Before(time.Now()) {
delete(fs.opens, key)
if forced || info.lastAccess.Add(fs.openTTL).Before(time.Now()) {
if err = info.fd.Sync(); err != nil {
return err
return 0, err
}
if err := info.fd.Close(); err != nil {
return err
return 0, err
}
delete(fs.opens, key)
closed++
}
}
return nil
return closed, nil
}
func (fs *LocalFS) Sync() error {
fs.opensMtx.Lock()
defer fs.opensMtx.Unlock()
return fs.closeOpens(true, map[string]bool{})
_, err := fs.closeOpens(true, true, map[string]bool{})
return err
}
// check refers implementation of Dir.Open() in http package
@ -103,12 +111,12 @@ func (fs *LocalFS) translate(name string) (string, error) {
func (fs *LocalFS) Create(path string) error {
fs.opensMtx.Lock()
defer fs.opensMtx.Unlock()
if len(fs.opens) > fs.opensLimit {
err := fs.closeOpens(true, map[string]bool{})
if fs.isTooManyOpens() {
_, err := fs.closeOpens(false, true, map[string]bool{})
if err != nil {
return fmt.Errorf("too many opens and fail to clean: %w", err)
}
return ErrTooManyOpens
}
fullpath, err := fs.translate(path)
@ -149,12 +157,19 @@ func (fs *LocalFS) Rename(oldpath, newpath string) error {
if err != nil {
return err
}
_, err = os.Stat(fullOldPath)
fullNewPath, err := fs.translate(newpath)
if err != nil {
return err
}
fullNewPath, err := fs.translate(newpath)
if fs.isTooManyOpens() {
_, err := fs.closeOpens(false, true, map[string]bool{})
if err != nil {
return fmt.Errorf("too many opens and fail to clean: %w", err)
}
}
_, err = os.Stat(fullOldPath)
if err != nil {
return err
}
@ -182,8 +197,11 @@ func (fs *LocalFS) ReadAt(path string, b []byte, off int64) (int, error) {
info, ok := fs.opens[fullpath]
if !ok {
if len(fs.opens) > fs.opensLimit {
return nil, ErrTooManyOpens
if fs.isTooManyOpens() {
_, err := fs.closeOpens(false, true, map[string]bool{})
if err != nil {
return nil, fmt.Errorf("too many opens and fail to clean: %w", err)
}
}
// because the fd may be for other usage, its flag is not set as os.O_RDONLY
@ -196,7 +214,7 @@ func (fs *LocalFS) ReadAt(path string, b []byte, off int64) (int, error) {
lastAccess: time.Now(),
}
fs.opens[fullpath] = info
fs.closeOpens(false, map[string]bool{fullpath: true})
// fs.closeOpens(false, true, map[string]bool{fullpath: true})
}
return info, nil
@ -228,8 +246,11 @@ func (fs *LocalFS) WriteAt(path string, b []byte, off int64) (int, error) {
info, ok := fs.opens[fullpath]
if !ok {
if len(fs.opens) > fs.opensLimit {
return nil, ErrTooManyOpens
if fs.isTooManyOpens() {
_, err := fs.closeOpens(false, true, map[string]bool{})
if err != nil {
return nil, fmt.Errorf("too many opens and fail to clean: %w", err)
}
}
// it does NOT create file for writing
@ -242,7 +263,6 @@ func (fs *LocalFS) WriteAt(path string, b []byte, off int64) (int, error) {
lastAccess: time.Now(),
}
fs.opens[fullpath] = info
fs.closeOpens(false, map[string]bool{fullpath: true})
}
return info, nil
@ -304,6 +324,10 @@ func (fs *LocalFS) GetFileReader(path string) (fs.ReadCloseSeeker, error) {
return nil, err
}
if fs.isTooManyOpens() {
return nil, ErrTooManyOpens
}
fd, err := os.OpenFile(fullpath, os.O_RDONLY, fs.defaultPerm)
if err != nil {
return nil, err
@ -319,6 +343,28 @@ func (fs *LocalFS) GetFileReader(path string) (fs.ReadCloseSeeker, error) {
return fd, nil
}
func (fs *LocalFS) CloseReader(path string) error {
fullpath, err := fs.translate(path)
if err != nil {
return err
}
fs.readersMtx.Lock()
defer fs.readersMtx.Unlock()
info, ok := fs.readers[fullpath]
if !ok {
return fmt.Errorf("reader not found: %s", path)
}
err = info.fd.Close()
if err != nil {
return fmt.Errorf("close reader failed: %w", err)
}
delete(fs.readers, fullpath)
return nil
}
func (fs *LocalFS) ListDir(path string) ([]os.FileInfo, error) {
fullpath, err := fs.translate(path)
if err != nil {