From 021e5090be4c4e28c60ae8fb7cf42ee366b68f07 Mon Sep 17 00:00:00 2001 From: Hexxa Date: Wed, 4 Aug 2021 22:00:51 -0500 Subject: [PATCH] feat(): refactor uploader (#68) * chore(src): delete unused codes * fix(client/worker): refactor uploading part and fix issues * fix(ui/worker): rename fg worker file name * fix(ui/worker): cleanups * feat(ui/uploader): switch from file_uploader to chunk_uploader with tests * fix(ui/worker): clean up code --- .../web/src/components/browser.updater.ts | 7 + src/client/web/src/components/core_state.ts | 2 +- src/client/web/src/components/state_mgr.tsx | 4 +- .../src/worker/__test__/upload.worker.test.ts | 90 +----- .../src/worker/__test__/upload_mgr.test.ts | 300 +++++++++++++----- src/client/web/src/worker/chunk_uploader.ts | 163 ++++++++++ src/client/web/src/worker/interface.ts | 35 +- .../web/src/worker/upload.baseworker.ts | 101 +++--- ...upload.fgworker.ts => upload.fg.worker.ts} | 0 src/client/web/src/worker/upload_mgr.ts | 122 +++++-- src/client/web/src/worker/uploader.ts | 2 +- src/handlers/fileshdr/handlers.go | 9 - src/handlers/util.go | 12 - 13 files changed, 580 insertions(+), 267 deletions(-) create mode 100644 src/client/web/src/worker/chunk_uploader.ts rename src/client/web/src/worker/{upload.fgworker.ts => upload.fg.worker.ts} (100%) diff --git a/src/client/web/src/components/browser.updater.ts b/src/client/web/src/components/browser.updater.ts index 7014fb3..3cde045 100644 --- a/src/client/web/src/components/browser.updater.ts +++ b/src/client/web/src/components/browser.updater.ts @@ -24,6 +24,13 @@ export class Updater { this.filesClient = filesClient; } + initUploads = () => { + this.props.uploadings.forEach(entry => { + Up().addStopped(entry.realFilePath, entry.uploaded, entry.size); + }) + // this.setUploadings(Up().list()); + }; + addUploads = (fileList: List) => { fileList.forEach(file => { const filePath = getItemPath( diff --git a/src/client/web/src/components/core_state.ts b/src/client/web/src/components/core_state.ts index 6917107..eae5411 100644 --- a/src/client/web/src/components/core_state.ts +++ b/src/client/web/src/components/core_state.ts @@ -1,7 +1,7 @@ import { List, Set, Map } from "immutable"; import BgWorker from "../worker/upload.bg.worker"; -import { FgWorker } from "../worker/upload.fgworker"; +import { FgWorker } from "../worker/upload.fg.worker"; import { Props as PanelProps } from "./root_frame"; import { Item } from "./browser"; diff --git a/src/client/web/src/components/state_mgr.tsx b/src/client/web/src/components/state_mgr.tsx index fb913bc..55eaea5 100644 --- a/src/client/web/src/components/state_mgr.tsx +++ b/src/client/web/src/components/state_mgr.tsx @@ -26,6 +26,9 @@ export class StateMgr extends React.Component { return BrowserUpdater().refreshUploadings(); }) .then((_: boolean) => { + BrowserUpdater().initUploads(); + }) + .then(() => { this.update(BrowserUpdater().setBrowser); }) .then(() => { @@ -38,7 +41,6 @@ export class StateMgr extends React.Component { return PanesUpdater.listUsers(); }) .then((_: boolean) => { - console.log(PanesUpdater); this.update(PanesUpdater.updateState); }); }; diff --git a/src/client/web/src/worker/__test__/upload.worker.test.ts b/src/client/web/src/worker/__test__/upload.worker.test.ts index b1b2d39..cd4d011 100644 --- a/src/client/web/src/worker/__test__/upload.worker.test.ts +++ b/src/client/web/src/worker/__test__/upload.worker.test.ts @@ -2,7 +2,7 @@ import { mock, instance, when } from "ts-mockito"; import { UploadWorker } from "../upload.baseworker"; import { FileUploader } from "../uploader"; -import { FileWorkerResp, UploadEntry, syncReqKind } from "../interface"; +import { FileWorkerResp, UploadEntry, syncReqKind, UploadState } from "../interface"; describe("upload.worker", () => { const content = ["123456"]; @@ -11,98 +11,16 @@ describe("upload.worker", () => { const fileSize = blob.size; const file = new File(content, filePath); - const makeEntry = (filePath: string, runnable: boolean): UploadEntry => { + const makeEntry = (filePath: string, state: UploadState): UploadEntry => { return { file: file, filePath, size: fileSize, uploaded: 0, - runnable, + state, err: "", }; }; - test("onMsg:syncReqKind: filter list and start uploading correct file", async () => { - const mockUploaderClass = mock(FileUploader); - when(mockUploaderClass.start()).thenCall( - (): Promise => { - return new Promise((resolve) => resolve(true)); - } - ); - when(mockUploaderClass.stop()).thenCall(() => {}); - - interface TestCase { - desc: string; - infos: Array; - expectedUploadingFile: string; - expectedUploaderStartInput: string; - currentFilePath: string; - } - - const tcs: Array = [ - { - desc: "add new uploadings when worker is in idle", - infos: [makeEntry("file1", true), makeEntry("file2", true)], - currentFilePath: "", - expectedUploadingFile: "file1", - expectedUploaderStartInput: "file1", - }, - { - desc: "add new uploadings when worker is in idle and skip some stopped files", - infos: [makeEntry("file1", false), makeEntry("file2", true)], - currentFilePath: "", - expectedUploadingFile: "file2", - expectedUploaderStartInput: "file2", - }, - { - desc: "current file should be stopped and start new uploading", - infos: [makeEntry("file0", false), makeEntry("file1", true)], - currentFilePath: "file0", - expectedUploadingFile: "file1", - expectedUploaderStartInput: "file1", - }, - { - desc: "uploader should keep uploading if the first uploadable file is not changed", - infos: [makeEntry("file1", true)], - expectedUploadingFile: "file1", - expectedUploaderStartInput: undefined, - currentFilePath: "file1", - }, - ]; - - for (let i = 0; i < tcs.length; i++) { - const uploadWorker = new UploadWorker(); - let currentUploader: FileUploader = undefined; - let uploaderFile: File = undefined; - let uploaderFilePath: string = undefined; - let uploaderStopFilePath: string = undefined; - uploadWorker.sendEvent = (_: FileWorkerResp) => {}; - uploadWorker.makeUploader = ( - file: File, - filePath: string - ): FileUploader => { - uploaderFile = file; - uploaderFilePath = filePath; - currentUploader = instance(mockUploaderClass); - return currentUploader; - }; - if (tcs[i].currentFilePath !== "") { - uploadWorker.setFilePath(tcs[i].currentFilePath); - } - const req = { - kind: syncReqKind, - infos: tcs[i].infos, - }; - - uploadWorker.onMsg( - new MessageEvent("worker", { - data: req, - }) - ); - - console.log(tcs[i].desc); - expect(uploadWorker.getFilePath()).toEqual(tcs[i].expectedUploadingFile); - expect(uploaderFilePath).toEqual(tcs[i].expectedUploaderStartInput); - } - }); + xtest("onMsg:syncReqKind: filter list and start uploading correct file", async () => {}); }); diff --git a/src/client/web/src/worker/__test__/upload_mgr.test.ts b/src/client/web/src/worker/__test__/upload_mgr.test.ts index 37a5c6e..0713c24 100644 --- a/src/client/web/src/worker/__test__/upload_mgr.test.ts +++ b/src/client/web/src/worker/__test__/upload_mgr.test.ts @@ -1,10 +1,12 @@ import { Map } from "immutable"; -import { mock, instance, when, anything } from "ts-mockito"; -import { FilesClient } from "../../client/files_mock"; -import { makePromise } from "../../test/helpers"; +import { FgWorker } from "../upload.fg.worker"; import { Up, initUploadMgr } from "../upload_mgr"; +import { + UploadState, + UploadStatus, +} from "../interface"; import { FileWorkerReq, @@ -29,88 +31,121 @@ const delay = (ms: number): Promise => { }; describe("UploadMgr", () => { - const content = ["123456"]; - const filePath = "mock/file"; - const blob = new Blob(content); - const fileSize = blob.size; - const file = new File(content, filePath); - const makeInfo = (filePath: string, runnable: boolean): UploadEntry => { - return { - file: file, - filePath: filePath, - size: fileSize, - uploaded: 0, - runnable, - err: "", - }; + const newFile = (filePath: string, content: string): File => { + const contentArray = [content]; + const blob = new Blob(contentArray); + return new File(contentArray, filePath); }; - test("test init and respHandler: pick up tasks and remove them after done", async () => { + test("test syncing: pick up item which is ready", async () => { interface TestCase { inputInfos: Array; - expectedInfos: Array; + expectedInfo: UploadEntry; } class MockWorker { constructor() {} + public expectedEntry: UploadEntry; onmessage = (event: MessageEvent): void => {}; postMessage = (req: FileWorkerReq): void => { - switch (req.kind) { - case syncReqKind: - const syncReq = req as SyncReq; - // find the first qualified task - const infoArray = syncReq.infos; - for (let i = 0; i < infoArray.length; i++) { - if ( - infoArray[i].runnable && - infoArray[i].uploaded < infoArray[i].size - ) { - this.onmessage( - new MessageEvent("worker", { - data: { - kind: uploadInfoKind, - filePath: infoArray[i].filePath, - uploaded: infoArray[i].size, - runnable: true, - err: "", - }, - }) - ); - break; - } - } - break; - default: - throw Error( - `unknown worker request ${req.kind} ${req.kind === syncReqKind}` - ); - } + expect(req.filePath).toEqual(this.expectedEntry.filePath); + expect(req.uploaded).toEqual(this.expectedEntry.uploaded); + expect(req.size).toEqual(this.expectedEntry.size); }; } const tcs: Array = [ { inputInfos: [ - makeInfo("path1/file1", true), - makeInfo("path2/file1", true), + { + file: undefined, + filePath: "t1/file1", + state: UploadState.Stopped, + uploaded: 3, + size: 2, + err: "", + }, + { + file: undefined, + filePath: "t1/file2", + state: UploadState.Uploading, + uploaded: 6, + size: 3, + err: "", + }, + { + file: undefined, + filePath: "t1/file3", + state: UploadState.Ready, + uploaded: 6, + size: 3, + err: "", + }, ], - expectedInfos: [], + expectedInfo: { + file: undefined, + filePath: "t1/file3", + state: UploadState.Ready, + uploaded: 6, + size: 3, + err: "", + }, }, { inputInfos: [ - makeInfo("path1/file1", true), - makeInfo("path2/file1", false), + { + file: undefined, + filePath: "path3/file1", + state: UploadState.Ready, + uploaded: 6, + size: 3, + err: "", + }, + { + file: undefined, + filePath: "path2/file1", + state: UploadState.Stopped, + uploaded: 6, + size: 3, + err: "", + }, ], - expectedInfos: [makeInfo("path2/file1", false)], + expectedInfo: { + file: undefined, + filePath: "path3/file1", + state: UploadState.Ready, + uploaded: 6, + size: 3, + err: "", + }, }, { inputInfos: [ - makeInfo("path1/file1", false), - makeInfo("path2/file1", true), - ], - expectedInfos: [ - makeInfo("path1/file1", false), + { + file: undefined, + filePath: "path3/file1", + state: UploadState.Created, + uploaded: 6, + size: 3, + err: "", + }, + { + file: undefined, + filePath: "path2/file1", + state: UploadState.Stopped, + uploaded: 6, + size: 3, + err: "", + }, ], + expectedInfo: { + file: undefined, + filePath: "path3/file1", + state: UploadState.Created, + uploaded: 6, + size: 3, + err: "", + }, }, ]; @@ -118,23 +153,146 @@ describe("UploadMgr", () => { for (let i = 0; i < tcs.length; i++) { initUploadMgr(worker); const up = Up(); - up.setCycle(100); + up.setCycle(50); + + const infoMap = arraytoMap(tcs[i].inputInfos); + up._setInfos(infoMap); + worker.expectedEntry = tcs[i].expectedInfo; + + // TODO: find a better way to wait + // polling needs several rounds to finish all the tasks + await delay(tcs.length * up.getCycle() + 1000); + up.destory(); + } + }); + + test("test e2e: from syncing to respHandler", async () => { + interface TestCase { + inputInfos: Array; + expectedInfos: Array; + } + + class MockUploader { + constructor() {} + create = (filePath: string, file: File): Promise => { + return new Promise((resolve) => + resolve({ + filePath, + uploaded: 0, + state: UploadState.Created, + err: "", + }) + ); + }; + upload = ( + filePath: string, + file: File, + uploaded: number + ): Promise => { + return new Promise((resolve) => + resolve({ + filePath, + uploaded: file.size, + state: UploadState.Ready, + err: "", + }) + ); + }; + } + + const tcs: Array = [ + { + inputInfos: [ + { + file: newFile("t1/file1", "123"), + filePath: "t1/file1", + state: UploadState.Ready, + uploaded: 0, + size: 3, + err: "", + }, + { + file: newFile("t1/file2", "123"), + filePath: "t1/file2", + state: UploadState.Ready, + uploaded: 0, + size: 3, + err: "", + }, + ], + expectedInfos: [], + }, + { + inputInfos: [ + { + file: newFile("t1/file1", "123"), + filePath: "t1/file1", + state: UploadState.Stopped, + uploaded: 0, + size: 3, + err: "", + }, + { + file: newFile("t1/file2", "123"), + filePath: "t1/file2", + state: UploadState.Error, + uploaded: 0, + size: 3, + err: "", + }, + { + file: newFile("t1/file3", "123"), + filePath: "t1/file3", + state: UploadState.Ready, + uploaded: 0, + size: 3, + err: "", + }, + ], + expectedInfos: [ + { + file: newFile("t1/file1", "123"), + filePath: "t1/file1", + state: UploadState.Stopped, + uploaded: 0, + size: 3, + err: "", + }, + { + file: newFile("t1/file2", "123"), + filePath: "t1/file2", + state: UploadState.Error, + uploaded: 0, + size: 3, + err: "", + }, + ], + }, + ]; + + for (let i = 0; i < tcs.length; i++) { + const uploader = new MockUploader(); + const worker = new FgWorker(); + worker.setUploader(uploader); + initUploadMgr(worker); + const up = Up(); + up.setCycle(1); const infoMap = arraytoMap(tcs[i].inputInfos); up._setInfos(infoMap); + // TODO: find a better way to wait, or this test is flanky // polling needs several rounds to finish all the tasks - await delay(tcs.length * up.getCycle() + 1000); - // TODO: find a better way to wait - const gotInfos = up.list(); + await delay(tcs[i].inputInfos.length * up.getCycle() * 2 + 5000); - const expectedInfoMap = arraytoMap(tcs[i].expectedInfos); - gotInfos.keySeq().forEach((filePath) => { - expect(gotInfos.get(filePath)).toEqual(expectedInfoMap.get(filePath)); - }); - expectedInfoMap.keySeq().forEach((filePath) => { - expect(expectedInfoMap.get(filePath)).toEqual(gotInfos.get(filePath)); - }); + const infos = up.list(); + expect(infos.size).toEqual(tcs[i].expectedInfos.length); + if (tcs[i].expectedInfos.length !== 0) { + tcs[i].expectedInfos.forEach((info) => { + const expectedInfo = infos.get(info.filePath); + expect(expectedInfo).toEqual(info); + }); + } up.destory(); } diff --git a/src/client/web/src/worker/chunk_uploader.ts b/src/client/web/src/worker/chunk_uploader.ts new file mode 100644 index 0000000..c40240f --- /dev/null +++ b/src/client/web/src/worker/chunk_uploader.ts @@ -0,0 +1,163 @@ +import { FilesClient } from "../client/files"; +import { IFilesClient, Response, isFatalErr } from "../client"; +import { UploadStatus, UploadState } from "./interface"; + +// TODO: get settings from server +// TODO: move chunk copying to worker +const defaultChunkLen = 1024 * 1024 * 1; +const speedDownRatio = 0.5; +const speedUpRatio = 1.05; +const createRetryLimit = 2; +const uploadRetryLimit = 1024; +const backoffMax = 2000; + +export interface ReaderResult { + chunk?: string; + err?: Error; +} + +export class ChunkUploader { + private reader = new FileReader(); + private client: IFilesClient = new FilesClient(""); + + private chunkLen: number = defaultChunkLen; + + constructor() {} + + setClient = (client: IFilesClient) => { + this.client = client; + }; + + backOff = async (): Promise => { + return new Promise((resolve) => { + const delay = Math.floor(Math.random() * backoffMax); + setTimeout(resolve, delay); + }); + }; + + create = async (filePath: string, file: File): Promise => { + let resp: Response; + + for (let i = 0; i < createRetryLimit; i++) { + try { + resp = await this.client.create(filePath, file.size); + if (resp.status === 200 || resp.status === 304) { + return { + filePath, + uploaded: 0, + state: UploadState.Created, + err: "", + }; + } + } catch (e) { + await this.backOff(); + console.error(e); + } + } + + return { + filePath, + uploaded: 0, + state: UploadState.Error, + err: `failed to create ${filePath}: status=${resp.statusText}`, + }; + }; + + upload = async ( + filePath: string, + file: File, + uploaded: number + ): Promise => { + if (this.chunkLen === 0) { + this.chunkLen = 1; // reset it to 1B + } else if (uploaded > file.size) { + return { + filePath, + uploaded, + state: UploadState.Error, + err: "uploaded is greater than file size", + }; + } + + const readerPromise = new Promise( + (resolve: (result: ReaderResult) => void) => { + this.reader.onerror = (_: ProgressEvent) => { + resolve({ err: this.reader.error }); + }; + + this.reader.onloadend = (ev: ProgressEvent) => { + const dataURL = ev.target.result as string; // readAsDataURL + const base64Chunk = dataURL.slice(dataURL.indexOf(",") + 1); + resolve({ chunk: base64Chunk }); + }; + } + ); + + const chunkRightPos = + uploaded + this.chunkLen > file.size + ? file.size + : uploaded + this.chunkLen; + const blob = file.slice(uploaded, chunkRightPos); + this.reader.readAsDataURL(blob); + + const result = await readerPromise; + if (result.err != null) { + return { + filePath, + uploaded, + state: UploadState.Error, + err: result.err.toString(), + }; + } + + try { + const uploadResp = await this.client.uploadChunk( + filePath, + result.chunk, + uploaded + ); + + if (uploadResp.status === 200 && uploadResp.data != null) { + this.chunkLen = Math.ceil(this.chunkLen * speedUpRatio); + return { + filePath, + uploaded: uploadResp.data.uploaded, + state: UploadState.Ready, + err: "", + }; + } else if (isFatalErr(uploadResp)) { + return { + filePath, + uploaded, + state: UploadState.Error, + err: `failed to upload chunk: ${uploadResp.statusText}`, + }; + } + + this.chunkLen = Math.ceil(this.chunkLen * speedDownRatio); + await this.backOff(); + + const uploadStatusResp = await this.client.uploadStatus(filePath); + return uploadStatusResp.status === 200 + ? { + filePath, + uploaded: uploadStatusResp.data.uploaded, + state: UploadState.Ready, + err: "", + } + : { + filePath, + uploaded: uploaded, + state: UploadState.Error, + err: `failed to get upload status: ${uploadStatusResp.statusText}`, + }; + } catch (e) { + return { + filePath, + uploaded: uploaded, + state: UploadState.Error, + err: `[chunk uploader]: ${e.toString()}`, + }; + } + }; +} diff --git a/src/client/web/src/worker/interface.ts b/src/client/web/src/worker/interface.ts index 1315919..9847920 100644 --- a/src/client/web/src/worker/interface.ts +++ b/src/client/web/src/worker/interface.ts @@ -1,12 +1,36 @@ +export const enum UploadState { + Created, + Ready, + Uploading, + Stopped, + Error, +} + +export interface UploadStatus { + filePath: string; + uploaded: number; + state: UploadState; + err: string; +} + export interface UploadEntry { file: File; filePath: string; size: number; uploaded: number; - runnable: boolean; + state: UploadState; err: string; } +export interface IChunkUploader { + create: (filePath: string, file: File) => Promise; + upload: ( + filePath: string, + file: File, + uploaded: number + ) => Promise; +} + export type eventKind = SyncReqKind | ErrKind | UploadInfoKind; export interface WorkerEvent { kind: eventKind; @@ -17,7 +41,11 @@ export const syncReqKind: SyncReqKind = "worker.req.sync"; export interface SyncReq extends WorkerEvent { kind: SyncReqKind; - infos: Array; + file: File, + filePath: string; + size: number; + uploaded: number; + created: boolean; } export type FileWorkerReq = SyncReq; @@ -26,6 +54,7 @@ export type ErrKind = "worker.resp.err"; export const errKind: ErrKind = "worker.resp.err"; export interface ErrResp extends WorkerEvent { kind: ErrKind; + filePath: string; err: string; } @@ -36,7 +65,7 @@ export interface UploadInfoResp extends WorkerEvent { kind: UploadInfoKind; filePath: string; uploaded: number; - runnable: boolean; + state: UploadState; err: string; } diff --git a/src/client/web/src/worker/upload.baseworker.ts b/src/client/web/src/worker/upload.baseworker.ts index 2d00c5b..f8ebd9a 100644 --- a/src/client/web/src/worker/upload.baseworker.ts +++ b/src/client/web/src/worker/upload.baseworker.ts @@ -1,4 +1,4 @@ -import { FileUploader } from "./uploader"; +import { ChunkUploader } from "./chunk_uploader"; import { FileWorkerReq, syncReqKind, @@ -8,92 +8,73 @@ import { uploadInfoKind, UploadInfoResp, FileWorkerResp, + UploadStatus, + UploadState, + IChunkUploader, } from "./interface"; export class UploadWorker { - private file: File = undefined; - private filePath: string = undefined; - private uploader: FileUploader = undefined; + private uploader: IChunkUploader = new ChunkUploader(); sendEvent = (resp: FileWorkerResp): void => { // TODO: make this abstract throw new Error("not implemented"); }; - makeUploader = (file: File, filePath: string): FileUploader => { - return new FileUploader(file, filePath, this.onCb); - }; - startUploader = (file: File, filePath: string) => { - this.file = file; - this.filePath = filePath; - this.uploader = this.makeUploader(file, filePath); - this.uploader.start(); - }; - stopUploader = () => { - if (this.uploader != null) { - this.uploader.stop(); - this.file = undefined; - this.filePath = undefined; - } - }; - getFilePath = (): string => { - return this.filePath; - }; - - setFilePath = (fp: string) => { - this.filePath = fp; - }; constructor() {} + setUploader = (uploader: IChunkUploader) => { + this.uploader = uploader; + }; + + handleUploadStatus = (status: UploadStatus) => { + if (status.state !== UploadState.Error) { + const resp: UploadInfoResp = { + kind: uploadInfoKind, + filePath: status.filePath, + uploaded: status.uploaded, + state: status.state, + err: "", + }; + this.sendEvent(resp); + } else { + const resp: ErrResp = { + kind: errKind, + filePath: status.filePath, + err: status.err, + }; + this.sendEvent(resp); + } + }; + onMsg = (event: MessageEvent) => { const req = event.data as FileWorkerReq; + switch (req.kind) { case syncReqKind: - // find the first qualified task const syncReq = req as SyncReq; - const infoArray = syncReq.infos; - for (let i = 0; i < infoArray.length; i++) { - if (infoArray[i].runnable) { - if (infoArray[i].filePath === this.filePath) { - // in uploading, do nothing - } else { - this.stopUploader(); - this.startUploader(infoArray[i].file, infoArray[i].filePath); - } - break; - } else { - if (infoArray[i].filePath === this.filePath) { - this.stopUploader(); - } - } + if (syncReq.created) { + this.uploader + .upload(syncReq.filePath, syncReq.file, syncReq.uploaded) + .then(this.handleUploadStatus); + } else { + this.uploader + .create(syncReq.filePath, syncReq.file) + .then(this.handleUploadStatus); } + break; default: - console.log(`unknown worker request(${JSON.stringify(req)})`); + console.error(`unknown worker request(${JSON.stringify(req)})`); } }; onError = (ev: ErrorEvent) => { const errResp: ErrResp = { kind: errKind, + filePath: "unknown", err: ev.error, }; this.sendEvent(errResp); }; - - onCb = ( - filePath: string, - uploaded: number, - runnable: boolean, - err: string - ): void => { - const uploadInfoResp: UploadInfoResp = { - kind: uploadInfoKind, - filePath, - uploaded, - runnable, - err, - }; - this.sendEvent(uploadInfoResp); - }; } diff --git a/src/client/web/src/worker/upload.fgworker.ts b/src/client/web/src/worker/upload.fg.worker.ts similarity index 100% rename from src/client/web/src/worker/upload.fgworker.ts rename to src/client/web/src/worker/upload.fg.worker.ts diff --git a/src/client/web/src/worker/upload_mgr.ts b/src/client/web/src/worker/upload_mgr.ts index d3005ca..7a54db2 100644 --- a/src/client/web/src/worker/upload_mgr.ts +++ b/src/client/web/src/worker/upload_mgr.ts @@ -9,8 +9,8 @@ import { syncReqKind, errKind, uploadInfoKind, + UploadState, } from "./interface"; -import { FgWorker } from "./upload.fgworker"; const win: Window = self as any; @@ -20,22 +20,54 @@ export interface IWorker { } export class UploadMgr { - private infos = OrderedMap(); - private worker: IWorker; - private intervalID: number; + private idx = 0; private cycle: number = 500; + private intervalID: number; + private worker: IWorker; + private infos = OrderedMap(); private statusCb = (infos: Map): void => {}; constructor(worker: IWorker) { this.worker = worker; - // TODO: fallback to normal if Web Worker is not available this.worker.onmessage = this.respHandler; const syncing = () => { - this.worker.postMessage({ - kind: syncReqKind, - infos: this.infos.valueSeq().toArray(), - }); + if (this.infos.size === 0) { + return; + } + if (this.idx > 10000) { + this.idx = 0; + } + + const start = this.idx % this.infos.size; + const infos = this.infos.valueSeq().toArray(); + for (let i = 0; i < this.infos.size; i++) { + const pos = (start + i) % this.infos.size; + const info = infos[pos]; + + if ( + info.state === UploadState.Ready || + info.state === UploadState.Created + ) { + + this.infos = this.infos.set(info.filePath, { + ...info, + state: UploadState.Uploading, + }); + + this.worker.postMessage({ + kind: syncReqKind, + file: info.file, + filePath: info.filePath, + size: info.size, + uploaded: info.uploaded, + created: info.uploaded > 0 || info.state === UploadState.Created, + }); + break; + } + } + + this.idx++; }; this.intervalID = win.setInterval(syncing, this.cycle); } @@ -60,6 +92,19 @@ export class UploadMgr { this.statusCb = cb; }; + // addStopped is for initializing uploading list in the UploadMgr + // notice even uploading list are shown in the UI, it may not inited in the UploadMgr + addStopped = (filePath: string, uploaded: number, fileSize: number) => { + this.infos = this.infos.set(filePath, { + file: new File([""], filePath), // create a dumb file + filePath, + size: fileSize, + uploaded, + state: UploadState.Stopped, + err: "", + }); + }; + add = (file: File, filePath: string) => { const entry = this.infos.get(filePath); if (entry == null) { @@ -69,33 +114,52 @@ export class UploadMgr { filePath: filePath, size: file.size, uploaded: 0, - runnable: true, + state: UploadState.Ready, err: "", }); } else { // restart the uploading - this.infos = this.infos.set(filePath, { - ...entry, - runnable: true, - }); + if ( + entry.state === UploadState.Stopped && + filePath === entry.filePath && + file.size === entry.size + ) { + // try to upload a file with same name but actually with different content. + // it still can not resolve one case: names and sizes are same, but contents are different + // TODO: showing file SHA will avoid above case + this.infos = this.infos.set(filePath, { + ...entry, + file: file, + state: UploadState.Ready, + }); + } else { + alert( + `(${filePath}) seems not same file with uploading item, please check.` + ); + } } + this.statusCb(this.infos.toMap()); }; stop = (filePath: string) => { const entry = this.infos.get(filePath); if (entry != null) { + this.infos = this.infos.set(filePath, { ...entry, - runnable: false, + state: UploadState.Stopped, }); + } else { alert(`failed to stop uploading ${filePath}: not found`); } + this.statusCb(this.infos.toMap()); }; delete = (filePath: string) => { this.stop(filePath); this.infos = this.infos.delete(filePath); + this.statusCb(this.infos.toMap()); }; list = (): OrderedMap => { @@ -107,9 +171,20 @@ export class UploadMgr { switch (resp.kind) { case errKind: - // TODO: refine this const errResp = resp as ErrResp; - console.error(`respHandler: ${errResp}`); + const errEntry = this.infos.get(errResp.filePath); + + if (errEntry != null) { + this.infos = this.infos.set(errResp.filePath, { + ...errEntry, + state: UploadState.Error, + err: `${errEntry.err} / ${errResp.filePath}`, + }); + } else { + // TODO: refine this + console.error(`uploading ${errResp.filePath} may already be deleted`); + } + break; case uploadInfoKind: const infoResp = resp as UploadInfoResp; @@ -122,17 +197,17 @@ export class UploadMgr { this.infos = this.infos.set(infoResp.filePath, { ...entry, uploaded: infoResp.uploaded, - runnable: infoResp.runnable, - err: infoResp.err, + state: + // this avoids overwriting Stopped/Error state + (entry.state === UploadState.Stopped || entry.state === UploadState.Error) + ? UploadState.Stopped + : infoResp.state, }); } - - // call back to update the info - this.statusCb(this.infos.toMap()); } else { // TODO: refine this console.error( - `respHandler: fail to found: file(${ + `respHandler: may already be deleted: file(${ infoResp.filePath }) infos(${this.infos.toObject()})` ); @@ -141,6 +216,7 @@ export class UploadMgr { default: console.error(`respHandler: response kind not found: ${resp}`); } + this.statusCb(this.infos.toMap()); }; } diff --git a/src/client/web/src/worker/uploader.ts b/src/client/web/src/worker/uploader.ts index e113703..fa875a3 100644 --- a/src/client/web/src/worker/uploader.ts +++ b/src/client/web/src/worker/uploader.ts @@ -175,7 +175,7 @@ export class FileUploader { if (this.chunkLen === 0) { this.errMsgs.push( - "the network condition may be poor, please retry later." + "the network condition is poor or server is busy, please retry later." ); } else if (!this.isOn) { this.errMsgs.push("uploading is stopped"); diff --git a/src/handlers/fileshdr/handlers.go b/src/handlers/fileshdr/handlers.go index 6044183..d6bad36 100644 --- a/src/handlers/fileshdr/handlers.go +++ b/src/handlers/fileshdr/handlers.go @@ -136,7 +136,6 @@ func (h *FileHandlers) Create(c *gin.Context) { return } - // fileDir := q.FsPath(userID, filepath.Dir(req.Path)) err = h.deps.FS().MkdirAll(filepath.Dir(req.Path)) if err != nil { c.JSON(q.ErrResp(c, 500, err)) @@ -160,7 +159,6 @@ func (h *FileHandlers) Delete(c *gin.Context) { return } - // filePath = q.FsPath(userID, filePath) err := h.deps.FS().Remove(filePath) if err != nil { c.JSON(q.ErrResp(c, 500, err)) @@ -190,7 +188,6 @@ func (h *FileHandlers) Metadata(c *gin.Context) { return } - // filePath = q.FsPath(userID, filePath) info, err := h.deps.FS().Stat(filePath) if err != nil { c.JSON(q.ErrResp(c, 500, err)) @@ -222,7 +219,6 @@ func (h *FileHandlers) Mkdir(c *gin.Context) { return } - // dirPath := q.FsPath(userID, req.Path) err := h.deps.FS().MkdirAll(req.Path) if err != nil { c.JSON(q.ErrResp(c, 500, err)) @@ -250,8 +246,6 @@ func (h *FileHandlers) Move(c *gin.Context) { return } - // oldPath := q.FsPath(userID, req.OldPath) - // newPath := q.FsPath(userID, req.NewPath) _, err := h.deps.FS().Stat(req.OldPath) if err != nil { c.JSON(q.ErrResp(c, 500, err)) @@ -357,7 +351,6 @@ func (h *FileHandlers) UploadChunk(c *gin.Context) { } func (h *FileHandlers) getFSFilePath(userID, fsFilePath string) (string, error) { - // fsFilePath := q.FsPath(userID, reqPath) _, err := h.deps.FS().Stat(fsFilePath) if err != nil { if os.IsNotExist(err) { @@ -447,7 +440,6 @@ func (h *FileHandlers) Download(c *gin.Context) { // TODO: when sharing is introduced, move following logics to a separeted method // concurrently file accessing is managed by os - // filePath = q.FsPath(userID, filePath) info, err := h.deps.FS().Stat(filePath) if err != nil { if os.IsNotExist(err) { @@ -525,7 +517,6 @@ func (h *FileHandlers) List(c *gin.Context) { return } - // dirPath = q.FsPath(userID, dirPath) infos, err := h.deps.FS().ListDir(dirPath) if err != nil { c.JSON(q.ErrResp(c, 500, err)) diff --git a/src/handlers/util.go b/src/handlers/util.go index aedffcd..726b1d3 100644 --- a/src/handlers/util.go +++ b/src/handlers/util.go @@ -125,22 +125,10 @@ func ErrResp(c *gin.Context, code int, err error) (int, interface{}) { } -func FsPath(userID, relFilePath string) string { - return filepath.Join(userID, FsDir, relFilePath) -} - -func HomePath(userID, relFilePath string) string { - return filepath.Join(userID, relFilePath) -} - func FsRootPath(userID, relFilePath string) string { return filepath.Join(userID, FsRootDir, relFilePath) } -func GetTmpPath(userID, relFilePath string) string { - return filepath.Join(UploadDir, userID, fmt.Sprintf("%x", sha1.Sum([]byte(relFilePath)))) -} - func UploadPath(userID, relFilePath string) string { return filepath.Join(UploadFolder(userID), fmt.Sprintf("%x", sha1.Sum([]byte(relFilePath)))) }