feat(server): add reindexing API with tests
This commit is contained in:
parent
fc653d1c15
commit
fd46d7b816
7 changed files with 166 additions and 26 deletions
|
@ -263,3 +263,9 @@ func (cl *FilesClient) SearchItems(keyword string) (*http.Response, *fileshdr.Se
|
||||||
}
|
}
|
||||||
return resp, searchResp, nil
|
return resp, searchResp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cl *FilesClient) Reindex() (*http.Response, string, []error) {
|
||||||
|
return cl.r.Put(cl.url("/v1/fs/reindex")).
|
||||||
|
AddCookie(cl.token).
|
||||||
|
End()
|
||||||
|
}
|
||||||
|
|
|
@ -5,12 +5,15 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
|
||||||
"github.com/ihexxa/quickshare/src/worker"
|
"github.com/ihexxa/quickshare/src/worker"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
MsgTypeSha1 = "sha1"
|
MsgTypeSha1 = "sha1"
|
||||||
|
MsgTypeIndexing = "indexing"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Sha1Params struct {
|
type Sha1Params struct {
|
||||||
|
@ -50,3 +53,39 @@ func (h *FileHandlers) genSha1(msg worker.IMsg) error {
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type IndexingParams struct{}
|
||||||
|
|
||||||
|
func (h *FileHandlers) indexingItems(msg worker.IMsg) error {
|
||||||
|
err := h.deps.FileIndex().Reset()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
root := ""
|
||||||
|
queue := []string{root}
|
||||||
|
var infos []os.FileInfo
|
||||||
|
for len(queue) > 0 {
|
||||||
|
pathname := queue[0]
|
||||||
|
queue = queue[1:]
|
||||||
|
infos, err = h.deps.FS().ListDir(pathname)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, fileInfo := range infos {
|
||||||
|
childPath := path.Join(pathname, fileInfo.Name())
|
||||||
|
if fileInfo.IsDir() {
|
||||||
|
queue = append(queue, childPath)
|
||||||
|
} else {
|
||||||
|
err = h.deps.FileIndex().AddPath(childPath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
h.deps.Log().Info("reindexing done")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -51,6 +51,7 @@ func NewFileHandlers(cfg gocfg.ICfg, deps *depidx.Deps) (*FileHandlers, error) {
|
||||||
deps: deps,
|
deps: deps,
|
||||||
}
|
}
|
||||||
deps.Workers().AddHandler(MsgTypeSha1, handlers.genSha1)
|
deps.Workers().AddHandler(MsgTypeSha1, handlers.genSha1)
|
||||||
|
deps.Workers().AddHandler(MsgTypeIndexing, handlers.indexingItems)
|
||||||
|
|
||||||
return handlers, nil
|
return handlers, nil
|
||||||
}
|
}
|
||||||
|
@ -1171,6 +1172,29 @@ func (h *FileHandlers) SearchItems(c *gin.Context) {
|
||||||
c.JSON(200, &SearchItemsResp{Results: results})
|
c.JSON(200, &SearchItemsResp{Results: results})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *FileHandlers) Reindex(c *gin.Context) {
|
||||||
|
msg, err := json.Marshal(IndexingParams{})
|
||||||
|
if err != nil {
|
||||||
|
c.JSON(q.ErrResp(c, 500, err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = h.deps.Workers().TryPut(
|
||||||
|
localworker.NewMsg(
|
||||||
|
h.deps.ID().Gen(),
|
||||||
|
map[string]string{localworker.MsgTypeKey: MsgTypeIndexing},
|
||||||
|
string(msg),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
c.JSON(q.ErrResp(c, 500, err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
c.JSON(q.Resp(200))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func (h *FileHandlers) GetStreamReader(userID uint64, fd io.Reader) (io.ReadCloser, error) {
|
func (h *FileHandlers) GetStreamReader(userID uint64, fd io.Reader) (io.ReadCloser, error) {
|
||||||
pr, pw := io.Pipe()
|
pr, pw := io.Pipe()
|
||||||
|
|
||||||
|
|
|
@ -62,6 +62,7 @@ func NewMultiUsersSvc(cfg gocfg.ICfg, deps *depidx.Deps) (*MultiUsersSvc, error)
|
||||||
apiRuleCname(db.AdminRole, "PATCH", "/v1/fs/files/copy"): true,
|
apiRuleCname(db.AdminRole, "PATCH", "/v1/fs/files/copy"): true,
|
||||||
apiRuleCname(db.AdminRole, "PATCH", "/v1/fs/files/move"): true,
|
apiRuleCname(db.AdminRole, "PATCH", "/v1/fs/files/move"): true,
|
||||||
apiRuleCname(db.AdminRole, "GET", "/v1/fs/search"): true,
|
apiRuleCname(db.AdminRole, "GET", "/v1/fs/search"): true,
|
||||||
|
apiRuleCname(db.AdminRole, "PUT", "/v1/fs/reindex"): true,
|
||||||
apiRuleCname(db.AdminRole, "GET", "/v1/fs/dirs"): true,
|
apiRuleCname(db.AdminRole, "GET", "/v1/fs/dirs"): true,
|
||||||
apiRuleCname(db.AdminRole, "GET", "/v1/fs/dirs/home"): true,
|
apiRuleCname(db.AdminRole, "GET", "/v1/fs/dirs/home"): true,
|
||||||
apiRuleCname(db.AdminRole, "POST", "/v1/fs/dirs"): true,
|
apiRuleCname(db.AdminRole, "POST", "/v1/fs/dirs"): true,
|
||||||
|
|
|
@ -20,21 +20,30 @@ type IFileIndex interface {
|
||||||
MovePath(pathname, dstParentPath string) error
|
MovePath(pathname, dstParentPath string) error
|
||||||
WriteTo(pathname string) error
|
WriteTo(pathname string) error
|
||||||
ReadFrom(pathname string) error
|
ReadFrom(pathname string) error
|
||||||
|
Reset() error
|
||||||
}
|
}
|
||||||
|
|
||||||
type FileTreeIndex struct {
|
type FileTreeIndex struct {
|
||||||
fs fs.ISimpleFS
|
fs fs.ISimpleFS
|
||||||
index *fsearch.FSearch
|
index *fsearch.FSearch
|
||||||
|
pathSeparator string
|
||||||
|
maxResultSize int
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFileTreeIndex(fs fs.ISimpleFS, pathSeparator string, maxResultSize int) *FileTreeIndex {
|
func NewFileTreeIndex(fs fs.ISimpleFS, pathSeparator string, maxResultSize int) *FileTreeIndex {
|
||||||
return &FileTreeIndex{
|
return &FileTreeIndex{
|
||||||
fs: fs,
|
fs: fs,
|
||||||
// TODO: support max result size config
|
index: fsearch.New(pathSeparator, maxResultSize),
|
||||||
index: fsearch.New(pathSeparator, maxResultSize),
|
pathSeparator: pathSeparator,
|
||||||
|
maxResultSize: maxResultSize,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (idx *FileTreeIndex) Reset() error {
|
||||||
|
idx.index = fsearch.New(idx.pathSeparator, idx.maxResultSize)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (idx *FileTreeIndex) Search(keyword string) ([]string, error) {
|
func (idx *FileTreeIndex) Search(keyword string) ([]string, error) {
|
||||||
return idx.index.Search(keyword)
|
return idx.index.Search(keyword)
|
||||||
}
|
}
|
||||||
|
|
|
@ -328,6 +328,7 @@ func initHandlers(router *gin.Engine, cfg gocfg.ICfg, deps *depidx.Deps) (*gin.E
|
||||||
|
|
||||||
filesAPI.GET("/metadata", fileHdrs.Metadata)
|
filesAPI.GET("/metadata", fileHdrs.Metadata)
|
||||||
filesAPI.GET("/search", fileHdrs.SearchItems)
|
filesAPI.GET("/search", fileHdrs.SearchItems)
|
||||||
|
filesAPI.PUT("/reindex", fileHdrs.Reindex)
|
||||||
|
|
||||||
filesAPI.POST("/hashes/sha1", fileHdrs.GenerateHash)
|
filesAPI.POST("/hashes/sha1", fileHdrs.GenerateHash)
|
||||||
|
|
||||||
|
|
|
@ -850,6 +850,24 @@ func TestFileHandlers(t *testing.T) {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
compareSearchResults := func(expectedPaths map[string]bool, searchItemsResp []string) bool {
|
||||||
|
if len(expectedPaths) != len(searchItemsResp) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
results := map[string]bool{}
|
||||||
|
for _, result := range searchItemsResp {
|
||||||
|
results[result] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
for got := range expectedPaths {
|
||||||
|
if !results[got] {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
t.Run("Search", func(t *testing.T) {
|
t.Run("Search", func(t *testing.T) {
|
||||||
files := map[string]string{
|
files := map[string]string{
|
||||||
"qs/files/search/keyword": "12345678",
|
"qs/files/search/keyword": "12345678",
|
||||||
|
@ -889,24 +907,7 @@ func TestFileHandlers(t *testing.T) {
|
||||||
t.Fatal(resp.StatusCode)
|
t.Fatal(resp.StatusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
isMatch := func(expectedPaths map[string]bool, searchItemsResp []string) bool {
|
if !compareSearchResults(expected, searchItemsResp.Results) {
|
||||||
if len(expectedPaths) != len(searchItemsResp) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
results := map[string]bool{}
|
|
||||||
for _, result := range searchItemsResp {
|
|
||||||
results[result] = true
|
|
||||||
}
|
|
||||||
|
|
||||||
for got := range expectedPaths {
|
|
||||||
if !results[got] {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
if !isMatch(expected, searchItemsResp.Results) {
|
|
||||||
fmt.Printf("expected(%+v) got(%+v)", expected, searchItemsResp.Results)
|
fmt.Printf("expected(%+v) got(%+v)", expected, searchItemsResp.Results)
|
||||||
t.Fatal("search result not match")
|
t.Fatal("search result not match")
|
||||||
}
|
}
|
||||||
|
@ -926,7 +927,7 @@ func TestFileHandlers(t *testing.T) {
|
||||||
} else if resp.StatusCode != 200 {
|
} else if resp.StatusCode != 200 {
|
||||||
t.Fatal(resp.StatusCode)
|
t.Fatal(resp.StatusCode)
|
||||||
}
|
}
|
||||||
if !isMatch(afterDeleted, searchItemsResp.Results) {
|
if !compareSearchResults(afterDeleted, searchItemsResp.Results) {
|
||||||
fmt.Printf("expected(%+v) got(%+v)", afterDeleted, searchItemsResp.Results)
|
fmt.Printf("expected(%+v) got(%+v)", afterDeleted, searchItemsResp.Results)
|
||||||
t.Fatal("search result not match")
|
t.Fatal("search result not match")
|
||||||
}
|
}
|
||||||
|
@ -954,12 +955,71 @@ func TestFileHandlers(t *testing.T) {
|
||||||
} else if resp.StatusCode != 200 {
|
} else if resp.StatusCode != 200 {
|
||||||
t.Fatal(resp.StatusCode)
|
t.Fatal(resp.StatusCode)
|
||||||
}
|
}
|
||||||
if !isMatch(afterMoved, searchItemsResp.Results) {
|
if !compareSearchResults(afterMoved, searchItemsResp.Results) {
|
||||||
fmt.Printf("expected(%+v) got(%+v)", afterMoved, searchItemsResp.Results)
|
fmt.Printf("expected(%+v) got(%+v)", afterMoved, searchItemsResp.Results)
|
||||||
t.Fatal("search result not match")
|
t.Fatal("search result not match")
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("Reindexing", func(t *testing.T) {
|
||||||
|
newFiles := map[string]bool{
|
||||||
|
"qs/files/reindexing/reindexkey": true,
|
||||||
|
"qs/files/reindexing/pathname/reindexkey": true,
|
||||||
|
}
|
||||||
|
|
||||||
|
for newFilePath := range newFiles {
|
||||||
|
newFileDir := filepath.Dir(newFilePath)
|
||||||
|
err := fs.MkdirAll(newFileDir)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = fs.Create(newFilePath)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fs.Sync()
|
||||||
|
|
||||||
|
resp, _, errs := cl.Reindex()
|
||||||
|
if len(errs) > 0 {
|
||||||
|
t.Fatal(errs)
|
||||||
|
} else if resp.StatusCode != 200 {
|
||||||
|
t.Fatal(resp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
settingsCl := client.NewSettingsClient(addr)
|
||||||
|
for {
|
||||||
|
reportResp, wqResp, errs := settingsCl.WorkerQueueLen(token)
|
||||||
|
if len(errs) > 0 {
|
||||||
|
t.Fatal(errs)
|
||||||
|
} else if reportResp.StatusCode != 200 {
|
||||||
|
t.Fatal(reportResp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
if wqResp.QueueLen == 0 {
|
||||||
|
break
|
||||||
|
} else {
|
||||||
|
fmt.Printf("%d messages left\n", wqResp.QueueLen)
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// TODO: currently async worker status is not visible
|
||||||
|
// still need to wait for worker finishing indexing...
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
|
||||||
|
resp, searchItemsResp, errs := cl.SearchItems("reindexkey")
|
||||||
|
if len(errs) > 0 {
|
||||||
|
t.Fatal(errs)
|
||||||
|
} else if resp.StatusCode != 200 {
|
||||||
|
t.Fatal(resp.StatusCode)
|
||||||
|
}
|
||||||
|
if !compareSearchResults(newFiles, searchItemsResp.Results) {
|
||||||
|
fmt.Printf("expected(%+v) got(%+v)", newFiles, searchItemsResp.Results)
|
||||||
|
t.Fatal("search result not match")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
resp, _, errs = usersCl.Logout(token)
|
resp, _, errs = usersCl.Logout(token)
|
||||||
if len(errs) > 0 {
|
if len(errs) > 0 {
|
||||||
t.Fatal(errs)
|
t.Fatal(errs)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue