uploadSplitItems

fun <T> S3AsyncClient.uploadSplitItems(bucket: String, upstream: Flow<T>, splitStrategy: GroupStrategy = GroupStrategy.Count(1000), concurrency: Int = 1, key: (Int) -> String, f: suspend (T) -> ByteArray): Flow<S3Response>

Uploads the items of a flow to an Amazon S3 bucket using the S3AsyncClient, splitting them into separate files according to splitStrategy.

This is particularly useful for streams of unknown size, since the flow is split into separate files automatically as items arrive.

When a split exceeds 5MB, the function uses S3's multipart upload so that large chunks of data are not retained in memory. Smaller splits are uploaded with a single put object operation, which is quicker for small payloads.
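The size-based choice described above can be pictured with a minimal sketch. The names UploadMode, MULTIPART_THRESHOLD_BYTES and chooseUploadMode are hypothetical and are not part of the library. The sketch also assumes that "5MB" means 5 * 1024 * 1024 bytes and that a split of exactly that size still uses put object.

```kotlin
// Hypothetical names for illustration only; not the library's internals.
enum class UploadMode { PUT_OBJECT, MULTIPART }

// Assumption: "5MB" is read as 5 * 1024 * 1024 bytes.
val MULTIPART_THRESHOLD_BYTES: Long = 5L * 1024 * 1024

// Splits larger than the threshold go through multipart upload;
// anything else is sent with a single put object request.
// Assumption: a split of exactly the threshold size uses put object.
fun chooseUploadMode(splitSizeBytes: Long): UploadMode =
    if (splitSizeBytes > MULTIPART_THRESHOLD_BYTES) UploadMode.MULTIPART
    else UploadMode.PUT_OBJECT
```

The actual function makes this decision internally; callers only choose the splitStrategy.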

Return

A flow of S3Response objects for each uploaded chunk.

Example usage:

val s3Client: S3AsyncClient = ...
val bucket = "my-bucket"
val usersFlow = flow<User> { ... }

s3Client
    .uploadSplitItems(
        bucket = bucket,
        upstream = usersFlow,
        splitStrategy = GroupStrategy.TimeWindow(1000, 1.seconds),
        key = { part -> "folder/file-part-$part" }
    ) { user ->
        // convert the user into an array of bytes
    }
    .collect { response ->
        println("Uploaded part: $response")
    }

Parameters

bucket

The S3 bucket to upload the files to.

upstream

A flow of items to be uploaded. Each item is converted to bytes by f.

splitStrategy

The strategy that determines how the items are grouped into separate files. Default is GroupStrategy.Count(1000), that is, 1000 items per file.

concurrency

The number of concurrent uploads to use. Default is 1.

key

A function that takes the part number of a split (an integer) and returns the key of the corresponding object in the S3 bucket.

f

A function that converts each item from upstream into an array of bytes.
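
How splitStrategy, key and f fit together can be sketched on an in-memory list instead of a Flow, with no S3 calls. This is a conceptual illustration and not the library's implementation: User, PlannedObject, planUploads and userToCsvLine are hypothetical names, f is shown as a plain (non-suspending) function, and part numbers are assumed to start at 1.

```kotlin
// Hypothetical item type used only for this illustration.
data class User(val id: Int, val name: String)

// Hypothetical result type: the object key and the bytes that would be stored under it.
class PlannedObject(val key: String, val content: ByteArray)

// Conceptual stand-in for a count-based split: groups items, serializes each one
// with `f`, and names every group with `key`. No S3 calls are made here.
// Assumption: part numbers start at 1; the library may number them differently.
fun <T> planUploads(
    items: List<T>,
    itemsPerFile: Int,
    key: (Int) -> String,
    f: (T) -> ByteArray,
): List<PlannedObject> =
    items.chunked(itemsPerFile).mapIndexed { index, group ->
        // Concatenate the serialized items of this group into one payload.
        val content = group.fold(ByteArray(0)) { acc, item -> acc + f(item) }
        PlannedObject(key(index + 1), content)
    }

// Example serializer: one CSV line per user.
fun userToCsvLine(user: User): ByteArray =
    "${user.id},${user.name}\n".encodeToByteArray()
```

With three users and itemsPerFile = 2, and a key function of { part -> "folder/file-part-$part" }, the sketch plans two objects: folder/file-part-1 holding the first two users and folder/file-part-2 holding the third.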