fix(files): fix fd closing issue
This commit is contained in:
parent
5e8567d470
commit
1b0bf9c5fd
6 changed files with 72 additions and 18 deletions
2
go.mod
2
go.mod
|
@ -13,7 +13,7 @@ require (
|
||||||
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
|
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
|
||||||
github.com/golang/protobuf v1.5.2 // indirect
|
github.com/golang/protobuf v1.5.2 // indirect
|
||||||
github.com/ihexxa/gocfg v0.0.0-20210914021417-6ba19520f0ff
|
github.com/ihexxa/gocfg v0.0.0-20210914021417-6ba19520f0ff
|
||||||
github.com/ihexxa/multipart v0.0.0-20201207132919-72f6e0e58b25
|
github.com/ihexxa/multipart v0.0.0-20210916083128-8584a3f00d1d
|
||||||
github.com/jessevdk/go-flags v1.4.0
|
github.com/jessevdk/go-flags v1.4.0
|
||||||
github.com/json-iterator/go v1.1.11 // indirect
|
github.com/json-iterator/go v1.1.11 // indirect
|
||||||
github.com/mattn/go-isatty v0.0.13 // indirect
|
github.com/mattn/go-isatty v0.0.13 // indirect
|
||||||
|
|
2
go.sum
2
go.sum
|
@ -63,6 +63,8 @@ github.com/ihexxa/gocfg v0.0.0-20210914021417-6ba19520f0ff h1:rsaVb8KIlg3gd7CEPT
|
||||||
github.com/ihexxa/gocfg v0.0.0-20210914021417-6ba19520f0ff/go.mod h1:oqDTq1ywx4Qi9DdhFwwMHoPCYv6Txrfj2SY5WWcgiJs=
|
github.com/ihexxa/gocfg v0.0.0-20210914021417-6ba19520f0ff/go.mod h1:oqDTq1ywx4Qi9DdhFwwMHoPCYv6Txrfj2SY5WWcgiJs=
|
||||||
github.com/ihexxa/multipart v0.0.0-20201207132919-72f6e0e58b25 h1:gQCaP2qoFWCTz17jj9EUhE/plgqJwk3nHbcS4RHQYCw=
|
github.com/ihexxa/multipart v0.0.0-20201207132919-72f6e0e58b25 h1:gQCaP2qoFWCTz17jj9EUhE/plgqJwk3nHbcS4RHQYCw=
|
||||||
github.com/ihexxa/multipart v0.0.0-20201207132919-72f6e0e58b25/go.mod h1:rhOAe/52S/J1fq1VnXvKX8FHuo65I+IcYUozW4M7+wE=
|
github.com/ihexxa/multipart v0.0.0-20201207132919-72f6e0e58b25/go.mod h1:rhOAe/52S/J1fq1VnXvKX8FHuo65I+IcYUozW4M7+wE=
|
||||||
|
github.com/ihexxa/multipart v0.0.0-20210916083128-8584a3f00d1d h1:+v33khYHVDPEuuWO/JE1IzhoIu5uNvEcSs5GmXc5Sjw=
|
||||||
|
github.com/ihexxa/multipart v0.0.0-20210916083128-8584a3f00d1d/go.mod h1:rhOAe/52S/J1fq1VnXvKX8FHuo65I+IcYUozW4M7+wE=
|
||||||
github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA=
|
github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA=
|
||||||
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
||||||
github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
|
github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
|
||||||
|
|
|
@ -101,6 +101,7 @@ func (fs *LocalFS) closeOpens(iterateAll, forced bool, exclude map[string]bool)
|
||||||
batch--
|
batch--
|
||||||
|
|
||||||
if forced || info.lastAccess.Add(fs.readerTTL).Before(time.Now()) {
|
if forced || info.lastAccess.Add(fs.readerTTL).Before(time.Now()) {
|
||||||
|
fmt.Println("closing reader2", id, forced)
|
||||||
var err error
|
var err error
|
||||||
if err = info.fd.Sync(); err != nil {
|
if err = info.fd.Sync(); err != nil {
|
||||||
return closed, err
|
return closed, err
|
||||||
|
@ -132,12 +133,28 @@ func (fs *LocalFS) Sync() error {
|
||||||
fs.opensMtx.Lock()
|
fs.opensMtx.Lock()
|
||||||
defer fs.opensMtx.Unlock()
|
defer fs.opensMtx.Unlock()
|
||||||
|
|
||||||
_, err := fs.closeOpens(true, true, map[string]bool{})
|
var err error
|
||||||
|
for _, info := range fs.opens {
|
||||||
|
if err = info.fd.Sync(); err != nil {
|
||||||
return err
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, info := range fs.readers {
|
||||||
|
if err = info.fd.Sync(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fs *LocalFS) Close() error {
|
func (fs *LocalFS) Close() error {
|
||||||
return fs.Sync()
|
fs.opensMtx.Lock()
|
||||||
|
defer fs.opensMtx.Unlock()
|
||||||
|
|
||||||
|
_, err := fs.closeOpens(true, true, map[string]bool{})
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// check refers implementation of Dir.Open() in http package
|
// check refers implementation of Dir.Open() in http package
|
||||||
|
@ -365,6 +382,8 @@ func (fs *LocalFS) GetFileReader(path string) (fs.ReadCloseSeeker, uint64, error
|
||||||
fd: fd,
|
fd: fd,
|
||||||
lastAccess: time.Now(),
|
lastAccess: time.Now(),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fmt.Println("new reader", id, path, fs.readers)
|
||||||
return fd, id, nil
|
return fd, id, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -372,9 +391,11 @@ func (fs *LocalFS) CloseReader(id string) error {
|
||||||
fs.opensMtx.Lock()
|
fs.opensMtx.Lock()
|
||||||
defer fs.opensMtx.Unlock()
|
defer fs.opensMtx.Unlock()
|
||||||
|
|
||||||
|
fmt.Println("close reader", id)
|
||||||
|
|
||||||
info, ok := fs.readers[id]
|
info, ok := fs.readers[id]
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("reader not found: %s", id)
|
return fmt.Errorf("reader not found: %s %v", id, fs.readers)
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
|
|
|
@ -163,7 +163,7 @@ func (h *FileHandlers) Create(c *gin.Context) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.JSON(q.ErrResp(c, 500, err))
|
c.JSON(q.ErrResp(c, 500, err))
|
||||||
} else {
|
} else {
|
||||||
c.JSON(q.ErrResp(c, 304, err))
|
c.JSON(q.ErrResp(c, 304, fmt.Errorf("file(%s) exists", tmpFilePath)))
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
c.JSON(q.ErrResp(c, 500, err))
|
c.JSON(q.ErrResp(c, 500, err))
|
||||||
|
@ -252,7 +252,11 @@ func (h *FileHandlers) Metadata(c *gin.Context) {
|
||||||
|
|
||||||
info, err := h.deps.FS().Stat(filePath)
|
info, err := h.deps.FS().Stat(filePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
c.JSON(q.ErrResp(c, 404, os.ErrNotExist))
|
||||||
|
} else {
|
||||||
c.JSON(q.ErrResp(c, 500, err))
|
c.JSON(q.ErrResp(c, 500, err))
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -591,6 +595,12 @@ func (h *FileHandlers) Download(c *gin.Context) {
|
||||||
c.JSON(q.ErrResp(c, 500, err))
|
c.JSON(q.ErrResp(c, 500, err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
defer func() {
|
||||||
|
err := limitedReader.Close()
|
||||||
|
if err != nil {
|
||||||
|
h.deps.Log().Errorf("failed to close limitedReader: %s", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
c.DataFromReader(200, info.Size(), contentType, limitedReader, extraHeaders)
|
c.DataFromReader(200, info.Size(), contentType, limitedReader, extraHeaders)
|
||||||
return
|
return
|
||||||
|
@ -603,20 +613,34 @@ func (h *FileHandlers) Download(c *gin.Context) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
mw, contentLength, err := multipart.NewResponseWriter(fd, parts, false)
|
mr, err := multipart.NewMultipartReader(fd, parts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.JSON(q.ErrResp(c, 500, err))
|
c.JSON(q.ErrResp(c, 500, err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
mr.SetOutputHeaders(false)
|
||||||
|
contentLength := mr.ContentLength()
|
||||||
|
defer func() {
|
||||||
|
err := mr.Close()
|
||||||
|
if err != nil {
|
||||||
|
h.deps.Log().Errorf("failed to close multipart reader: %s", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// TODO: reader will be closed by multipart response writer?
|
// TODO: reader will be closed by multipart response writer?
|
||||||
go mw.Write()
|
go mr.Start()
|
||||||
|
|
||||||
limitedReader, err := h.GetStreamReader(userIDInt, mw)
|
limitedReader, err := h.GetStreamReader(userIDInt, mr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.JSON(q.ErrResp(c, 500, err))
|
c.JSON(q.ErrResp(c, 500, err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
defer func() {
|
||||||
|
err := limitedReader.Close()
|
||||||
|
if err != nil {
|
||||||
|
h.deps.Log().Errorf("failed to close limitedReader: %s", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// it takes the \r\n before body into account, so contentLength+2
|
// it takes the \r\n before body into account, so contentLength+2
|
||||||
c.DataFromReader(206, contentLength+2, contentType, limitedReader, extraHeaders)
|
c.DataFromReader(206, contentLength+2, contentType, limitedReader, extraHeaders)
|
||||||
|
@ -914,13 +938,11 @@ func (h *FileHandlers) GenerateHash(c *gin.Context) {
|
||||||
c.JSON(q.Resp(200))
|
c.JSON(q.Resp(200))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *FileHandlers) GetStreamReader(userID uint64, fd io.Reader) (io.Reader, error) {
|
func (h *FileHandlers) GetStreamReader(userID uint64, fd io.Reader) (io.ReadCloser, error) {
|
||||||
pr, pw := io.Pipe()
|
pr, pw := io.Pipe()
|
||||||
chunkSize := 100 * 1024 // notice: it can not be greater than limiter's token count
|
chunkSize := 100 * 1024 // notice: it can not be greater than limiter's token count
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer pw.Close()
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
ok, err := h.deps.Limiter().CanRead(userID, chunkSize)
|
ok, err := h.deps.Limiter().CanRead(userID, chunkSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -935,6 +957,8 @@ func (h *FileHandlers) GetStreamReader(userID uint64, fd io.Reader) (io.Reader,
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != io.EOF {
|
if err != io.EOF {
|
||||||
pw.CloseWithError(err)
|
pw.CloseWithError(err)
|
||||||
|
} else {
|
||||||
|
pw.CloseWithError(nil)
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ihexxa/quickshare/src/client"
|
"github.com/ihexxa/quickshare/src/client"
|
||||||
q "github.com/ihexxa/quickshare/src/handlers"
|
q "github.com/ihexxa/quickshare/src/handlers"
|
||||||
|
@ -130,7 +131,13 @@ func TestFileHandlers(t *testing.T) {
|
||||||
for i := 0; i < 2; i++ {
|
for i := 0; i < 2; i++ {
|
||||||
assertUploadOK(t, filePath, content, addr, token)
|
assertUploadOK(t, filePath, content, addr, token)
|
||||||
|
|
||||||
err = fs.Sync()
|
// file operation(deleting tmp file) may be async
|
||||||
|
// so creating tmp file in the 2nd time may conflict with the first time if it is not deleted yet
|
||||||
|
// checking file until it is deleted
|
||||||
|
// TODO: use fs.Stat() to avoid flaky testing...
|
||||||
|
time.Sleep(1000)
|
||||||
|
|
||||||
|
err = fs.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,20 +83,20 @@ func assertUploadOK(t *testing.T, filePath, content, addr string, token *http.Co
|
||||||
fileSize := int64(len([]byte(content)))
|
fileSize := int64(len([]byte(content)))
|
||||||
res, _, errs := cl.Create(filePath, fileSize)
|
res, _, errs := cl.Create(filePath, fileSize)
|
||||||
if len(errs) > 0 {
|
if len(errs) > 0 {
|
||||||
t.Error(errs)
|
t.Fatal(errs)
|
||||||
return false
|
return false
|
||||||
} else if res.StatusCode != 200 {
|
} else if res.StatusCode != 200 {
|
||||||
t.Error(res.StatusCode)
|
t.Fatal(res.StatusCode)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
base64Content := base64.StdEncoding.EncodeToString([]byte(content))
|
base64Content := base64.StdEncoding.EncodeToString([]byte(content))
|
||||||
res, _, errs = cl.UploadChunk(filePath, base64Content, 0)
|
res, _, errs = cl.UploadChunk(filePath, base64Content, 0)
|
||||||
if len(errs) > 0 {
|
if len(errs) > 0 {
|
||||||
t.Error(errs)
|
t.Fatal(errs)
|
||||||
return false
|
return false
|
||||||
} else if res.StatusCode != 200 {
|
} else if res.StatusCode != 200 {
|
||||||
t.Error(res.StatusCode)
|
t.Fatal(res.StatusCode)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue