Fix append (#43)
* fix(fs/local): remove O_APPEND for write_at
* fix(files): clean download api
* chore(workflows): add manual dispatch for docker
* test(files): close fd in the progress of uploading
This commit is contained in:
parent 9b2157ac6a
commit a9ccb4506c

4 changed files with 61 additions and 11 deletions
.github/workflows/cd_docker.yml (vendored): 2 changes
@@ -2,6 +2,8 @@ name: cd-docker
 on:
   release:
     types: [published]
+  workflow_dispatch: {}
+
 jobs:
   push_to_registry:
     name: Push Docker image to Docker Hub
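With `workflow_dispatch: {}` in place, the cd-docker pipeline can be started manually instead of waiting for a release. A minimal sketch, assuming a placeholder OWNER/REPO, a main branch, and a token in the GITHUB_TOKEN environment variable, of kicking off that run through GitHub's workflow-dispatch REST endpoint:

package main

import (
    "bytes"
    "fmt"
    "net/http"
    "os"
)

func main() {
    // POST /repos/{owner}/{repo}/actions/workflows/{workflow_file}/dispatches
    // starts a workflow_dispatch run; OWNER, REPO, and the ref are placeholders.
    url := "https://api.github.com/repos/OWNER/REPO/actions/workflows/cd_docker.yml/dispatches"
    body := bytes.NewBufferString(`{"ref":"main"}`)

    req, err := http.NewRequest("POST", url, body)
    if err != nil {
        panic(err)
    }
    req.Header.Set("Accept", "application/vnd.github+json")
    req.Header.Set("Authorization", "Bearer "+os.Getenv("GITHUB_TOKEN"))

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    fmt.Println(resp.Status) // 204 No Content on success
}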
@@ -185,7 +185,7 @@ func (fs *LocalFS) ReadAt(path string, b []byte, off int64) (int, error) {
     }

     // because the fd may be for other usage, its flag is not set as os.O_RDONLY
-    fd, err := os.OpenFile(fullpath, os.O_RDWR|os.O_APPEND, fs.defaultPerm)
+    fd, err := os.OpenFile(fullpath, os.O_RDWR, fs.defaultPerm)
     if err != nil {
         return nil, err
     }
@@ -231,7 +231,7 @@ func (fs *LocalFS) WriteAt(path string, b []byte, off int64) (int, error) {
     }

     // it does NOT create file for writing
-    fd, err := os.OpenFile(fullpath, os.O_RDWR|os.O_APPEND, fs.defaultPerm)
+    fd, err := os.OpenFile(fullpath, os.O_RDWR, fs.defaultPerm)
     if err != nil {
         return nil, err
     }
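Both hunks above change only the OpenFile flags, but that flag is the whole bug: Go's os package rejects WriteAt on a file opened with os.O_APPEND, so offset writes during chunked uploads could not work. A standalone sketch (demo.txt is a scratch file used only for illustration) of the failure and the fix:

package main

import (
    "fmt"
    "os"
)

func main() {
    // Open a scratch file the way WriteAt used to: with O_APPEND set.
    fd, err := os.OpenFile("demo.txt", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
    if err != nil {
        panic(err)
    }

    // WriteAt is rejected outright on append-mode files.
    _, err = fd.WriteAt([]byte("chunk"), 0)
    fmt.Println(err) // os: invalid use of WriteAt on file opened with O_APPEND
    fd.Close()

    // Without O_APPEND, writing at an explicit offset works as expected.
    fd, err = os.OpenFile("demo.txt", os.O_RDWR, 0644)
    if err != nil {
        panic(err)
    }
    if _, err := fd.WriteAt([]byte("chunk"), 0); err != nil {
        panic(err)
    }
    fd.Close()
    os.Remove("demo.txt")
}

ReadAt never appends, so dropping the flag there is a no-op; the behavioral change is in WriteAt.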
@@ -394,8 +394,7 @@ func (h *FileHandlers) Download(c *gin.Context) {
         c.JSON(q.ErrResp(c, 500, err))
         return
     }
-    // defer r.Close()
-    // := r.(*os.File)
+    // reader will be closed by multipart response writer

     // respond to normal requests
     if ifRangeVal != "" || rangeVal == "" {
@@ -410,7 +409,6 @@ func (h *FileHandlers) Download(c *gin.Context) {
         return
     }

-    // pr, pw := io.Pipe()
     mw, contentLength, err := multipart.NewResponseWriter(r, parts, false)
     if err != nil {
         c.JSON(q.ErrResp(c, 500, err))
@@ -418,14 +416,9 @@ func (h *FileHandlers) Download(c *gin.Context) {
     }

     go mw.Write()
-    // WriteResponse(r, pw, filePath, parts)
-    // if err != nil {
-    //     c.JSON(q.ErrResp(c, 500, err))
-    //     return
-    // }

     extraHeaders := map[string]string{
-        // "Content-Disposition": fmt.Sprintf(`attachment; filename="%s"`, filePath),
+        "Content-Disposition": fmt.Sprintf(`attachment; filename="%s"`, info.Name()),
     }
     // it takes the \r\n before body into account, so contentLength+2
     c.DataFromReader(206, contentLength+2, contentType, mw, extraHeaders)
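With the dead code gone, the flow is: requests that carry If-Range or no Range header at all get the normal response, everything else goes through multipart.NewResponseWriter, go mw.Write() streams the part bodies, and c.DataFromReader sends them with status 206 (the +2 accounts for the \r\n that precedes the body). A client-side sketch, with a placeholder URL rather than the repo's actual route, of exercising that branch:

package main

import (
    "fmt"
    "io"
    "net/http"
)

func main() {
    // Placeholder host and query; substitute the real download endpoint and file path.
    req, err := http.NewRequest("GET", "http://localhost:8888/files/download?fp=path1/f1", nil)
    if err != nil {
        panic(err)
    }
    // Ask for two byte ranges; the server answers with one multipart part per range.
    req.Header.Set("Range", "bytes=0-3,4-7")

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    body, _ := io.ReadAll(resp.Body)
    fmt.Println(resp.StatusCode)                 // 206 for a partial-content response
    fmt.Println(resp.Header.Get("Content-Type")) // a multipart content type with a boundary
    fmt.Println(len(body))                       // matches the Content-Length the handler advertised
}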
@@ -446,4 +446,59 @@ func TestFileHandlers(t *testing.T) {
             t.Fatalf("info is not deleted, info len(%d)", len(lResp.UploadInfos))
         }
     })
+
+    t.Run("test uploading APIs: Create, Stop, UploadChunk)", func(t *testing.T) {
+        cl := client.NewFilesClient(addr)
+
+        files := map[string]string{
+            "uploadings/path1/f1": "12345678",
+        }
+
+        for filePath, content := range files {
+            fileSize := int64(len([]byte(content)))
+            res, _, errs := cl.Create(filePath, fileSize)
+            if len(errs) > 0 {
+                t.Fatal(errs)
+            } else if res.StatusCode != 200 {
+                t.Fatal(res.StatusCode)
+            }
+
+            chunks := [][]byte{
+                []byte(content)[:fileSize/2],
+                []byte(content)[fileSize/2:],
+            }
+            offset := int64(0)
+            for _, chunk := range chunks {
+                base64Content := base64.StdEncoding.EncodeToString(chunk)
+                res, _, errs = cl.UploadChunk(filePath, base64Content, offset)
+                offset += int64(len(chunk))
+
+                if len(errs) > 0 {
+                    t.Fatal(errs)
+                } else if res.StatusCode != 200 {
+                    t.Fatal(res.StatusCode)
+                }
+
+                err = fs.Close()
+                if err != nil {
+                    t.Fatal(err)
+                }
+            }
+
+            err = fs.Sync()
+            if err != nil {
+                t.Fatal(err)
+            }
+
+            // metadata
+            _, mRes, errs := cl.Metadata(filePath)
+            if len(errs) > 0 {
+                t.Fatal(errs)
+            } else if mRes.Size != fileSize {
+                t.Fatal("incorrect uploaded size", mRes)
+            }
+
+            assetDownloadOK(t, filePath, content)
+        }
+    })
 }
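The new test uploads "12345678" as two base64-encoded 4-byte chunks at increasing offsets, closes the fd after each chunk, then syncs and checks the metadata size, which is exactly the path that needed the O_APPEND removal above. A hypothetical helper (writeChunk and the direct os calls are illustrative, not the repo's actual handler code) showing what each UploadChunk call boils down to on the server:

package main

import (
    "encoding/base64"
    "fmt"
    "os"
)

// writeChunk decodes a base64-encoded chunk and writes it at the requested
// offset. With O_APPEND set, the WriteAt call below would fail, so the flag is omitted.
func writeChunk(path, base64Content string, offset int64) error {
    chunk, err := base64.StdEncoding.DecodeString(base64Content)
    if err != nil {
        return err
    }
    fd, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644)
    if err != nil {
        return err
    }
    defer fd.Close()
    _, err = fd.WriteAt(chunk, offset)
    return err
}

func main() {
    // Mirror the test: "12345678" uploaded as two 4-byte chunks.
    content := "12345678"
    chunks := [][]byte{[]byte(content)[:4], []byte(content)[4:]}
    offset := int64(0)
    for _, c := range chunks {
        enc := base64.StdEncoding.EncodeToString(c)
        if err := writeChunk("upload_demo.txt", enc, offset); err != nil {
            panic(err)
        }
        offset += int64(len(c))
    }

    got, err := os.ReadFile("upload_demo.txt")
    if err != nil {
        panic(err)
    }
    fmt.Println(string(got)) // 12345678
    os.Remove("upload_demo.txt")
}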