Queue uploads
This commit is contained in:
		@@ -24,18 +24,40 @@ type Dictionary<KeyType extends string | number | symbol, ValueType> = {
 | 
				
			|||||||
class StreamUploadManager<TCacheShape> {
 | 
					class StreamUploadManager<TCacheShape> {
 | 
				
			||||||
	client: ApolloClient<TCacheShape>;
 | 
						client: ApolloClient<TCacheShape>;
 | 
				
			||||||
	videoId: number;
 | 
						videoId: number;
 | 
				
			||||||
	nextUploadIdToRequest: number;
 | 
						nextUploadIdToRequest: number = 0;
 | 
				
			||||||
	highestUploadLinkObtained: number;
 | 
						highestUploadLinkObtained: number = -1;
 | 
				
			||||||
	prefetchedUploadLinks: Dictionary<number, string>;
 | 
						prefetchedUploadLinks: Dictionary<number, string> = {};
 | 
				
			||||||
 | 
						uploadQueue: Array<() => Promise<void>> = [];
 | 
				
			||||||
 | 
						isUploading: boolean = false;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	constructor(client: ApolloClient<TCacheShape>, streamId: number) {
 | 
						constructor(client: ApolloClient<TCacheShape>, videoId: number) {
 | 
				
			||||||
		this.client = client;
 | 
							this.client = client;
 | 
				
			||||||
		this.videoId = streamId;
 | 
							this.videoId = videoId;
 | 
				
			||||||
		this.highestUploadLinkObtained = -1;
 | 
						}
 | 
				
			||||||
		this.prefetchedUploadLinks = {};
 | 
					
 | 
				
			||||||
 | 
						enqueueUploadTask(task: () => Promise<void>) {
 | 
				
			||||||
 | 
							this.uploadQueue.push(task);
 | 
				
			||||||
 | 
							this.processUploadQueue();
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						async processUploadQueue() {
 | 
				
			||||||
 | 
							if (this.isUploading || this.uploadQueue.length === 0) {
 | 
				
			||||||
 | 
								return;
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							this.isUploading = true;
 | 
				
			||||||
 | 
							const task = this.uploadQueue.shift();
 | 
				
			||||||
 | 
							try {
 | 
				
			||||||
 | 
								if (task) await task();
 | 
				
			||||||
 | 
							} catch (error) {
 | 
				
			||||||
 | 
								console.error("Error processing upload task", error);
 | 
				
			||||||
 | 
							} finally {
 | 
				
			||||||
 | 
								this.isUploading = false;
 | 
				
			||||||
 | 
								this.processUploadQueue();
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	async uploadChunk({ filepath, index }: { filepath: string; index: number }) {
 | 
						async uploadChunk({ filepath, index }: { filepath: string; index: number }) {
 | 
				
			||||||
 | 
							this.enqueueUploadTask(async () => {
 | 
				
			||||||
			const uploadUrl = await this.getUploadLink(index);
 | 
								const uploadUrl = await this.getUploadLink(index);
 | 
				
			||||||
			const uploadRequest = RNFS.uploadFiles({
 | 
								const uploadRequest = RNFS.uploadFiles({
 | 
				
			||||||
				toUrl: uploadUrl,
 | 
									toUrl: uploadUrl,
 | 
				
			||||||
@@ -52,27 +74,29 @@ class StreamUploadManager<TCacheShape> {
 | 
				
			|||||||
				},
 | 
									},
 | 
				
			||||||
			});
 | 
								});
 | 
				
			||||||
			console.log(JSON.stringify(uploadRequest));
 | 
								console.log(JSON.stringify(uploadRequest));
 | 
				
			||||||
		const result = await uploadRequest.promise.catch((err) =>
 | 
								const result = await uploadRequest.promise;
 | 
				
			||||||
			console.log("Upload error!", err),
 | 
								if (result.statusCode === 200) {
 | 
				
			||||||
		);
 | 
					 | 
				
			||||||
		if (response.statusCode == 200) {
 | 
					 | 
				
			||||||
				console.log(`${filepath} Uploaded`);
 | 
									console.log(`${filepath} Uploaded`);
 | 
				
			||||||
			} else {
 | 
								} else {
 | 
				
			||||||
			console.log("SERVER ERROR");
 | 
									console.error("SERVER ERROR");
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
							});
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	async getUploadLink(chunkId: number): Promise<string> {
 | 
						async getUploadLink(chunkId: number): Promise<string> {
 | 
				
			||||||
 | 
							if (this.prefetchedUploadLinks[chunkId]) {
 | 
				
			||||||
 | 
								return this.prefetchedUploadLinks[chunkId];
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
		return this.requestUploadLink(chunkId);
 | 
							return this.requestUploadLink(chunkId);
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	async requestUploadLink(chunkId: number): Promise<string> {
 | 
						async requestUploadLink(chunkId: number): Promise<string> {
 | 
				
			||||||
		console.log(`Requesting ${chunkId}`);
 | 
							console.log(`Requesting upload link for chunk ${chunkId}`);
 | 
				
			||||||
		const result = await this.client.mutate({
 | 
							const result = await this.client.mutate({
 | 
				
			||||||
			mutation: gql.GetUploadLinkDocument,
 | 
								mutation: gql.GetUploadLinkDocument,
 | 
				
			||||||
			variables: { videoId: this.videoId, chunkIndex: chunkId },
 | 
								variables: { videoId: this.videoId, chunkIndex: chunkId },
 | 
				
			||||||
		});
 | 
							});
 | 
				
			||||||
		console.log(JSON.stringify(result.data));
 | 
							this.prefetchedUploadLinks[chunkId] = result.data.getUploadLink.uploadUrl;
 | 
				
			||||||
		return result.data.getUploadLink.uploadUrl;
 | 
							return result.data.getUploadLink.uploadUrl;
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -140,8 +164,8 @@ export default function CameraScreen({
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	const device = useCameraDevice("back");
 | 
						const device = useCameraDevice("back");
 | 
				
			||||||
	const format = useCameraFormat(device, [
 | 
						const format = useCameraFormat(device, [
 | 
				
			||||||
		{ videoResolution: { width: 3048, height: 2160 } },
 | 
							{ videoResolution: { width: 1920, height: 1080 } },
 | 
				
			||||||
		{ fps: 60 },
 | 
							{ fps: 30 },
 | 
				
			||||||
	]);
 | 
						]);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// TODO(#60): setOrientation should be called when changes are detected
 | 
						// TODO(#60): setOrientation should be called when changes are detected
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user