feat(): refactor uploader (#68)
* chore(src): delete unused codes * fix(client/worker): refactor uploading part and fix issues * fix(ui/worker): rename fg worker file name * fix(ui/worker): cleanups * feat(ui/uploader): switch from file_uploader to chunk_uploader with tests * fix(ui/worker): clean up code
This commit is contained in:
parent
aefaca98b3
commit
021e5090be
13 changed files with 580 additions and 267 deletions
|
@ -24,6 +24,13 @@ export class Updater {
|
|||
this.filesClient = filesClient;
|
||||
}
|
||||
|
||||
initUploads = () => {
|
||||
this.props.uploadings.forEach(entry => {
|
||||
Up().addStopped(entry.realFilePath, entry.uploaded, entry.size);
|
||||
})
|
||||
// this.setUploadings(Up().list());
|
||||
};
|
||||
|
||||
addUploads = (fileList: List<File>) => {
|
||||
fileList.forEach(file => {
|
||||
const filePath = getItemPath(
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import { List, Set, Map } from "immutable";
|
||||
|
||||
import BgWorker from "../worker/upload.bg.worker";
|
||||
import { FgWorker } from "../worker/upload.fgworker";
|
||||
import { FgWorker } from "../worker/upload.fg.worker";
|
||||
|
||||
import { Props as PanelProps } from "./root_frame";
|
||||
import { Item } from "./browser";
|
||||
|
|
|
@ -26,6 +26,9 @@ export class StateMgr extends React.Component<Props, State, {}> {
|
|||
return BrowserUpdater().refreshUploadings();
|
||||
})
|
||||
.then((_: boolean) => {
|
||||
BrowserUpdater().initUploads();
|
||||
})
|
||||
.then(() => {
|
||||
this.update(BrowserUpdater().setBrowser);
|
||||
})
|
||||
.then(() => {
|
||||
|
@ -38,7 +41,6 @@ export class StateMgr extends React.Component<Props, State, {}> {
|
|||
return PanesUpdater.listUsers();
|
||||
})
|
||||
.then((_: boolean) => {
|
||||
console.log(PanesUpdater);
|
||||
this.update(PanesUpdater.updateState);
|
||||
});
|
||||
};
|
||||
|
|
|
@ -2,7 +2,7 @@ import { mock, instance, when } from "ts-mockito";
|
|||
|
||||
import { UploadWorker } from "../upload.baseworker";
|
||||
import { FileUploader } from "../uploader";
|
||||
import { FileWorkerResp, UploadEntry, syncReqKind } from "../interface";
|
||||
import { FileWorkerResp, UploadEntry, syncReqKind, UploadState } from "../interface";
|
||||
|
||||
describe("upload.worker", () => {
|
||||
const content = ["123456"];
|
||||
|
@ -11,98 +11,16 @@ describe("upload.worker", () => {
|
|||
const fileSize = blob.size;
|
||||
const file = new File(content, filePath);
|
||||
|
||||
const makeEntry = (filePath: string, runnable: boolean): UploadEntry => {
|
||||
const makeEntry = (filePath: string, state: UploadState): UploadEntry => {
|
||||
return {
|
||||
file: file,
|
||||
filePath,
|
||||
size: fileSize,
|
||||
uploaded: 0,
|
||||
runnable,
|
||||
state,
|
||||
err: "",
|
||||
};
|
||||
};
|
||||
|
||||
test("onMsg:syncReqKind: filter list and start uploading correct file", async () => {
|
||||
const mockUploaderClass = mock(FileUploader);
|
||||
when(mockUploaderClass.start()).thenCall(
|
||||
(): Promise<boolean> => {
|
||||
return new Promise((resolve) => resolve(true));
|
||||
}
|
||||
);
|
||||
when(mockUploaderClass.stop()).thenCall(() => {});
|
||||
|
||||
interface TestCase {
|
||||
desc: string;
|
||||
infos: Array<UploadEntry>;
|
||||
expectedUploadingFile: string;
|
||||
expectedUploaderStartInput: string;
|
||||
currentFilePath: string;
|
||||
}
|
||||
|
||||
const tcs: Array<TestCase> = [
|
||||
{
|
||||
desc: "add new uploadings when worker is in idle",
|
||||
infos: [makeEntry("file1", true), makeEntry("file2", true)],
|
||||
currentFilePath: "",
|
||||
expectedUploadingFile: "file1",
|
||||
expectedUploaderStartInput: "file1",
|
||||
},
|
||||
{
|
||||
desc: "add new uploadings when worker is in idle and skip some stopped files",
|
||||
infos: [makeEntry("file1", false), makeEntry("file2", true)],
|
||||
currentFilePath: "",
|
||||
expectedUploadingFile: "file2",
|
||||
expectedUploaderStartInput: "file2",
|
||||
},
|
||||
{
|
||||
desc: "current file should be stopped and start new uploading",
|
||||
infos: [makeEntry("file0", false), makeEntry("file1", true)],
|
||||
currentFilePath: "file0",
|
||||
expectedUploadingFile: "file1",
|
||||
expectedUploaderStartInput: "file1",
|
||||
},
|
||||
{
|
||||
desc: "uploader should keep uploading if the first uploadable file is not changed",
|
||||
infos: [makeEntry("file1", true)],
|
||||
expectedUploadingFile: "file1",
|
||||
expectedUploaderStartInput: undefined,
|
||||
currentFilePath: "file1",
|
||||
},
|
||||
];
|
||||
|
||||
for (let i = 0; i < tcs.length; i++) {
|
||||
const uploadWorker = new UploadWorker();
|
||||
let currentUploader: FileUploader = undefined;
|
||||
let uploaderFile: File = undefined;
|
||||
let uploaderFilePath: string = undefined;
|
||||
let uploaderStopFilePath: string = undefined;
|
||||
uploadWorker.sendEvent = (_: FileWorkerResp) => {};
|
||||
uploadWorker.makeUploader = (
|
||||
file: File,
|
||||
filePath: string
|
||||
): FileUploader => {
|
||||
uploaderFile = file;
|
||||
uploaderFilePath = filePath;
|
||||
currentUploader = instance(mockUploaderClass);
|
||||
return currentUploader;
|
||||
};
|
||||
if (tcs[i].currentFilePath !== "") {
|
||||
uploadWorker.setFilePath(tcs[i].currentFilePath);
|
||||
}
|
||||
const req = {
|
||||
kind: syncReqKind,
|
||||
infos: tcs[i].infos,
|
||||
};
|
||||
|
||||
uploadWorker.onMsg(
|
||||
new MessageEvent("worker", {
|
||||
data: req,
|
||||
})
|
||||
);
|
||||
|
||||
console.log(tcs[i].desc);
|
||||
expect(uploadWorker.getFilePath()).toEqual(tcs[i].expectedUploadingFile);
|
||||
expect(uploaderFilePath).toEqual(tcs[i].expectedUploaderStartInput);
|
||||
}
|
||||
});
|
||||
xtest("onMsg:syncReqKind: filter list and start uploading correct file", async () => {});
|
||||
});
|
||||
|
|
|
@ -1,10 +1,12 @@
|
|||
import { Map } from "immutable";
|
||||
import { mock, instance, when, anything } from "ts-mockito";
|
||||
|
||||
import { FilesClient } from "../../client/files_mock";
|
||||
import { makePromise } from "../../test/helpers";
|
||||
import { FgWorker } from "../upload.fg.worker";
|
||||
|
||||
import { Up, initUploadMgr } from "../upload_mgr";
|
||||
import {
|
||||
UploadState,
|
||||
UploadStatus,
|
||||
} from "../interface";
|
||||
|
||||
import {
|
||||
FileWorkerReq,
|
||||
|
@ -29,88 +31,121 @@ const delay = (ms: number): Promise<void> => {
|
|||
};
|
||||
|
||||
describe("UploadMgr", () => {
|
||||
const content = ["123456"];
|
||||
const filePath = "mock/file";
|
||||
const blob = new Blob(content);
|
||||
const fileSize = blob.size;
|
||||
const file = new File(content, filePath);
|
||||
const makeInfo = (filePath: string, runnable: boolean): UploadEntry => {
|
||||
return {
|
||||
file: file,
|
||||
filePath: filePath,
|
||||
size: fileSize,
|
||||
uploaded: 0,
|
||||
runnable,
|
||||
err: "",
|
||||
};
|
||||
const newFile = (filePath: string, content: string): File => {
|
||||
const contentArray = [content];
|
||||
const blob = new Blob(contentArray);
|
||||
return new File(contentArray, filePath);
|
||||
};
|
||||
|
||||
test("test init and respHandler: pick up tasks and remove them after done", async () => {
|
||||
test("test syncing: pick up item which is ready", async () => {
|
||||
interface TestCase {
|
||||
inputInfos: Array<UploadEntry>;
|
||||
expectedInfos: Array<UploadEntry>;
|
||||
expectedInfo: UploadEntry;
|
||||
}
|
||||
|
||||
class MockWorker {
|
||||
constructor() {}
|
||||
public expectedEntry: UploadEntry;
|
||||
onmessage = (event: MessageEvent): void => {};
|
||||
postMessage = (req: FileWorkerReq): void => {
|
||||
switch (req.kind) {
|
||||
case syncReqKind:
|
||||
const syncReq = req as SyncReq;
|
||||
// find the first qualified task
|
||||
const infoArray = syncReq.infos;
|
||||
for (let i = 0; i < infoArray.length; i++) {
|
||||
if (
|
||||
infoArray[i].runnable &&
|
||||
infoArray[i].uploaded < infoArray[i].size
|
||||
) {
|
||||
this.onmessage(
|
||||
new MessageEvent("worker", {
|
||||
data: {
|
||||
kind: uploadInfoKind,
|
||||
filePath: infoArray[i].filePath,
|
||||
uploaded: infoArray[i].size,
|
||||
runnable: true,
|
||||
err: "",
|
||||
},
|
||||
})
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw Error(
|
||||
`unknown worker request ${req.kind} ${req.kind === syncReqKind}`
|
||||
);
|
||||
}
|
||||
expect(req.filePath).toEqual(this.expectedEntry.filePath);
|
||||
expect(req.uploaded).toEqual(this.expectedEntry.uploaded);
|
||||
expect(req.size).toEqual(this.expectedEntry.size);
|
||||
};
|
||||
}
|
||||
|
||||
const tcs: Array<TestCase> = [
|
||||
{
|
||||
inputInfos: [
|
||||
makeInfo("path1/file1", true),
|
||||
makeInfo("path2/file1", true),
|
||||
{
|
||||
file: undefined,
|
||||
filePath: "t1/file1",
|
||||
state: UploadState.Stopped,
|
||||
uploaded: 3,
|
||||
size: 2,
|
||||
err: "",
|
||||
},
|
||||
{
|
||||
file: undefined,
|
||||
filePath: "t1/file2",
|
||||
state: UploadState.Uploading,
|
||||
uploaded: 6,
|
||||
size: 3,
|
||||
err: "",
|
||||
},
|
||||
{
|
||||
file: undefined,
|
||||
filePath: "t1/file3",
|
||||
state: UploadState.Ready,
|
||||
uploaded: 6,
|
||||
size: 3,
|
||||
err: "",
|
||||
},
|
||||
],
|
||||
expectedInfos: [],
|
||||
expectedInfo: {
|
||||
file: undefined,
|
||||
filePath: "t1/file3",
|
||||
state: UploadState.Ready,
|
||||
uploaded: 6,
|
||||
size: 3,
|
||||
err: "",
|
||||
},
|
||||
},
|
||||
{
|
||||
inputInfos: [
|
||||
makeInfo("path1/file1", true),
|
||||
makeInfo("path2/file1", false),
|
||||
{
|
||||
file: undefined,
|
||||
filePath: "path3/file1",
|
||||
state: UploadState.Ready,
|
||||
uploaded: 6,
|
||||
size: 3,
|
||||
err: "",
|
||||
},
|
||||
{
|
||||
file: undefined,
|
||||
filePath: "path2/file1",
|
||||
state: UploadState.Stopped,
|
||||
uploaded: 6,
|
||||
size: 3,
|
||||
err: "",
|
||||
},
|
||||
],
|
||||
expectedInfos: [makeInfo("path2/file1", false)],
|
||||
expectedInfo: {
|
||||
file: undefined,
|
||||
filePath: "path3/file1",
|
||||
state: UploadState.Ready,
|
||||
uploaded: 6,
|
||||
size: 3,
|
||||
err: "",
|
||||
},
|
||||
},
|
||||
{
|
||||
inputInfos: [
|
||||
makeInfo("path1/file1", false),
|
||||
makeInfo("path2/file1", true),
|
||||
],
|
||||
expectedInfos: [
|
||||
makeInfo("path1/file1", false),
|
||||
{
|
||||
file: undefined,
|
||||
filePath: "path3/file1",
|
||||
state: UploadState.Created,
|
||||
uploaded: 6,
|
||||
size: 3,
|
||||
err: "",
|
||||
},
|
||||
{
|
||||
file: undefined,
|
||||
filePath: "path2/file1",
|
||||
state: UploadState.Stopped,
|
||||
uploaded: 6,
|
||||
size: 3,
|
||||
err: "",
|
||||
},
|
||||
],
|
||||
expectedInfo: {
|
||||
file: undefined,
|
||||
filePath: "path3/file1",
|
||||
state: UploadState.Created,
|
||||
uploaded: 6,
|
||||
size: 3,
|
||||
err: "",
|
||||
},
|
||||
},
|
||||
];
|
||||
|
||||
|
@ -118,23 +153,146 @@ describe("UploadMgr", () => {
|
|||
for (let i = 0; i < tcs.length; i++) {
|
||||
initUploadMgr(worker);
|
||||
const up = Up();
|
||||
up.setCycle(100);
|
||||
up.setCycle(50);
|
||||
|
||||
const infoMap = arraytoMap(tcs[i].inputInfos);
|
||||
up._setInfos(infoMap);
|
||||
worker.expectedEntry = tcs[i].expectedInfo;
|
||||
|
||||
// TODO: find a better way to wait
|
||||
// polling needs several rounds to finish all the tasks
|
||||
await delay(tcs.length * up.getCycle() + 1000);
|
||||
up.destory();
|
||||
}
|
||||
});
|
||||
|
||||
test("test e2e: from syncing to respHandler", async () => {
|
||||
interface TestCase {
|
||||
inputInfos: Array<UploadEntry>;
|
||||
expectedInfos: Array<UploadEntry>;
|
||||
}
|
||||
|
||||
class MockUploader {
|
||||
constructor() {}
|
||||
create = (filePath: string, file: File): Promise<UploadStatus> => {
|
||||
return new Promise((resolve) =>
|
||||
resolve({
|
||||
filePath,
|
||||
uploaded: 0,
|
||||
state: UploadState.Created,
|
||||
err: "",
|
||||
})
|
||||
);
|
||||
};
|
||||
upload = (
|
||||
filePath: string,
|
||||
file: File,
|
||||
uploaded: number
|
||||
): Promise<UploadStatus> => {
|
||||
return new Promise((resolve) =>
|
||||
resolve({
|
||||
filePath,
|
||||
uploaded: file.size,
|
||||
state: UploadState.Ready,
|
||||
err: "",
|
||||
})
|
||||
);
|
||||
};
|
||||
}
|
||||
|
||||
const tcs: Array<TestCase> = [
|
||||
{
|
||||
inputInfos: [
|
||||
{
|
||||
file: newFile("t1/file1", "123"),
|
||||
filePath: "t1/file1",
|
||||
state: UploadState.Ready,
|
||||
uploaded: 0,
|
||||
size: 3,
|
||||
err: "",
|
||||
},
|
||||
{
|
||||
file: newFile("t1/file2", "123"),
|
||||
filePath: "t1/file2",
|
||||
state: UploadState.Ready,
|
||||
uploaded: 0,
|
||||
size: 3,
|
||||
err: "",
|
||||
},
|
||||
],
|
||||
expectedInfos: [],
|
||||
},
|
||||
{
|
||||
inputInfos: [
|
||||
{
|
||||
file: newFile("t1/file1", "123"),
|
||||
filePath: "t1/file1",
|
||||
state: UploadState.Stopped,
|
||||
uploaded: 0,
|
||||
size: 3,
|
||||
err: "",
|
||||
},
|
||||
{
|
||||
file: newFile("t1/file2", "123"),
|
||||
filePath: "t1/file2",
|
||||
state: UploadState.Error,
|
||||
uploaded: 0,
|
||||
size: 3,
|
||||
err: "",
|
||||
},
|
||||
{
|
||||
file: newFile("t1/file3", "123"),
|
||||
filePath: "t1/file3",
|
||||
state: UploadState.Ready,
|
||||
uploaded: 0,
|
||||
size: 3,
|
||||
err: "",
|
||||
},
|
||||
],
|
||||
expectedInfos: [
|
||||
{
|
||||
file: newFile("t1/file1", "123"),
|
||||
filePath: "t1/file1",
|
||||
state: UploadState.Stopped,
|
||||
uploaded: 0,
|
||||
size: 3,
|
||||
err: "",
|
||||
},
|
||||
{
|
||||
file: newFile("t1/file2", "123"),
|
||||
filePath: "t1/file2",
|
||||
state: UploadState.Error,
|
||||
uploaded: 0,
|
||||
size: 3,
|
||||
err: "",
|
||||
},
|
||||
],
|
||||
},
|
||||
];
|
||||
|
||||
for (let i = 0; i < tcs.length; i++) {
|
||||
const uploader = new MockUploader();
|
||||
const worker = new FgWorker();
|
||||
worker.setUploader(uploader);
|
||||
initUploadMgr(worker);
|
||||
const up = Up();
|
||||
up.setCycle(1);
|
||||
|
||||
const infoMap = arraytoMap(tcs[i].inputInfos);
|
||||
up._setInfos(infoMap);
|
||||
|
||||
// TODO: find a better way to wait, or this test is flanky
|
||||
// polling needs several rounds to finish all the tasks
|
||||
await delay(tcs.length * up.getCycle() + 1000);
|
||||
// TODO: find a better way to wait
|
||||
const gotInfos = up.list();
|
||||
await delay(tcs[i].inputInfos.length * up.getCycle() * 2 + 5000);
|
||||
|
||||
const expectedInfoMap = arraytoMap(tcs[i].expectedInfos);
|
||||
gotInfos.keySeq().forEach((filePath) => {
|
||||
expect(gotInfos.get(filePath)).toEqual(expectedInfoMap.get(filePath));
|
||||
});
|
||||
expectedInfoMap.keySeq().forEach((filePath) => {
|
||||
expect(expectedInfoMap.get(filePath)).toEqual(gotInfos.get(filePath));
|
||||
});
|
||||
const infos = up.list();
|
||||
expect(infos.size).toEqual(tcs[i].expectedInfos.length);
|
||||
if (tcs[i].expectedInfos.length !== 0) {
|
||||
tcs[i].expectedInfos.forEach((info) => {
|
||||
const expectedInfo = infos.get(info.filePath);
|
||||
expect(expectedInfo).toEqual(info);
|
||||
});
|
||||
}
|
||||
|
||||
up.destory();
|
||||
}
|
||||
|
|
163
src/client/web/src/worker/chunk_uploader.ts
Normal file
163
src/client/web/src/worker/chunk_uploader.ts
Normal file
|
@ -0,0 +1,163 @@
|
|||
import { FilesClient } from "../client/files";
|
||||
import { IFilesClient, Response, isFatalErr } from "../client";
|
||||
import { UploadStatus, UploadState } from "./interface";
|
||||
|
||||
// TODO: get settings from server
|
||||
// TODO: move chunk copying to worker
|
||||
const defaultChunkLen = 1024 * 1024 * 1;
|
||||
const speedDownRatio = 0.5;
|
||||
const speedUpRatio = 1.05;
|
||||
const createRetryLimit = 2;
|
||||
const uploadRetryLimit = 1024;
|
||||
const backoffMax = 2000;
|
||||
|
||||
export interface ReaderResult {
|
||||
chunk?: string;
|
||||
err?: Error;
|
||||
}
|
||||
|
||||
export class ChunkUploader {
|
||||
private reader = new FileReader();
|
||||
private client: IFilesClient = new FilesClient("");
|
||||
|
||||
private chunkLen: number = defaultChunkLen;
|
||||
|
||||
constructor() {}
|
||||
|
||||
setClient = (client: IFilesClient) => {
|
||||
this.client = client;
|
||||
};
|
||||
|
||||
backOff = async (): Promise<void> => {
|
||||
return new Promise((resolve) => {
|
||||
const delay = Math.floor(Math.random() * backoffMax);
|
||||
setTimeout(resolve, delay);
|
||||
});
|
||||
};
|
||||
|
||||
create = async (filePath: string, file: File): Promise<UploadStatus> => {
|
||||
let resp: Response;
|
||||
|
||||
for (let i = 0; i < createRetryLimit; i++) {
|
||||
try {
|
||||
resp = await this.client.create(filePath, file.size);
|
||||
if (resp.status === 200 || resp.status === 304) {
|
||||
return {
|
||||
filePath,
|
||||
uploaded: 0,
|
||||
state: UploadState.Created,
|
||||
err: "",
|
||||
};
|
||||
}
|
||||
} catch (e) {
|
||||
await this.backOff();
|
||||
console.error(e);
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
filePath,
|
||||
uploaded: 0,
|
||||
state: UploadState.Error,
|
||||
err: `failed to create ${filePath}: status=${resp.statusText}`,
|
||||
};
|
||||
};
|
||||
|
||||
upload = async (
|
||||
filePath: string,
|
||||
file: File,
|
||||
uploaded: number
|
||||
): Promise<UploadStatus> => {
|
||||
if (this.chunkLen === 0) {
|
||||
this.chunkLen = 1; // reset it to 1B
|
||||
} else if (uploaded > file.size) {
|
||||
return {
|
||||
filePath,
|
||||
uploaded,
|
||||
state: UploadState.Error,
|
||||
err: "uploaded is greater than file size",
|
||||
};
|
||||
}
|
||||
|
||||
const readerPromise = new Promise<ReaderResult>(
|
||||
(resolve: (result: ReaderResult) => void) => {
|
||||
this.reader.onerror = (_: ProgressEvent<FileReader>) => {
|
||||
resolve({ err: this.reader.error });
|
||||
};
|
||||
|
||||
this.reader.onloadend = (ev: ProgressEvent<FileReader>) => {
|
||||
const dataURL = ev.target.result as string; // readAsDataURL
|
||||
const base64Chunk = dataURL.slice(dataURL.indexOf(",") + 1);
|
||||
resolve({ chunk: base64Chunk });
|
||||
};
|
||||
}
|
||||
);
|
||||
|
||||
const chunkRightPos =
|
||||
uploaded + this.chunkLen > file.size
|
||||
? file.size
|
||||
: uploaded + this.chunkLen;
|
||||
const blob = file.slice(uploaded, chunkRightPos);
|
||||
this.reader.readAsDataURL(blob);
|
||||
|
||||
const result = await readerPromise;
|
||||
if (result.err != null) {
|
||||
return {
|
||||
filePath,
|
||||
uploaded,
|
||||
state: UploadState.Error,
|
||||
err: result.err.toString(),
|
||||
};
|
||||
}
|
||||
|
||||
try {
|
||||
const uploadResp = await this.client.uploadChunk(
|
||||
filePath,
|
||||
result.chunk,
|
||||
uploaded
|
||||
);
|
||||
|
||||
if (uploadResp.status === 200 && uploadResp.data != null) {
|
||||
this.chunkLen = Math.ceil(this.chunkLen * speedUpRatio);
|
||||
return {
|
||||
filePath,
|
||||
uploaded: uploadResp.data.uploaded,
|
||||
state: UploadState.Ready,
|
||||
err: "",
|
||||
};
|
||||
} else if (isFatalErr(uploadResp)) {
|
||||
return {
|
||||
filePath,
|
||||
uploaded,
|
||||
state: UploadState.Error,
|
||||
err: `failed to upload chunk: ${uploadResp.statusText}`,
|
||||
};
|
||||
}
|
||||
|
||||
this.chunkLen = Math.ceil(this.chunkLen * speedDownRatio);
|
||||
await this.backOff();
|
||||
|
||||
const uploadStatusResp = await this.client.uploadStatus(filePath);
|
||||
return uploadStatusResp.status === 200
|
||||
? {
|
||||
filePath,
|
||||
uploaded: uploadStatusResp.data.uploaded,
|
||||
state: UploadState.Ready,
|
||||
err: "",
|
||||
}
|
||||
: {
|
||||
filePath,
|
||||
uploaded: uploaded,
|
||||
state: UploadState.Error,
|
||||
err: `failed to get upload status: ${uploadStatusResp.statusText}`,
|
||||
};
|
||||
} catch (e) {
|
||||
return {
|
||||
filePath,
|
||||
uploaded: uploaded,
|
||||
state: UploadState.Error,
|
||||
err: `[chunk uploader]: ${e.toString()}`,
|
||||
};
|
||||
}
|
||||
};
|
||||
}
|
|
@ -1,12 +1,36 @@
|
|||
export const enum UploadState {
|
||||
Created,
|
||||
Ready,
|
||||
Uploading,
|
||||
Stopped,
|
||||
Error,
|
||||
}
|
||||
|
||||
export interface UploadStatus {
|
||||
filePath: string;
|
||||
uploaded: number;
|
||||
state: UploadState;
|
||||
err: string;
|
||||
}
|
||||
|
||||
export interface UploadEntry {
|
||||
file: File;
|
||||
filePath: string;
|
||||
size: number;
|
||||
uploaded: number;
|
||||
runnable: boolean;
|
||||
state: UploadState;
|
||||
err: string;
|
||||
}
|
||||
|
||||
export interface IChunkUploader {
|
||||
create: (filePath: string, file: File) => Promise<UploadStatus>;
|
||||
upload: (
|
||||
filePath: string,
|
||||
file: File,
|
||||
uploaded: number
|
||||
) => Promise<UploadStatus>;
|
||||
}
|
||||
|
||||
export type eventKind = SyncReqKind | ErrKind | UploadInfoKind;
|
||||
export interface WorkerEvent {
|
||||
kind: eventKind;
|
||||
|
@ -17,7 +41,11 @@ export const syncReqKind: SyncReqKind = "worker.req.sync";
|
|||
|
||||
export interface SyncReq extends WorkerEvent {
|
||||
kind: SyncReqKind;
|
||||
infos: Array<UploadEntry>;
|
||||
file: File,
|
||||
filePath: string;
|
||||
size: number;
|
||||
uploaded: number;
|
||||
created: boolean;
|
||||
}
|
||||
|
||||
export type FileWorkerReq = SyncReq;
|
||||
|
@ -26,6 +54,7 @@ export type ErrKind = "worker.resp.err";
|
|||
export const errKind: ErrKind = "worker.resp.err";
|
||||
export interface ErrResp extends WorkerEvent {
|
||||
kind: ErrKind;
|
||||
filePath: string;
|
||||
err: string;
|
||||
}
|
||||
|
||||
|
@ -36,7 +65,7 @@ export interface UploadInfoResp extends WorkerEvent {
|
|||
kind: UploadInfoKind;
|
||||
filePath: string;
|
||||
uploaded: number;
|
||||
runnable: boolean;
|
||||
state: UploadState;
|
||||
err: string;
|
||||
}
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
import { FileUploader } from "./uploader";
|
||||
import { ChunkUploader } from "./chunk_uploader";
|
||||
import {
|
||||
FileWorkerReq,
|
||||
syncReqKind,
|
||||
|
@ -8,92 +8,73 @@ import {
|
|||
uploadInfoKind,
|
||||
UploadInfoResp,
|
||||
FileWorkerResp,
|
||||
UploadStatus,
|
||||
UploadState,
|
||||
IChunkUploader,
|
||||
} from "./interface";
|
||||
|
||||
export class UploadWorker {
|
||||
private file: File = undefined;
|
||||
private filePath: string = undefined;
|
||||
private uploader: FileUploader = undefined;
|
||||
private uploader: IChunkUploader = new ChunkUploader();
|
||||
sendEvent = (resp: FileWorkerResp): void => {
|
||||
// TODO: make this abstract
|
||||
throw new Error("not implemented");
|
||||
};
|
||||
makeUploader = (file: File, filePath: string): FileUploader => {
|
||||
return new FileUploader(file, filePath, this.onCb);
|
||||
};
|
||||
startUploader = (file: File, filePath: string) => {
|
||||
this.file = file;
|
||||
this.filePath = filePath;
|
||||
this.uploader = this.makeUploader(file, filePath);
|
||||
this.uploader.start();
|
||||
};
|
||||
stopUploader = () => {
|
||||
if (this.uploader != null) {
|
||||
this.uploader.stop();
|
||||
this.file = undefined;
|
||||
this.filePath = undefined;
|
||||
}
|
||||
};
|
||||
getFilePath = (): string => {
|
||||
return this.filePath;
|
||||
};
|
||||
|
||||
setFilePath = (fp: string) => {
|
||||
this.filePath = fp;
|
||||
};
|
||||
|
||||
constructor() {}
|
||||
|
||||
setUploader = (uploader: IChunkUploader) => {
|
||||
this.uploader = uploader;
|
||||
};
|
||||
|
||||
handleUploadStatus = (status: UploadStatus) => {
|
||||
if (status.state !== UploadState.Error) {
|
||||
const resp: UploadInfoResp = {
|
||||
kind: uploadInfoKind,
|
||||
filePath: status.filePath,
|
||||
uploaded: status.uploaded,
|
||||
state: status.state,
|
||||
err: "",
|
||||
};
|
||||
this.sendEvent(resp);
|
||||
} else {
|
||||
const resp: ErrResp = {
|
||||
kind: errKind,
|
||||
filePath: status.filePath,
|
||||
err: status.err,
|
||||
};
|
||||
this.sendEvent(resp);
|
||||
}
|
||||
};
|
||||
|
||||
onMsg = (event: MessageEvent) => {
|
||||
const req = event.data as FileWorkerReq;
|
||||
|
||||
switch (req.kind) {
|
||||
case syncReqKind:
|
||||
// find the first qualified task
|
||||
const syncReq = req as SyncReq;
|
||||
const infoArray = syncReq.infos;
|
||||
|
||||
for (let i = 0; i < infoArray.length; i++) {
|
||||
if (infoArray[i].runnable) {
|
||||
if (infoArray[i].filePath === this.filePath) {
|
||||
// in uploading, do nothing
|
||||
} else {
|
||||
this.stopUploader();
|
||||
this.startUploader(infoArray[i].file, infoArray[i].filePath);
|
||||
}
|
||||
break;
|
||||
} else {
|
||||
if (infoArray[i].filePath === this.filePath) {
|
||||
this.stopUploader();
|
||||
}
|
||||
}
|
||||
if (syncReq.created) {
|
||||
this.uploader
|
||||
.upload(syncReq.filePath, syncReq.file, syncReq.uploaded)
|
||||
.then(this.handleUploadStatus);
|
||||
} else {
|
||||
this.uploader
|
||||
.create(syncReq.filePath, syncReq.file)
|
||||
.then(this.handleUploadStatus);
|
||||
}
|
||||
|
||||
break;
|
||||
default:
|
||||
console.log(`unknown worker request(${JSON.stringify(req)})`);
|
||||
console.error(`unknown worker request(${JSON.stringify(req)})`);
|
||||
}
|
||||
};
|
||||
|
||||
onError = (ev: ErrorEvent) => {
|
||||
const errResp: ErrResp = {
|
||||
kind: errKind,
|
||||
filePath: "unknown",
|
||||
err: ev.error,
|
||||
};
|
||||
this.sendEvent(errResp);
|
||||
};
|
||||
|
||||
onCb = (
|
||||
filePath: string,
|
||||
uploaded: number,
|
||||
runnable: boolean,
|
||||
err: string
|
||||
): void => {
|
||||
const uploadInfoResp: UploadInfoResp = {
|
||||
kind: uploadInfoKind,
|
||||
filePath,
|
||||
uploaded,
|
||||
runnable,
|
||||
err,
|
||||
};
|
||||
this.sendEvent(uploadInfoResp);
|
||||
};
|
||||
}
|
||||
|
|
|
@ -9,8 +9,8 @@ import {
|
|||
syncReqKind,
|
||||
errKind,
|
||||
uploadInfoKind,
|
||||
UploadState,
|
||||
} from "./interface";
|
||||
import { FgWorker } from "./upload.fgworker";
|
||||
|
||||
const win: Window = self as any;
|
||||
|
||||
|
@ -20,22 +20,54 @@ export interface IWorker {
|
|||
}
|
||||
|
||||
export class UploadMgr {
|
||||
private infos = OrderedMap<string, UploadEntry>();
|
||||
private worker: IWorker;
|
||||
private intervalID: number;
|
||||
private idx = 0;
|
||||
private cycle: number = 500;
|
||||
private intervalID: number;
|
||||
private worker: IWorker;
|
||||
private infos = OrderedMap<string, UploadEntry>();
|
||||
private statusCb = (infos: Map<string, UploadEntry>): void => {};
|
||||
|
||||
constructor(worker: IWorker) {
|
||||
this.worker = worker;
|
||||
// TODO: fallback to normal if Web Worker is not available
|
||||
this.worker.onmessage = this.respHandler;
|
||||
|
||||
const syncing = () => {
|
||||
this.worker.postMessage({
|
||||
kind: syncReqKind,
|
||||
infos: this.infos.valueSeq().toArray(),
|
||||
});
|
||||
if (this.infos.size === 0) {
|
||||
return;
|
||||
}
|
||||
if (this.idx > 10000) {
|
||||
this.idx = 0;
|
||||
}
|
||||
|
||||
const start = this.idx % this.infos.size;
|
||||
const infos = this.infos.valueSeq().toArray();
|
||||
for (let i = 0; i < this.infos.size; i++) {
|
||||
const pos = (start + i) % this.infos.size;
|
||||
const info = infos[pos];
|
||||
|
||||
if (
|
||||
info.state === UploadState.Ready ||
|
||||
info.state === UploadState.Created
|
||||
) {
|
||||
|
||||
this.infos = this.infos.set(info.filePath, {
|
||||
...info,
|
||||
state: UploadState.Uploading,
|
||||
});
|
||||
|
||||
this.worker.postMessage({
|
||||
kind: syncReqKind,
|
||||
file: info.file,
|
||||
filePath: info.filePath,
|
||||
size: info.size,
|
||||
uploaded: info.uploaded,
|
||||
created: info.uploaded > 0 || info.state === UploadState.Created,
|
||||
});
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
this.idx++;
|
||||
};
|
||||
this.intervalID = win.setInterval(syncing, this.cycle);
|
||||
}
|
||||
|
@ -60,6 +92,19 @@ export class UploadMgr {
|
|||
this.statusCb = cb;
|
||||
};
|
||||
|
||||
// addStopped is for initializing uploading list in the UploadMgr
|
||||
// notice even uploading list are shown in the UI, it may not inited in the UploadMgr
|
||||
addStopped = (filePath: string, uploaded: number, fileSize: number) => {
|
||||
this.infos = this.infos.set(filePath, {
|
||||
file: new File([""], filePath), // create a dumb file
|
||||
filePath,
|
||||
size: fileSize,
|
||||
uploaded,
|
||||
state: UploadState.Stopped,
|
||||
err: "",
|
||||
});
|
||||
};
|
||||
|
||||
add = (file: File, filePath: string) => {
|
||||
const entry = this.infos.get(filePath);
|
||||
if (entry == null) {
|
||||
|
@ -69,33 +114,52 @@ export class UploadMgr {
|
|||
filePath: filePath,
|
||||
size: file.size,
|
||||
uploaded: 0,
|
||||
runnable: true,
|
||||
state: UploadState.Ready,
|
||||
err: "",
|
||||
});
|
||||
} else {
|
||||
// restart the uploading
|
||||
this.infos = this.infos.set(filePath, {
|
||||
...entry,
|
||||
runnable: true,
|
||||
});
|
||||
if (
|
||||
entry.state === UploadState.Stopped &&
|
||||
filePath === entry.filePath &&
|
||||
file.size === entry.size
|
||||
) {
|
||||
// try to upload a file with same name but actually with different content.
|
||||
// it still can not resolve one case: names and sizes are same, but contents are different
|
||||
// TODO: showing file SHA will avoid above case
|
||||
this.infos = this.infos.set(filePath, {
|
||||
...entry,
|
||||
file: file,
|
||||
state: UploadState.Ready,
|
||||
});
|
||||
} else {
|
||||
alert(
|
||||
`(${filePath}) seems not same file with uploading item, please check.`
|
||||
);
|
||||
}
|
||||
}
|
||||
this.statusCb(this.infos.toMap());
|
||||
};
|
||||
|
||||
stop = (filePath: string) => {
|
||||
const entry = this.infos.get(filePath);
|
||||
if (entry != null) {
|
||||
|
||||
this.infos = this.infos.set(filePath, {
|
||||
...entry,
|
||||
runnable: false,
|
||||
state: UploadState.Stopped,
|
||||
});
|
||||
|
||||
} else {
|
||||
alert(`failed to stop uploading ${filePath}: not found`);
|
||||
}
|
||||
this.statusCb(this.infos.toMap());
|
||||
};
|
||||
|
||||
delete = (filePath: string) => {
|
||||
this.stop(filePath);
|
||||
this.infos = this.infos.delete(filePath);
|
||||
this.statusCb(this.infos.toMap());
|
||||
};
|
||||
|
||||
list = (): OrderedMap<string, UploadEntry> => {
|
||||
|
@ -107,9 +171,20 @@ export class UploadMgr {
|
|||
|
||||
switch (resp.kind) {
|
||||
case errKind:
|
||||
// TODO: refine this
|
||||
const errResp = resp as ErrResp;
|
||||
console.error(`respHandler: ${errResp}`);
|
||||
const errEntry = this.infos.get(errResp.filePath);
|
||||
|
||||
if (errEntry != null) {
|
||||
this.infos = this.infos.set(errResp.filePath, {
|
||||
...errEntry,
|
||||
state: UploadState.Error,
|
||||
err: `${errEntry.err} / ${errResp.filePath}`,
|
||||
});
|
||||
} else {
|
||||
// TODO: refine this
|
||||
console.error(`uploading ${errResp.filePath} may already be deleted`);
|
||||
}
|
||||
|
||||
break;
|
||||
case uploadInfoKind:
|
||||
const infoResp = resp as UploadInfoResp;
|
||||
|
@ -122,17 +197,17 @@ export class UploadMgr {
|
|||
this.infos = this.infos.set(infoResp.filePath, {
|
||||
...entry,
|
||||
uploaded: infoResp.uploaded,
|
||||
runnable: infoResp.runnable,
|
||||
err: infoResp.err,
|
||||
state:
|
||||
// this avoids overwriting Stopped/Error state
|
||||
(entry.state === UploadState.Stopped || entry.state === UploadState.Error)
|
||||
? UploadState.Stopped
|
||||
: infoResp.state,
|
||||
});
|
||||
}
|
||||
|
||||
// call back to update the info
|
||||
this.statusCb(this.infos.toMap());
|
||||
} else {
|
||||
// TODO: refine this
|
||||
console.error(
|
||||
`respHandler: fail to found: file(${
|
||||
`respHandler: may already be deleted: file(${
|
||||
infoResp.filePath
|
||||
}) infos(${this.infos.toObject()})`
|
||||
);
|
||||
|
@ -141,6 +216,7 @@ export class UploadMgr {
|
|||
default:
|
||||
console.error(`respHandler: response kind not found: ${resp}`);
|
||||
}
|
||||
this.statusCb(this.infos.toMap());
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
@ -175,7 +175,7 @@ export class FileUploader {
|
|||
|
||||
if (this.chunkLen === 0) {
|
||||
this.errMsgs.push(
|
||||
"the network condition may be poor, please retry later."
|
||||
"the network condition is poor or server is busy, please retry later."
|
||||
);
|
||||
} else if (!this.isOn) {
|
||||
this.errMsgs.push("uploading is stopped");
|
||||
|
|
|
@ -136,7 +136,6 @@ func (h *FileHandlers) Create(c *gin.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
// fileDir := q.FsPath(userID, filepath.Dir(req.Path))
|
||||
err = h.deps.FS().MkdirAll(filepath.Dir(req.Path))
|
||||
if err != nil {
|
||||
c.JSON(q.ErrResp(c, 500, err))
|
||||
|
@ -160,7 +159,6 @@ func (h *FileHandlers) Delete(c *gin.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
// filePath = q.FsPath(userID, filePath)
|
||||
err := h.deps.FS().Remove(filePath)
|
||||
if err != nil {
|
||||
c.JSON(q.ErrResp(c, 500, err))
|
||||
|
@ -190,7 +188,6 @@ func (h *FileHandlers) Metadata(c *gin.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
// filePath = q.FsPath(userID, filePath)
|
||||
info, err := h.deps.FS().Stat(filePath)
|
||||
if err != nil {
|
||||
c.JSON(q.ErrResp(c, 500, err))
|
||||
|
@ -222,7 +219,6 @@ func (h *FileHandlers) Mkdir(c *gin.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
// dirPath := q.FsPath(userID, req.Path)
|
||||
err := h.deps.FS().MkdirAll(req.Path)
|
||||
if err != nil {
|
||||
c.JSON(q.ErrResp(c, 500, err))
|
||||
|
@ -250,8 +246,6 @@ func (h *FileHandlers) Move(c *gin.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
// oldPath := q.FsPath(userID, req.OldPath)
|
||||
// newPath := q.FsPath(userID, req.NewPath)
|
||||
_, err := h.deps.FS().Stat(req.OldPath)
|
||||
if err != nil {
|
||||
c.JSON(q.ErrResp(c, 500, err))
|
||||
|
@ -357,7 +351,6 @@ func (h *FileHandlers) UploadChunk(c *gin.Context) {
|
|||
}
|
||||
|
||||
func (h *FileHandlers) getFSFilePath(userID, fsFilePath string) (string, error) {
|
||||
// fsFilePath := q.FsPath(userID, reqPath)
|
||||
_, err := h.deps.FS().Stat(fsFilePath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
|
@ -447,7 +440,6 @@ func (h *FileHandlers) Download(c *gin.Context) {
|
|||
|
||||
// TODO: when sharing is introduced, move following logics to a separeted method
|
||||
// concurrently file accessing is managed by os
|
||||
// filePath = q.FsPath(userID, filePath)
|
||||
info, err := h.deps.FS().Stat(filePath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
|
@ -525,7 +517,6 @@ func (h *FileHandlers) List(c *gin.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
// dirPath = q.FsPath(userID, dirPath)
|
||||
infos, err := h.deps.FS().ListDir(dirPath)
|
||||
if err != nil {
|
||||
c.JSON(q.ErrResp(c, 500, err))
|
||||
|
|
|
@ -125,22 +125,10 @@ func ErrResp(c *gin.Context, code int, err error) (int, interface{}) {
|
|||
|
||||
}
|
||||
|
||||
func FsPath(userID, relFilePath string) string {
|
||||
return filepath.Join(userID, FsDir, relFilePath)
|
||||
}
|
||||
|
||||
func HomePath(userID, relFilePath string) string {
|
||||
return filepath.Join(userID, relFilePath)
|
||||
}
|
||||
|
||||
func FsRootPath(userID, relFilePath string) string {
|
||||
return filepath.Join(userID, FsRootDir, relFilePath)
|
||||
}
|
||||
|
||||
func GetTmpPath(userID, relFilePath string) string {
|
||||
return filepath.Join(UploadDir, userID, fmt.Sprintf("%x", sha1.Sum([]byte(relFilePath))))
|
||||
}
|
||||
|
||||
func UploadPath(userID, relFilePath string) string {
|
||||
return filepath.Join(UploadFolder(userID), fmt.Sprintf("%x", sha1.Sum([]byte(relFilePath))))
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue