fix(fe/worker): refactor fe uploading functions to improve robustness

This commit is contained in:
hexxa 2022-01-05 14:31:37 +08:00 committed by Hexxa
parent aca1ae6787
commit ff5e6db140
6 changed files with 118 additions and 74 deletions

View file

@ -114,7 +114,7 @@ export const msgs: Map<string, string> = Map({
"upload.add.fail": "Some files conflict with uploading files, please check.",
"server.fail": "The operation failed in the server",
"err.updater": "updater error",
"err.uploadMgr": "upload Manager error",
"err.uploadMgr": "Upload Manager error",
"err.server": "The operation failed in the server",
"err.script.cors": "script error with CORS",
"err.unknown": "unknown error",

View file

@ -7,7 +7,7 @@ import { UploadStatus, UploadState } from "./interface";
const defaultChunkLen = 1024 * 1024 * 1;
const speedDownRatio = 0.5;
const speedUpRatio = 1.1;
const speedLimit = 1024 * 1024 * 10; // 10MB
const chunkLimit = 1024 * 1024 * 50; // 50MB
const createRetryLimit = 512;
const uploadRetryLimit = 1024;
const backoffMax = 2000;
@ -22,7 +22,7 @@ export class ChunkUploader {
private chunkLen: number = defaultChunkLen;
constructor() { }
constructor() {}
setClient = (client: IFilesClient) => {
this.client = client;
@ -70,8 +70,8 @@ export class ChunkUploader {
): Promise<UploadStatus> => {
if (this.chunkLen === 0) {
this.chunkLen = 1; // reset it to 1B
} else if (this.chunkLen > speedLimit) {
this.chunkLen = speedLimit;
} else if (this.chunkLen > chunkLimit) {
this.chunkLen = chunkLimit;
} else if (uploaded > file.size) {
return {
filePath,
@ -143,7 +143,9 @@ export class ChunkUploader {
filePath,
uploaded,
state: UploadState.Error,
err: `failed to upload chunk: ${uploadResp.statusText}, ${JSON.stringify(uploadResp.data)}`,
err: `failed to upload chunk: ${
uploadResp.statusText
}, ${JSON.stringify(uploadResp.data)}`,
};
}
@ -153,17 +155,17 @@ export class ChunkUploader {
const uploadStatusResp = await this.client.uploadStatus(filePath);
return uploadStatusResp.status === 200
? {
filePath,
uploaded: uploadStatusResp.data.uploaded,
state: UploadState.Ready,
err: `retrying, error: ${JSON.stringify(uploadResp.data)}`,
}
filePath,
uploaded: uploadStatusResp.data.uploaded,
state: UploadState.Ready,
err: `retrying, error: ${JSON.stringify(uploadResp.data)}`,
}
: {
filePath,
uploaded: uploaded,
state: UploadState.Error,
err: `failed to get upload status: ${uploadStatusResp.statusText}`,
};
filePath,
uploaded: uploaded,
state: UploadState.Error,
err: `failed to get upload status: ${uploadStatusResp.statusText}`,
};
} catch (e) {
return {
filePath,

View file

@ -31,7 +31,7 @@ export interface IChunkUploader {
) => Promise<UploadStatus>;
}
export type eventKind = SyncReqKind | ErrKind | UploadInfoKind;
export type eventKind = SyncReqKind | ErrKind | UploadInfoKind | ImIdleKind;
export interface WorkerEvent {
kind: eventKind;
}
@ -41,7 +41,7 @@ export const syncReqKind: SyncReqKind = "worker.req.sync";
export interface SyncReq extends WorkerEvent {
kind: SyncReqKind;
file: File,
file: File;
filePath: string;
size: number;
uploaded: number;
@ -69,10 +69,16 @@ export interface UploadInfoResp extends WorkerEvent {
err: string;
}
export type FileWorkerResp = ErrResp | UploadInfoResp;
export type ImIdleKind = "worker.resp.idle";
export const imIdleKind: ImIdleKind = "worker.resp.idle";
export interface ImIdleResp extends WorkerEvent {
kind: ImIdleKind;
}
export type FileWorkerResp = ErrResp | UploadInfoResp | ImIdleResp;
export class MockWorker {
constructor() {}
onmessage = (event: MessageEvent): void => {};
postMessage = (event: FileWorkerReq): void => {};
}
}

View file

@ -5,7 +5,9 @@ import {
SyncReq,
errKind,
ErrResp,
ImIdleResp,
uploadInfoKind,
imIdleKind,
UploadInfoResp,
FileWorkerResp,
UploadStatus,
@ -13,14 +15,32 @@ import {
IChunkUploader,
} from "./interface";
const win: Window = self as any;
export class UploadWorker {
private uploader: IChunkUploader = new ChunkUploader();
private cycle: number = 100;
private working: boolean = false;
sendEvent = (resp: FileWorkerResp): void => {
// TODO: make this abstract
throw new Error("not implemented");
};
constructor() {}
constructor() {
win.setInterval(this.checkIdle, this.cycle);
}
checkIdle = () => {
if (this.working) {
return;
}
const resp: ImIdleResp = {
kind: imIdleKind,
};
this.sendEvent(resp);
};
setUploader = (uploader: IChunkUploader) => {
this.uploader = uploader;
@ -46,7 +66,8 @@ export class UploadWorker {
}
};
onMsg = (event: MessageEvent) => {
onMsg = async (event: MessageEvent) => {
this.working = true;
const req = event.data as FileWorkerReq;
switch (req.kind) {
@ -54,19 +75,25 @@ export class UploadWorker {
const syncReq = req as SyncReq;
if (syncReq.created) {
this.uploader
.upload(syncReq.filePath, syncReq.file, syncReq.uploaded)
.then(this.handleUploadStatus);
const status = await this.uploader.upload(
syncReq.filePath,
syncReq.file,
syncReq.uploaded
);
await this.handleUploadStatus(status);
} else {
this.uploader
.create(syncReq.filePath, syncReq.file)
.then(this.handleUploadStatus);
const status = await this.uploader.create(
syncReq.filePath,
syncReq.file
);
await this.handleUploadStatus(status);
}
break;
default:
console.error(`unknown worker request(${JSON.stringify(req)})`);
}
this.working = false;
};
onError = (ev: ErrorEvent) => {

View file

@ -9,6 +9,7 @@ import {
syncReqKind,
errKind,
uploadInfoKind,
imIdleKind,
UploadState,
} from "./interface";
import { errUploadMgr } from "../common/errors";
@ -35,49 +36,44 @@ export class UploadMgr {
constructor(worker: IWorker) {
this.worker = worker;
this.worker.onmessage = this.respHandler;
const syncing = () => {
if (this.infos.size === 0) {
return;
}
if (this.idx > 10000) {
this.idx = 0;
}
const start = this.idx % this.infos.size;
const infos = this.infos.valueSeq().toArray();
for (let i = 0; i < this.infos.size; i++) {
const pos = (start + i) % this.infos.size;
const info = infos[pos];
if (
info.state === UploadState.Ready ||
info.state === UploadState.Created
) {
this.infos = this.infos.set(info.filePath, {
...info,
state: UploadState.Uploading,
});
this.worker.postMessage({
kind: syncReqKind,
file: info.file,
filePath: info.filePath,
size: info.size,
uploaded: info.uploaded,
created: info.uploaded > 0 || info.state === UploadState.Created,
});
break;
}
}
this.idx++;
};
this.intervalID = win.setInterval(syncing, this.cycle);
}
destory = () => {
win.clearInterval(this.intervalID);
syncing = () => {
if (this.infos.size === 0) {
return;
}
if (this.idx > 10000) {
this.idx = 0;
}
const start = this.idx % this.infos.size;
const infos = this.infos.valueSeq().toArray();
for (let i = 0; i < this.infos.size; i++) {
const pos = (start + i) % this.infos.size;
const info = infos[pos];
if (
info.state === UploadState.Ready ||
info.state === UploadState.Created
) {
this.infos = this.infos.set(info.filePath, {
...info,
state: UploadState.Uploading,
});
this.worker.postMessage({
kind: syncReqKind,
file: info.file,
filePath: info.filePath,
size: info.size,
uploaded: info.uploaded,
created: info.uploaded > 0 || info.state === UploadState.Created,
});
break;
}
}
this.idx++;
};
_setInfos = (infos: OrderedMap<string, UploadEntry>) => {
@ -187,6 +183,9 @@ export class UploadMgr {
const resp = event.data as FileWorkerResp;
switch (resp.kind) {
case imIdleKind:
this.syncing();
break;
case errKind:
const errResp = resp as ErrResp;
const errEntry = this.infos.get(errResp.filePath);

View file

@ -18,9 +18,9 @@ import (
"github.com/ihexxa/gocfg"
"github.com/ihexxa/multipart"
"github.com/ihexxa/quickshare/src/db/userstore"
"github.com/ihexxa/quickshare/src/depidx"
q "github.com/ihexxa/quickshare/src/handlers"
"github.com/ihexxa/quickshare/src/db/userstore"
"github.com/ihexxa/quickshare/src/worker/localworker"
)
@ -793,10 +793,20 @@ func (h *FileHandlers) DelUploading(c *gin.Context) {
return
}
err = h.deps.FS().Remove(tmpFilePath)
_, err = h.deps.FS().Stat(tmpFilePath)
if err != nil {
c.JSON(q.ErrResp(c, 500, err))
return
if os.IsNotExist(err) {
// no op
} else {
c.JSON(q.ErrResp(c, 500, err))
return
}
} else {
err = h.deps.FS().Remove(tmpFilePath)
if err != nil {
c.JSON(q.ErrResp(c, 500, err))
return
}
}
err = h.uploadMgr.DelInfo(userID, tmpFilePath)