From 1b0bf9c5fd166f32c92089147988fc982dac11d1 Mon Sep 17 00:00:00 2001 From: hexxa Date: Thu, 16 Sep 2021 21:07:23 +0800 Subject: [PATCH] fix(files): fix fd closing issue --- go.mod | 2 +- go.sum | 2 ++ src/fs/local/fs.go | 29 ++++++++++++++++++---- src/handlers/fileshdr/handlers.go | 40 ++++++++++++++++++++++++------- src/server/server_files_test.go | 9 ++++++- src/server/test_helpers.go | 8 +++---- 6 files changed, 72 insertions(+), 18 deletions(-) diff --git a/go.mod b/go.mod index d850040..f8c2b8d 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/ihexxa/gocfg v0.0.0-20210914021417-6ba19520f0ff - github.com/ihexxa/multipart v0.0.0-20201207132919-72f6e0e58b25 + github.com/ihexxa/multipart v0.0.0-20210916083128-8584a3f00d1d github.com/jessevdk/go-flags v1.4.0 github.com/json-iterator/go v1.1.11 // indirect github.com/mattn/go-isatty v0.0.13 // indirect diff --git a/go.sum b/go.sum index d877367..1d85ca5 100644 --- a/go.sum +++ b/go.sum @@ -63,6 +63,8 @@ github.com/ihexxa/gocfg v0.0.0-20210914021417-6ba19520f0ff h1:rsaVb8KIlg3gd7CEPT github.com/ihexxa/gocfg v0.0.0-20210914021417-6ba19520f0ff/go.mod h1:oqDTq1ywx4Qi9DdhFwwMHoPCYv6Txrfj2SY5WWcgiJs= github.com/ihexxa/multipart v0.0.0-20201207132919-72f6e0e58b25 h1:gQCaP2qoFWCTz17jj9EUhE/plgqJwk3nHbcS4RHQYCw= github.com/ihexxa/multipart v0.0.0-20201207132919-72f6e0e58b25/go.mod h1:rhOAe/52S/J1fq1VnXvKX8FHuo65I+IcYUozW4M7+wE= +github.com/ihexxa/multipart v0.0.0-20210916083128-8584a3f00d1d h1:+v33khYHVDPEuuWO/JE1IzhoIu5uNvEcSs5GmXc5Sjw= +github.com/ihexxa/multipart v0.0.0-20210916083128-8584a3f00d1d/go.mod h1:rhOAe/52S/J1fq1VnXvKX8FHuo65I+IcYUozW4M7+wE= github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= diff --git a/src/fs/local/fs.go b/src/fs/local/fs.go index b7294a3..3be9a76 100644 --- a/src/fs/local/fs.go +++ b/src/fs/local/fs.go @@ -101,6 +101,7 @@ func (fs *LocalFS) closeOpens(iterateAll, forced bool, exclude map[string]bool) batch-- if forced || info.lastAccess.Add(fs.readerTTL).Before(time.Now()) { + fmt.Println("closing reader2", id, forced) var err error if err = info.fd.Sync(); err != nil { return closed, err @@ -132,12 +133,28 @@ func (fs *LocalFS) Sync() error { fs.opensMtx.Lock() defer fs.opensMtx.Unlock() - _, err := fs.closeOpens(true, true, map[string]bool{}) - return err + var err error + for _, info := range fs.opens { + if err = info.fd.Sync(); err != nil { + return err + } + } + + for _, info := range fs.readers { + if err = info.fd.Sync(); err != nil { + return err + } + } + + return nil } func (fs *LocalFS) Close() error { - return fs.Sync() + fs.opensMtx.Lock() + defer fs.opensMtx.Unlock() + + _, err := fs.closeOpens(true, true, map[string]bool{}) + return err } // check refers implementation of Dir.Open() in http package @@ -365,6 +382,8 @@ func (fs *LocalFS) GetFileReader(path string) (fs.ReadCloseSeeker, uint64, error fd: fd, lastAccess: time.Now(), } + + fmt.Println("new reader", id, path, fs.readers) return fd, id, nil } @@ -372,9 +391,11 @@ func (fs *LocalFS) CloseReader(id string) error { fs.opensMtx.Lock() defer fs.opensMtx.Unlock() + fmt.Println("close reader", id) + info, ok := fs.readers[id] if !ok { - return fmt.Errorf("reader not found: %s", id) + return fmt.Errorf("reader not found: %s %v", id, fs.readers) } var err error diff --git a/src/handlers/fileshdr/handlers.go b/src/handlers/fileshdr/handlers.go index 7479254..f67c8f1 100644 --- a/src/handlers/fileshdr/handlers.go +++ b/src/handlers/fileshdr/handlers.go @@ -163,7 +163,7 @@ func (h *FileHandlers) Create(c *gin.Context) { if err != nil { c.JSON(q.ErrResp(c, 500, err)) } else { - c.JSON(q.ErrResp(c, 304, err)) + c.JSON(q.ErrResp(c, 304, fmt.Errorf("file(%s) exists", tmpFilePath))) } } else { c.JSON(q.ErrResp(c, 500, err)) @@ -252,7 +252,11 @@ func (h *FileHandlers) Metadata(c *gin.Context) { info, err := h.deps.FS().Stat(filePath) if err != nil { - c.JSON(q.ErrResp(c, 500, err)) + if os.IsNotExist(err) { + c.JSON(q.ErrResp(c, 404, os.ErrNotExist)) + } else { + c.JSON(q.ErrResp(c, 500, err)) + } return } @@ -591,6 +595,12 @@ func (h *FileHandlers) Download(c *gin.Context) { c.JSON(q.ErrResp(c, 500, err)) return } + defer func() { + err := limitedReader.Close() + if err != nil { + h.deps.Log().Errorf("failed to close limitedReader: %s", err) + } + }() c.DataFromReader(200, info.Size(), contentType, limitedReader, extraHeaders) return @@ -603,20 +613,34 @@ func (h *FileHandlers) Download(c *gin.Context) { return } - mw, contentLength, err := multipart.NewResponseWriter(fd, parts, false) + mr, err := multipart.NewMultipartReader(fd, parts) if err != nil { c.JSON(q.ErrResp(c, 500, err)) return } + mr.SetOutputHeaders(false) + contentLength := mr.ContentLength() + defer func() { + err := mr.Close() + if err != nil { + h.deps.Log().Errorf("failed to close multipart reader: %s", err) + } + }() // TODO: reader will be closed by multipart response writer? - go mw.Write() + go mr.Start() - limitedReader, err := h.GetStreamReader(userIDInt, mw) + limitedReader, err := h.GetStreamReader(userIDInt, mr) if err != nil { c.JSON(q.ErrResp(c, 500, err)) return } + defer func() { + err := limitedReader.Close() + if err != nil { + h.deps.Log().Errorf("failed to close limitedReader: %s", err) + } + }() // it takes the \r\n before body into account, so contentLength+2 c.DataFromReader(206, contentLength+2, contentType, limitedReader, extraHeaders) @@ -914,13 +938,11 @@ func (h *FileHandlers) GenerateHash(c *gin.Context) { c.JSON(q.Resp(200)) } -func (h *FileHandlers) GetStreamReader(userID uint64, fd io.Reader) (io.Reader, error) { +func (h *FileHandlers) GetStreamReader(userID uint64, fd io.Reader) (io.ReadCloser, error) { pr, pw := io.Pipe() chunkSize := 100 * 1024 // notice: it can not be greater than limiter's token count go func() { - defer pw.Close() - for { ok, err := h.deps.Limiter().CanRead(userID, chunkSize) if err != nil { @@ -935,6 +957,8 @@ func (h *FileHandlers) GetStreamReader(userID uint64, fd io.Reader) (io.Reader, if err != nil { if err != io.EOF { pw.CloseWithError(err) + } else { + pw.CloseWithError(nil) } break } diff --git a/src/server/server_files_test.go b/src/server/server_files_test.go index 269491e..e3c3fe6 100644 --- a/src/server/server_files_test.go +++ b/src/server/server_files_test.go @@ -8,6 +8,7 @@ import ( "path/filepath" "sync" "testing" + "time" "github.com/ihexxa/quickshare/src/client" q "github.com/ihexxa/quickshare/src/handlers" @@ -130,7 +131,13 @@ func TestFileHandlers(t *testing.T) { for i := 0; i < 2; i++ { assertUploadOK(t, filePath, content, addr, token) - err = fs.Sync() + // file operation(deleting tmp file) may be async + // so creating tmp file in the 2nd time may conflict with the first time if it is not deleted yet + // checking file until it is deleted + // TODO: use fs.Stat() to avoid flaky testing... + time.Sleep(1000) + + err = fs.Close() if err != nil { t.Fatal(err) } diff --git a/src/server/test_helpers.go b/src/server/test_helpers.go index f603a1b..2176f7f 100644 --- a/src/server/test_helpers.go +++ b/src/server/test_helpers.go @@ -83,20 +83,20 @@ func assertUploadOK(t *testing.T, filePath, content, addr string, token *http.Co fileSize := int64(len([]byte(content))) res, _, errs := cl.Create(filePath, fileSize) if len(errs) > 0 { - t.Error(errs) + t.Fatal(errs) return false } else if res.StatusCode != 200 { - t.Error(res.StatusCode) + t.Fatal(res.StatusCode) return false } base64Content := base64.StdEncoding.EncodeToString([]byte(content)) res, _, errs = cl.UploadChunk(filePath, base64Content, 0) if len(errs) > 0 { - t.Error(errs) + t.Fatal(errs) return false } else if res.StatusCode != 200 { - t.Error(res.StatusCode) + t.Fatal(res.StatusCode) return false }