From ff5e6db1406b0ba37b42a8d803e4ee9a71b076d6 Mon Sep 17 00:00:00 2001 From: hexxa Date: Wed, 5 Jan 2022 14:31:37 +0800 Subject: [PATCH] fix(fe/worker): refactor fe uploading functions to improve robustness --- src/client/web/src/i18n/en_US.ts | 2 +- src/client/web/src/worker/chunk_uploader.ts | 32 ++++---- src/client/web/src/worker/interface.ts | 14 +++- .../web/src/worker/upload.baseworker.ts | 45 ++++++++--- src/client/web/src/worker/upload_mgr.ts | 81 +++++++++---------- src/handlers/fileshdr/handlers.go | 18 ++++- 6 files changed, 118 insertions(+), 74 deletions(-) diff --git a/src/client/web/src/i18n/en_US.ts b/src/client/web/src/i18n/en_US.ts index 07bd7c0..d980e8f 100644 --- a/src/client/web/src/i18n/en_US.ts +++ b/src/client/web/src/i18n/en_US.ts @@ -114,7 +114,7 @@ export const msgs: Map = Map({ "upload.add.fail": "Some files conflict with uploading files, please check.", "server.fail": "The operation failed in the server", "err.updater": "updater error", - "err.uploadMgr": "upload Manager error", + "err.uploadMgr": "Upload Manager error", "err.server": "The operation failed in the server", "err.script.cors": "script error with CORS", "err.unknown": "unknown error", diff --git a/src/client/web/src/worker/chunk_uploader.ts b/src/client/web/src/worker/chunk_uploader.ts index 3e30480..41c2355 100644 --- a/src/client/web/src/worker/chunk_uploader.ts +++ b/src/client/web/src/worker/chunk_uploader.ts @@ -7,7 +7,7 @@ import { UploadStatus, UploadState } from "./interface"; const defaultChunkLen = 1024 * 1024 * 1; const speedDownRatio = 0.5; const speedUpRatio = 1.1; -const speedLimit = 1024 * 1024 * 10; // 10MB +const chunkLimit = 1024 * 1024 * 50; // 50MB const createRetryLimit = 512; const uploadRetryLimit = 1024; const backoffMax = 2000; @@ -22,7 +22,7 @@ export class ChunkUploader { private chunkLen: number = defaultChunkLen; - constructor() { } + constructor() {} setClient = (client: IFilesClient) => { this.client = client; @@ -70,8 +70,8 @@ export class ChunkUploader { ): Promise => { if (this.chunkLen === 0) { this.chunkLen = 1; // reset it to 1B - } else if (this.chunkLen > speedLimit) { - this.chunkLen = speedLimit; + } else if (this.chunkLen > chunkLimit) { + this.chunkLen = chunkLimit; } else if (uploaded > file.size) { return { filePath, @@ -143,7 +143,9 @@ export class ChunkUploader { filePath, uploaded, state: UploadState.Error, - err: `failed to upload chunk: ${uploadResp.statusText}, ${JSON.stringify(uploadResp.data)}`, + err: `failed to upload chunk: ${ + uploadResp.statusText + }, ${JSON.stringify(uploadResp.data)}`, }; } @@ -153,17 +155,17 @@ export class ChunkUploader { const uploadStatusResp = await this.client.uploadStatus(filePath); return uploadStatusResp.status === 200 ? { - filePath, - uploaded: uploadStatusResp.data.uploaded, - state: UploadState.Ready, - err: `retrying, error: ${JSON.stringify(uploadResp.data)}`, - } + filePath, + uploaded: uploadStatusResp.data.uploaded, + state: UploadState.Ready, + err: `retrying, error: ${JSON.stringify(uploadResp.data)}`, + } : { - filePath, - uploaded: uploaded, - state: UploadState.Error, - err: `failed to get upload status: ${uploadStatusResp.statusText}`, - }; + filePath, + uploaded: uploaded, + state: UploadState.Error, + err: `failed to get upload status: ${uploadStatusResp.statusText}`, + }; } catch (e) { return { filePath, diff --git a/src/client/web/src/worker/interface.ts b/src/client/web/src/worker/interface.ts index 9847920..238f597 100644 --- a/src/client/web/src/worker/interface.ts +++ b/src/client/web/src/worker/interface.ts @@ -31,7 +31,7 @@ export interface IChunkUploader { ) => Promise; } -export type eventKind = SyncReqKind | ErrKind | UploadInfoKind; +export type eventKind = SyncReqKind | ErrKind | UploadInfoKind | ImIdleKind; export interface WorkerEvent { kind: eventKind; } @@ -41,7 +41,7 @@ export const syncReqKind: SyncReqKind = "worker.req.sync"; export interface SyncReq extends WorkerEvent { kind: SyncReqKind; - file: File, + file: File; filePath: string; size: number; uploaded: number; @@ -69,10 +69,16 @@ export interface UploadInfoResp extends WorkerEvent { err: string; } -export type FileWorkerResp = ErrResp | UploadInfoResp; +export type ImIdleKind = "worker.resp.idle"; +export const imIdleKind: ImIdleKind = "worker.resp.idle"; +export interface ImIdleResp extends WorkerEvent { + kind: ImIdleKind; +} + +export type FileWorkerResp = ErrResp | UploadInfoResp | ImIdleResp; export class MockWorker { constructor() {} onmessage = (event: MessageEvent): void => {}; postMessage = (event: FileWorkerReq): void => {}; -} \ No newline at end of file +} diff --git a/src/client/web/src/worker/upload.baseworker.ts b/src/client/web/src/worker/upload.baseworker.ts index f8ebd9a..52970cc 100644 --- a/src/client/web/src/worker/upload.baseworker.ts +++ b/src/client/web/src/worker/upload.baseworker.ts @@ -5,7 +5,9 @@ import { SyncReq, errKind, ErrResp, + ImIdleResp, uploadInfoKind, + imIdleKind, UploadInfoResp, FileWorkerResp, UploadStatus, @@ -13,14 +15,32 @@ import { IChunkUploader, } from "./interface"; +const win: Window = self as any; + export class UploadWorker { private uploader: IChunkUploader = new ChunkUploader(); + private cycle: number = 100; + private working: boolean = false; + sendEvent = (resp: FileWorkerResp): void => { // TODO: make this abstract throw new Error("not implemented"); }; - constructor() {} + constructor() { + win.setInterval(this.checkIdle, this.cycle); + } + + checkIdle = () => { + if (this.working) { + return; + } + + const resp: ImIdleResp = { + kind: imIdleKind, + }; + this.sendEvent(resp); + }; setUploader = (uploader: IChunkUploader) => { this.uploader = uploader; @@ -46,7 +66,8 @@ export class UploadWorker { } }; - onMsg = (event: MessageEvent) => { + onMsg = async (event: MessageEvent) => { + this.working = true; const req = event.data as FileWorkerReq; switch (req.kind) { @@ -54,19 +75,25 @@ export class UploadWorker { const syncReq = req as SyncReq; if (syncReq.created) { - this.uploader - .upload(syncReq.filePath, syncReq.file, syncReq.uploaded) - .then(this.handleUploadStatus); + const status = await this.uploader.upload( + syncReq.filePath, + syncReq.file, + syncReq.uploaded + ); + await this.handleUploadStatus(status); } else { - this.uploader - .create(syncReq.filePath, syncReq.file) - .then(this.handleUploadStatus); + const status = await this.uploader.create( + syncReq.filePath, + syncReq.file + ); + await this.handleUploadStatus(status); } - break; default: console.error(`unknown worker request(${JSON.stringify(req)})`); } + + this.working = false; }; onError = (ev: ErrorEvent) => { diff --git a/src/client/web/src/worker/upload_mgr.ts b/src/client/web/src/worker/upload_mgr.ts index 4e1b261..d173ed6 100644 --- a/src/client/web/src/worker/upload_mgr.ts +++ b/src/client/web/src/worker/upload_mgr.ts @@ -9,6 +9,7 @@ import { syncReqKind, errKind, uploadInfoKind, + imIdleKind, UploadState, } from "./interface"; import { errUploadMgr } from "../common/errors"; @@ -35,49 +36,44 @@ export class UploadMgr { constructor(worker: IWorker) { this.worker = worker; this.worker.onmessage = this.respHandler; - - const syncing = () => { - if (this.infos.size === 0) { - return; - } - if (this.idx > 10000) { - this.idx = 0; - } - - const start = this.idx % this.infos.size; - const infos = this.infos.valueSeq().toArray(); - for (let i = 0; i < this.infos.size; i++) { - const pos = (start + i) % this.infos.size; - const info = infos[pos]; - - if ( - info.state === UploadState.Ready || - info.state === UploadState.Created - ) { - this.infos = this.infos.set(info.filePath, { - ...info, - state: UploadState.Uploading, - }); - - this.worker.postMessage({ - kind: syncReqKind, - file: info.file, - filePath: info.filePath, - size: info.size, - uploaded: info.uploaded, - created: info.uploaded > 0 || info.state === UploadState.Created, - }); - break; - } - } - - this.idx++; - }; - this.intervalID = win.setInterval(syncing, this.cycle); } - destory = () => { - win.clearInterval(this.intervalID); + syncing = () => { + if (this.infos.size === 0) { + return; + } + if (this.idx > 10000) { + this.idx = 0; + } + + const start = this.idx % this.infos.size; + const infos = this.infos.valueSeq().toArray(); + for (let i = 0; i < this.infos.size; i++) { + const pos = (start + i) % this.infos.size; + const info = infos[pos]; + + if ( + info.state === UploadState.Ready || + info.state === UploadState.Created + ) { + this.infos = this.infos.set(info.filePath, { + ...info, + state: UploadState.Uploading, + }); + + this.worker.postMessage({ + kind: syncReqKind, + file: info.file, + filePath: info.filePath, + size: info.size, + uploaded: info.uploaded, + created: info.uploaded > 0 || info.state === UploadState.Created, + }); + break; + } + } + + this.idx++; }; _setInfos = (infos: OrderedMap) => { @@ -187,6 +183,9 @@ export class UploadMgr { const resp = event.data as FileWorkerResp; switch (resp.kind) { + case imIdleKind: + this.syncing(); + break; case errKind: const errResp = resp as ErrResp; const errEntry = this.infos.get(errResp.filePath); diff --git a/src/handlers/fileshdr/handlers.go b/src/handlers/fileshdr/handlers.go index 8e3c0d4..9964867 100644 --- a/src/handlers/fileshdr/handlers.go +++ b/src/handlers/fileshdr/handlers.go @@ -18,9 +18,9 @@ import ( "github.com/ihexxa/gocfg" "github.com/ihexxa/multipart" + "github.com/ihexxa/quickshare/src/db/userstore" "github.com/ihexxa/quickshare/src/depidx" q "github.com/ihexxa/quickshare/src/handlers" - "github.com/ihexxa/quickshare/src/db/userstore" "github.com/ihexxa/quickshare/src/worker/localworker" ) @@ -793,10 +793,20 @@ func (h *FileHandlers) DelUploading(c *gin.Context) { return } - err = h.deps.FS().Remove(tmpFilePath) + _, err = h.deps.FS().Stat(tmpFilePath) if err != nil { - c.JSON(q.ErrResp(c, 500, err)) - return + if os.IsNotExist(err) { + // no op + } else { + c.JSON(q.ErrResp(c, 500, err)) + return + } + } else { + err = h.deps.FS().Remove(tmpFilePath) + if err != nil { + c.JSON(q.ErrResp(c, 500, err)) + return + } } err = h.uploadMgr.DelInfo(userID, tmpFilePath)