Upload Session

The endpoints under /upload-session are used to upload files larger than 100 MB to the Subworkflow service.

How it works

The Subworkflow API uses what is commonly known as a multipart upload flow. You break the original large file into smaller parts and upload each part separately; once all parts are uploaded, they are combined on our end to reconstruct the original document. This process takes a little longer than the normal upload approach but makes it possible to upload files of up to 5 GB.
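
For instance, with the 10 MB part size used by the example code further below, a 160 MB file is split into 16 parts, and only the final part may be smaller than the rest. A minimal sketch of the arithmetic (the names and sizes here are illustrative, not part of the API):

// Number of parts a file is split into for a given part size.
// A 160 MB file at 10 MB per part yields 16 parts; the last part holds any remainder.
const PART_SIZE = 10 * 1024 * 1024; // 10 MB, the default used in the example below
const fileSize = 160 * 1024 * 1024; // 160 MB

const numParts = Math.ceil(fileSize / PART_SIZE);
console.log(numParts); // 16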

Of course, you are not expected to perform these steps manually, since the flow is easy to script. Subworkflow's SDKs come with this support built in, and a leaner JavaScript example is included below. No/low-code platform users may find this flow harder to adopt, but we intend to provide first-class integrations to make large uploads easier (if you have an urgent request, please reach out on our Discord!).

  • Subworkflow Official JS/TS SDK (recommended, coming soon)
  • See below for example code

Good to know

  • Only available to users on the Standard and Enterprise plans, and only intended for files above 100 MB.
  • Uses a custom multipart upload API that is similar to, but not compatible with, S3 multipart upload. This means you unfortunately can't use the AWS S3 SDKs.
  • The minimum part size is 5 MB, except for the last part, which can be smaller. The maximum part size is 100 MB, though parts that large are not recommended (see the sketch after this list for choosing a part size).
  • Larger part sizes and higher concurrency settings are only recommended for users with strong network bandwidth.
  • If an upload session fails or errors, it's typically best to just start a fresh upload session rather than trying to recover an old one.
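
The part-size limits above can be enforced client-side before a session starts. The helper below is a minimal sketch, not part of the API (the function name and the 10 MB default are illustrative); it clamps a requested part size to the documented 5 MB minimum and 100 MB maximum.

// Clamp a requested part size to the documented limits (5 MB minimum, 100 MB maximum).
const MIN_PART_SIZE = 5 * 1024 * 1024;
const MAX_PART_SIZE = 100 * 1024 * 1024;

function choosePartSize(requested: number = 10 * 1024 * 1024): number {
  return Math.min(Math.max(requested, MIN_PART_SIZE), MAX_PART_SIZE);
}

// Example: a 1 MB request is bumped up to the 5 MB minimum.
console.log(choosePartSize(1 * 1024 * 1024)); // 5242880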

Benchmarks

Upload speeds can vary based on location and network bandwidth. Users should expect roughly the following durations and set timeouts accordingly if required (a minimal timeout sketch follows the table).

Scenario          File size   Page count   Approx. total upload time
Legal document    160 MB      1,774        40 - 50 seconds
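
If your client enforces an overall timeout, size it with generous headroom over the figures above. The wrapper below is a minimal sketch (the helper name and the three-minute value are assumptions, not API recommendations) that races an upload step against a deadline.

// Guard a long-running upload step with an overall deadline.
// Three minutes gives ample headroom over the ~40 - 50 second benchmark above.
const OVERALL_TIMEOUT_MS = 3 * 60 * 1000; // assumed value; tune for your own network

async function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const deadline = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Upload timed out after ${ms} ms`)), ms);
  });
  try {
    return await Promise.race([work, deadline]);
  } finally {
    clearTimeout(timer);
  }
}

// e.g. await withTimeout(uploader.append(fileBuffer), OVERALL_TIMEOUT_MS);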

Roadmap

We're always looking for ways to make it easier to get large files into Subworkflow. The following are active tickets we're working towards.

  • Google Drive Integration
  • OneDrive/Sharepoint Integration
  • Directly importing from an object store, e.g. an S3 bucket

Have a specific request or suggestion? Please share it on our Discord.

Example code

The following examples demonstrate how to use the /v1/upload-session endpoints to upload a large file to the Subworkflow Service.

TypeScript

import * as fs from 'fs';
import { MultipartUploader } from './MultipartUploader';

const uploader = new MultipartUploader({
  baseUrl: 'https://api.subworkflow.ai/v1',
  apiKey: '<MY-API-KEY>',
});

try {
  const fileBuffer = fs.readFileSync('/path/to/file.pdf');
  const params = {
    fileName: 'myfile', // name of file
    fileExt: 'pdf', // file extension
    fileType: 'application/pdf', // mimetype of file
    jobType: 'extract', // the job to perform after upload is successful
  };

  await uploader.start(params);
  await uploader.append(fileBuffer);
  const result = await uploader.end();

  console.log('Resulting job:', result);
} catch (error) {
  console.error(error);
  if (uploader.getKey()) await uploader.abort();
}
// MultipartUploader.ts (the implementation imported by the example above)
import {
  UploadSessionAbortResponse,
  UploadSessionAppendResponse,
  UploadSessionEndResponse,
  UploadSessionPart,
  UploadSessionStartRequest,
  UploadSessionStartResponse,
} from './types';

import pLimit from 'p-limit';

class MultipartUploader {
  private key: string | null = null;
  private uploadedParts: UploadSessionPart[] = [];

  constructor(
    private readonly opts: {
      baseUrl?: string;
      apiKey?: string;
      chunkSize?: number;
      concurrency?: number;
    }
  ) {
    // Normalize the base URL and apply defaults: 10 MB parts, 4 concurrent uploads.
    this.opts.baseUrl = opts.baseUrl?.endsWith('/') ? opts.baseUrl.slice(0, -1) : opts.baseUrl;
    if (!this.opts.baseUrl) throw new Error('Base URL must be set.');
    if (!this.opts.apiKey) throw new Error('API key must be set.');
    if (!this.opts.chunkSize) this.opts.chunkSize = 1024 * 1024 * 10;
    if (!this.opts.concurrency) this.opts.concurrency = 4;
  }

  public getKey(): string | null {
    return this.key;
  }

  private async $post<R>(endpoint: string, body: FormData): Promise<R> {
    const res = await fetch(`${this.opts.baseUrl}${endpoint}`, {
      method: 'POST',
      headers: { 'x-api-key': this.opts.apiKey! },
      body,
    });

    if (!res.ok) {
      const message = `${endpoint} ${res.status} ${await res.text()}`;
      console.log(message);
      throw new Error(message);
    }
    const data = await res.json();
    return data as R;
  }

  public async start(params: UploadSessionStartRequest): Promise<void> {
    this.uploadedParts = [];
    console.log(`Starting session for ${params.fileName} ...`);

    const formData = new FormData();
    formData.append('fileName', params.fileName);
    formData.append('fileExt', params.fileExt);
    formData.append('fileType', params.fileType);
    formData.append('jobType', params.jobType);
    if (params.expiryInDays) formData.append('expiryInDays', String(params.expiryInDays));

    const response = await this.$post<UploadSessionStartResponse>(`/upload_session/start`, formData);
    if (!response.data?.key) throw new Error(`Expected response to contain 'key' but none found.`);

    // The returned key identifies this session and must accompany every later call.
    this.key = response.data.key;
  }

  private async sendChunk(partNumber: number, file: Blob): Promise<void> {
    if (!this.key) throw new Error('Upload session not started. Call start() first.');

    const formData = new FormData();
    formData.append('key', this.key);
    formData.append('partNumber', String(partNumber));
    formData.append('file', file, `${this.key}_${partNumber}`);

    const response = await this.$post<UploadSessionAppendResponse>(`/upload_session/append`, formData);

    // Record the etag/partNumber pair; both are needed to finalize the session in end().
    const part = response.data;
    if (!part?.etag || !part?.partNumber) throw new Error("Append response missing 'etag' or 'partNumber'.");
    this.uploadedParts.push(part);
  }

  public async append(data: Blob | Buffer | ArrayBuffer): Promise<void> {
    const limiter = pLimit(this.opts.concurrency ?? 4);

    const totalSize = (data as any).size ?? (data as any).byteLength;
    if (typeof totalSize === 'undefined') {
      throw new Error('Unsupported data type for append. Must be Blob, Buffer, or ArrayBuffer.');
    }

    const numChunks = Math.ceil(totalSize / this.opts.chunkSize!);

    const jobs: Promise<void>[] = [];
    let offset = 0;
    for (let i = 0; i < numChunks; i++) {
      const partNumber = i + 1;
      let chunk: Blob;
      if (data instanceof Blob) {
        chunk = data.slice(offset, offset + this.opts.chunkSize!);
      } else if (data instanceof Buffer) {
        const bufferChunk = (data as Buffer).subarray(offset, offset + this.opts.chunkSize!);
        chunk = new Blob([new Uint8Array(bufferChunk)]);
      } else if (data instanceof ArrayBuffer) {
        const arrayBufferChunk = (data as ArrayBuffer).slice(offset, offset + this.opts.chunkSize!);
        chunk = new Blob([new Uint8Array(arrayBufferChunk)]);
      } else {
        throw new Error('Unsupported data type for chunking. Must be Blob, Buffer, or ArrayBuffer.');
      }
      // Small pause between scheduling parts to avoid bursting all requests at once.
      await new Promise((res) => setTimeout(res, 100));
      jobs.push(limiter(() => this.sendChunk(partNumber, chunk)));
      offset += chunk.size;
    }
    await Promise.all(jobs);
  }

  public async end(): Promise<UploadSessionEndResponse> {
    if (!this.key) throw new Error('No active session to end.');

    // Parts must be listed in ascending partNumber order so the file is reassembled correctly.
    const sortedParts = this.uploadedParts.sort((a, b) => a.partNumber - b.partNumber);

    console.log('Finalizing session...');

    const formData = new FormData();
    formData.append('key', this.key);
    formData.append('parts', JSON.stringify(sortedParts));

    const response = await this.$post<UploadSessionEndResponse>(`/upload_session/end`, formData);

    this.key = null;
    console.log('Upload complete.');
    return response;
  }

  public async abort(): Promise<void> {
    if (!this.key) {
      console.warn('Attempted to abort, but no active session found.');
      return;
    }

    console.log(`Aborting session ${this.key}...`);

    const formData = new FormData();
    formData.append('key', this.key);

    await this.$post<UploadSessionAbortResponse>(`/upload_session/abort`, formData);

    console.log('Session aborted successfully.');
    this.key = null;
    this.uploadedParts = [];
  }
}

export { MultipartUploader };
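
As noted under "Good to know", larger parts and higher concurrency only pay off on strong connections. Both knobs are plain constructor options on the example class above; the values below are illustrative, not recommendations.

// Illustrative tuning for a fast connection: 25 MB parts, up to 8 parts in flight.
// Stick with the defaults (10 MB parts, concurrency 4) unless your bandwidth clearly supports more.
const fastUploader = new MultipartUploader({
  baseUrl: 'https://api.subworkflow.ai/v1',
  apiKey: '<MY-API-KEY>',
  chunkSize: 25 * 1024 * 1024,
  concurrency: 8,
});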