Package-level declarations

Types

interface AsyncSemaphore<P>

An interface representing a non-blocking semaphore.

Simple wrapper around a channel that provides a context for receiving items in a more flexible manner.

data class ConcurrencyInfo(val maximum: Int, val current: Int)

class ConcurrencyStrategy(val initial: ConcurrencyInfo, val increaseStrategy: (ConcurrencyInfo) -> ConcurrencyInfo = { it })

A class representing a concurrency strategy for handling concurrent operations.

interface GroupStrategy

A sealed interface representing strategies for grouping items in a Flow. A GroupStrategy controls how items are batched together, which lets callers trade latency against throughput.

interface ObjectPool<T>

interface StoppableFlowCollector<T> : FlowCollector<T>

Properties

val <T> suspend () -> T.async: Deferred<T>

Extension property on a suspend function that allows it to be called asynchronously. The result of the suspend function is wrapped in a Deferred object.

val <T> suspend () -> T.lazyAsync: Deferred<T>

Extension property on a suspend function that allows it to be called asynchronously. The result of the suspend function is wrapped in a Deferred object and is computed only upon the first invocation of Deferred.await or Deferred.join.

Functions

fun <E, S> Flow<E>.alsoTo(bufferCapacity: Int = Channel.BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND, onUndeliveredElement: ((E) -> Unit)? = null, flow: Flow<E>.() -> Flow<S>): Flow<Pair<E, S>>

Collects the Flow while concurrently transforming it into another Flow. The transformed Flow is collected asynchronously. The original flow and the transformed flow share the same buffer, configured by bufferCapacity, the onBufferOverflow policy, and the onUndeliveredElement handler.

fun Flow<Byte>.asByteArray(groupStrategy: GroupStrategy = GroupStrategy.Count(8)): Flow<ByteArray>

Converts the Flow of Byte to a Flow of ByteArray.

fun Flow<ByteBuffer>.asByteArray(): Flow<ByteArray>

Converts the Flow of ByteBuffer to a Flow of ByteArray.

fun Flow<String>.asByteArray(charset: Charset = Charset.defaultCharset()): Flow<ByteArray>

Converts the Flow of String to a Flow of ByteArray using the specified charset.

fun Flow<ByteArray>.asByteBuffer(): Flow<ByteBuffer>

Converts the Flow of ByteArray to a Flow of ByteBuffer.

fun Flow<String>.asByteBuffer(charset: Charset = Charset.defaultCharset()): Flow<ByteBuffer>

Converts the Flow of String to a Flow of ByteBuffer using the specified charset.

fun Flow<String>.asBytes(charset: Charset = Charset.defaultCharset()): Flow<Byte>

Converts the Flow of String to a Flow of Byte using the specified charset.

fun Flow<ByteArray>.asString(): Flow<String>

Converts the Flow of ByteArray to a Flow of String.

fun Flow<Byte>.asString(groupStrategy: GroupStrategy = GroupStrategy.Count(8)): Flow<String>

Converts the Flow of Byte to a Flow of String.

fun <T> Flow<T>.catchAndEmitLast(f: FlowCollector<T>.(Throwable) -> T): Flow<T>

Catches exceptions that occur while collecting the flow and emits the result of the provided function f with the caught exception as a parameter.

fun <T> Flow<T>.chunked(strategy: GroupStrategy): Flow<List<T>>

Groups the elements emitted by the current Flow into chunks according to the provided GroupStrategy. This is useful for batch processing, or for aggregating a stream into larger pieces that can be handled more efficiently.

fun <T> Flow<T>.chunked(size: Int): Flow<List<T>>

Groups the elements emitted by the current Flow into chunks of a fixed number of elements, given by size. This is useful for batch processing, or for aggregating a stream into larger pieces that can be handled more efficiently.
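To make the size-based grouping concrete, here is a minimal sketch written against plain kotlinx.coroutines. The name chunkedBySizeSketch is hypothetical and the body is not the library's implementation; in particular, flushing a smaller trailing chunk is an assumption about the behavior.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for size-based chunking; not the library's code.
fun <T> Flow<T>.chunkedBySizeSketch(size: Int): Flow<List<T>> = flow {
    require(size > 0) { "size must be positive" }
    var chunk = ArrayList<T>(size)
    collect { item ->
        chunk.add(item)
        if (chunk.size == size) {
            emit(chunk)
            // Start a fresh list so already emitted chunks are never mutated.
            chunk = ArrayList(size)
        }
    }
    // Assumption: a trailing, possibly smaller, chunk is still emitted.
    if (chunk.isNotEmpty()) emit(chunk)
}

fun main() = runBlocking {
    val chunks = flowOf(1, 2, 3, 4, 5).chunkedBySizeSketch(2).toList()
    println(chunks) // [[1, 2], [3, 4], [5]]
}
```

A strategy-based or time-bounded variant would additionally need a timer racing against the upstream, which this sketch deliberately leaves out.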

fun <T> Flow<T>.chunked(size: Int, duration: Duration): Flow<List<T>>

Groups the elements emitted by the current Flow into chunks bounded by both a maximum size and a time window (duration). This is useful for batch processing, or for aggregating a stream into larger, more manageable pieces.

suspend fun <T> Flow<T>.collectAsReceiver(block: suspend ChannelReceiverContext<T>.() -> Unit)
suspend fun <T> Flow<T>.collectAsReceiver(scope: CoroutineScope = CoroutineScope(Dispatchers.Default), block: suspend ChannelReceiverContext<T>.() -> Unit)

Collects the items emitted by this Flow into a ChannelReceiverContext and applies the given block to it.

suspend fun <T> Flow<T>.collectAsync(concurrency: Int, block: suspend (T) -> Unit)

Collects all the values emitted by the Flow, applying the block function to each value. The processing of items in the flow is concurrent and limited by concurrency level.

suspend fun <T, P> Flow<T>.collectAsync(semaphore: suspend CoroutineScope.() -> AsyncSemaphore<P>, block: suspend (T) -> Unit)

Collects all the values emitted by the Flow, applying the block function to each value. The concurrency is managed by an AsyncSemaphore created by the semaphore suspending function.

suspend fun <T> Flow<T>.collectCatching(collector: FlowCollector<T> = FlowCollector { }): Result<Unit>

Collects the elements emitted by the current Flow and returns a Result representing the outcome of the collection.

suspend fun <T> Flow<T>.collectWithTimeout(duration: Duration, collector: FlowCollector<T> = FlowCollector { })

Collects the flow with a specified timeout duration. If the flow takes longer than the provided duration to complete, it throws a TimeoutCancellationException.
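The timeout behavior described above can be approximated with the standard withTimeout function from kotlinx.coroutines. The sketch below uses the hypothetical name collectWithTimeoutSketch and is only an illustration of the documented contract, not the library's source.

```kotlin
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical stand-in: bound the whole collection by a single timeout.
suspend fun <T> Flow<T>.collectWithTimeoutSketch(
    duration: Duration,
    collector: FlowCollector<T> = FlowCollector { },
) = withTimeout(duration) { collect(collector) }

fun main() = runBlocking {
    // The second element arrives long after the 100 ms budget has expired.
    val slow = flow {
        emit(1)
        delay(1_000)
        emit(2)
    }
    val received = mutableListOf<Int>()
    val timedOut = try {
        slow.collectWithTimeoutSketch(100.milliseconds) { received += it }
        false
    } catch (e: TimeoutCancellationException) {
        true
    }
    println("received=$received timedOut=$timedOut") // received=[1] timedOut=true
}
```

Elements collected before the deadline are kept, so callers that need all-or-nothing semantics would have to discard partial results themselves.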

infix fun <T> List<CompletableDeferred<T>>.completeAll(values: List<T>): Boolean

Completes all the CompletableDeferred objects in the list with the corresponding values from the values list.

infix fun <T> List<CompletableDeferred<T>>.completeAllWith(result: Result<List<T>>): Boolean

Completes all the CompletableDeferred objects in the list with the values from the result if it is successful or completes them exceptionally with the error from the result if it is a failure.

fun <E> CoroutineScope.consume(capacity: Int = Channel.RENDEZVOUS, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND, onUndeliveredElement: ((E) -> Unit)? = null, consumer: suspend ReceiveChannel<E>.() -> Unit): Channel<E>

Creates a Channel with the specified capacity, buffer overflow behavior, and undelivered element handler, and then consumes elements from this channel using the given consumer function.

suspend fun <T> Flow<T>.countOnWindow(duration: Duration): Int

Counts the number of items emitted by the flow within the specified duration window.

@ExperimentalCoroutinesApi
fun <T> Flow<T>.earlyCompleteIf(stopPredicate: suspend (T) -> Boolean): Flow<T>

Completes the Flow early if the specified stopPredicate is met for any element.

fun <T, R> Flow<T>.flatMapFlow(mapper: suspend (T) -> Flow<R>): Flow<R>

Maps each item T of the Flow to a Flow of R using mapper, then flattens the results into a single Flow of R.

fun <T, R> Flow<T>.flatMapIterable(mapper: suspend (T) -> Iterable<R>): Flow<R>

Maps each item of the Flow to an Iterable using mapper, then flattens the results into a Flow of individual items.

suspend fun <T, R> Iterable<T>.flatMapIterableAsync(transform: suspend (T) -> Iterable<R>): List<R>

Transforms the elements of the iterable concurrently using the provided transform function, and then flattens the result.

suspend fun <T, R> Iterable<T>.flatMapIterableAsync(concurrency: Int, transform: suspend (T) -> Iterable<R>): List<R>

Transforms the elements of the iterable concurrently using the provided transform function with a specified concurrency limit, and then flattens the result.

fun <T, R> Flow<T>.flatMapIterableAsync(concurrency: Int, transform: suspend (T) -> Iterable<R>): Flow<R>

Applies the given transformation function transform to each value of the original Flow and emits the results. The processing of items in the flow is concurrent and limited by concurrency level. Each transformed value is then flattened and emitted individually.

fun <T, R, P> Flow<T>.flatMapIterableAsync(semaphore: suspend CoroutineScope.() -> AsyncSemaphore<P>, transform: suspend (T) -> Iterable<R>): Flow<R>

Applies the given transformation function transform to each value of the original Flow and emits the results. The concurrency is managed by an AsyncSemaphore created by the semaphore suspending function. Each transformed value is then flattened and emitted individually.

Flattens a Flow of List of ByteBuffer and converts it to a Flow of ByteArray.

fun <T> Flow<Flow<T>>.flattenFlow(): Flow<T>

Flattens a Flow of Flow items into a Flow of individual items.

fun <T> Flow<Iterable<T>>.flattenIterable(): Flow<T>

Flattens a Flow of Iterable items into a Flow of individual items.

fun <T> flowOfSuspend(item: suspend () -> T): Flow<T>

Creates a Flow that emits a single item, which is the result of invoking the provided suspending function item.

fun <T> indefinitelyRepeat(item: T): Flow<T>

Creates an infinite Flow that repeatedly emits the provided item.

fun <T> Flow<T>.intersperse(between: T, start: T? = null, end: T? = null): Flow<T>

Intersperses the elements of the Flow with the specified start, between, and end elements.
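As an illustration of how start, between, and end could fit together, here is a small sketch built on the plain flow builder. The name intersperseSketch is hypothetical, and the assumption that a null start or end is simply skipped is inferred from the nullable defaults in the signature.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for intersperse; not the library's implementation.
fun <T> Flow<T>.intersperseSketch(between: T, start: T? = null, end: T? = null): Flow<T> = flow {
    // Assumption: a null start or end means "emit nothing" at that position.
    if (start != null) emit(start)
    var first = true
    collect { item ->
        // The separator goes before every element except the first one.
        if (!first) emit(between)
        first = false
        emit(item)
    }
    if (end != null) emit(end)
}

fun main() = runBlocking {
    val parts = flowOf("a", "b", "c").intersperseSketch(", ", start = "[", end = "]").toList()
    println(parts.joinToString("")) // [a, b, c]
}
```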

suspend fun <T> Flow<T>.joinToString(f: suspend (T) -> String = { it.toString() }): String

Joins the elements of the Flow into a single string using the provided f function to transform each element to a string.

suspend fun <T> Flow<T>.joinToString(between: String, f: suspend (T) -> String): String

Joins the elements of the Flow into a single string, separated by the specified between string, using the provided f function to transform each element to a string.

suspend fun <T> Flow<T>.joinToString(between: String, start: String, end: String, f: suspend (T) -> String): String

Joins the elements of the Flow into a single string, enclosed by the specified start and end strings, and separated by the specified between string. Uses the provided f function to transform each element to a string.

fun <T> Flow<T>.launch(): Job

fun <T> Flow<T>.launchCollect(collector: FlowCollector<T> = FlowCollector {}): Job

fun <T> CoroutineScope.lazyAsync(f: suspend () -> T): Lazy<Deferred<T>>

Creates a lazily started coroutine which runs the provided suspend function. The result of the suspend function is wrapped in a Deferred object and is computed only upon the first invocation of Deferred.await or Deferred.join.

fun Flow<String>.lines(): Flow<String>

Splits and buffers the Flow of String into lines, emitting each line as a separate element in the resulting Flow.

suspend fun <T, R> Iterable<T>.mapAsync(transform: suspend (T) -> R): List<R>

Transforms the elements of the iterable concurrently using the provided transform function.

suspend fun <T, R> Iterable<T>.mapAsync(concurrency: Int, transform: suspend (T) -> R): List<R>

Transforms the elements of the iterable concurrently using the provided transform function with a specified concurrency limit.
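One common way to get a concurrency-limited map over an Iterable is to pair async with a Semaphore from kotlinx.coroutines.sync. The sketch below uses the hypothetical name mapAsyncSketch and shows that general technique; whether the library is implemented this way is not stated in this page.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Hypothetical stand-in: at most `concurrency` transforms run at the same time.
suspend fun <T, R> Iterable<T>.mapAsyncSketch(
    concurrency: Int,
    transform: suspend (T) -> R,
): List<R> = coroutineScope {
    val permits = Semaphore(concurrency)
    // One coroutine per element; each waits for a permit before transforming.
    map { item -> async { permits.withPermit { transform(item) } } }
        // awaitAll keeps the order of the deferred list, so results stay ordered.
        .awaitAll()
}

fun main() = runBlocking {
    val squares = listOf(1, 2, 3, 4).mapAsyncSketch(concurrency = 2) {
        delay(50)
        it * it
    }
    println(squares) // [1, 4, 9, 16]
}
```

Because coroutineScope is used, a failure in any transform cancels the remaining ones and is rethrown to the caller.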

fun <T, R> Flow<T>.mapAsync(concurrency: Int, transform: suspend (T) -> R): Flow<R>

Applies the given transformation function transform to each value of the original Flow and emits the results. The processing of items in the flow is concurrent and limited by concurrency level.

fun <T, R, P> Flow<T>.mapAsync(semaphore: suspend CoroutineScope.() -> AsyncSemaphore<P>, transform: suspend (T) -> R): Flow<R>

Applies the given transformation function transform to each value of the original Flow and emits the results. The concurrency is managed by an AsyncSemaphore created by the semaphore suspending function.

fun <T> objectPool(maxSize: Int, maxDuration: Duration = 5.minutes, onClose: suspend (T) -> Unit = {}, factory: suspend () -> T): ObjectPool<T>
suspend fun <T> objectPool(maxSize: Int, initialSize: Int = 1, maxDuration: Duration = 5.minutes, onClose: suspend (T) -> Unit = {}, factory: suspend () -> T): ObjectPool<T>

fun <T> Flow<T>.onEachAsync(concurrency: Int, block: suspend (T) -> Unit): Flow<T>

Applies the given function block to each value of the original Flow and reemits them downstream. The processing of items in the flow is concurrent and limited by concurrency level.

fun <T, P> Flow<T>.onEachAsync(semaphore: suspend CoroutineScope.() -> AsyncSemaphore<P>, block: suspend (T) -> Unit): Flow<T>

Applies the given function block to each value of the original Flow and reemits them downstream. The concurrency is managed by an AsyncSemaphore created by the semaphore suspending function.

operator fun <T, R : T> Flow<T>.plus(other: Flow<R>): Flow<T>

Combines two Flows of the same base type T into a single Flow by concatenating their elements.
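Concatenation of two flows can be expressed with emitAll from kotlinx.coroutines. The helper name concatSketch below is hypothetical; it only illustrates the "first flow, then second flow" ordering that the description implies.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in: the second flow is collected only after the first completes.
fun <T> concatSketch(first: Flow<T>, second: Flow<T>): Flow<T> = flow {
    emitAll(first)
    emitAll(second)
}

fun main() = runBlocking {
    println(concatSketch(flowOf(1, 2), flowOf(3)).toList()) // [1, 2, 3]
}
```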

fun <T> poll(concurrency: ConcurrencyStrategy = ConcurrencyStrategy.disabled, stopOnEmptyList: Boolean = false, interval: Duration? = null, f: suspend ConcurrencyInfo.() -> List<T>): Flow<T>

Creates a flow that continuously polls elements concurrently by successively applying the f function. The flow stops based on the stopOnEmptyList parameter or when the coroutine context is no longer active. Concurrency is controlled by the concurrency strategy.

fun <T, S> pollWithState(initial: S, interval: Duration? = null, shouldStop: (S) -> Boolean = { false }, f: suspend (S) -> Pair<S, List<T>>): Flow<T>

Creates a flow that continuously polls elements using the f function, starting with an initial state initial.
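The state-threading idea can be sketched as a simple loop inside a flow builder. Everything here is an assumption made for illustration: the name pollWithStateSketch, the choice to check shouldStop before each poll, and the omission of the interval parameter.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in: each call to f receives the state returned by the previous call.
fun <T, S> pollWithStateSketch(
    initial: S,
    shouldStop: (S) -> Boolean = { false },
    f: suspend (S) -> Pair<S, List<T>>,
): Flow<T> = flow {
    var state = initial
    // Assumption: the stop condition is evaluated before every poll.
    while (!shouldStop(state)) {
        val (next, items) = f(state)
        items.forEach { emit(it) }
        state = next
    }
}

fun main() = runBlocking {
    // The state is a page number; polling stops once page 3 is reached.
    val items = pollWithStateSketch<String, Int>(
        initial = 0,
        shouldStop = { page -> page >= 3 },
    ) { page -> (page + 1) to listOf("item-$page") }.toList()
    println(items) // [item-0, item-1, item-2]
}
```

A typical use of this shape is cursor-based pagination, where the state is the cursor returned by the previous request.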

fun <T> promiseFlow(f: suspend (CompletableDeferred<Flow<T>>) -> Unit): Flow<T>

fun <T> Flow<T>.split(strategy: GroupStrategy): Flow<Flow<T>>

Groups the elements emitted by the current Flow into smaller Flows according to the provided GroupStrategy. This is useful when smaller Flows are preferable to the original one, for example in file processing or database operations.

fun <T> Flow<T>.split(size: Int): Flow<Flow<T>>

Groups the elements emitted by the current Flow into smaller Flows based on a maximum number of items. This is useful when smaller Flows are preferable to the original one, for example in file processing or database operations.

fun <T> Flow<T>.split(size: Int, duration: Duration): Flow<Flow<T>>

Groups the elements emitted by the current Flow into smaller Flows based on size and a timeout duration. This is useful when smaller Flows are preferable to the original one, for example in file processing or database operations.

fun Flow<String>.splitEvery(delimiter: String): Flow<String>

Splits and buffers the Flow of String based on the provided delimiter, emitting each piece as a separate element in the resulting Flow.
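The "splits and buffers" wording matters because a delimiter can straddle or fall between upstream elements. The following sketch, under the hypothetical name splitEverySketch, shows one way to buffer incoming strings until a delimiter is seen; emitting the leftover text when the upstream completes is an assumption.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for delimiter-based splitting; not the library's code.
fun Flow<String>.splitEverySketch(delimiter: String): Flow<String> = flow {
    require(delimiter.isNotEmpty()) { "delimiter must not be empty" }
    val buffer = StringBuilder()
    collect { piece ->
        buffer.append(piece)
        // Emit every complete segment currently held in the buffer.
        var index = buffer.indexOf(delimiter)
        while (index >= 0) {
            emit(buffer.substring(0, index))
            buffer.delete(0, index + delimiter.length)
            index = buffer.indexOf(delimiter)
        }
    }
    // Assumption: text after the last delimiter is emitted on completion.
    if (buffer.isNotEmpty()) emit(buffer.toString())
}

fun main() = runBlocking {
    // "b" and "c" arrive in different elements but belong to the same segment.
    println(flowOf("a,b", "c,", "d").splitEverySketch(",").toList()) // [a, bc, d]
}
```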

@ExperimentalCoroutinesApi
fun <T> stoppableFlow(block: suspend StoppableFlowCollector<T>.() -> Unit): Flow<T>

Creates a Flow with the ability to stop its collection early based on a custom condition.

suspend fun Flow<Byte>.sum(): Long

Sums the elements of this Flow of Byte and returns the result.

suspend fun Flow<Double>.sum(): Double

Sums the elements of this Flow of Double and returns the result.

suspend fun Flow<Float>.sum(): Double

Sums the elements of this Flow of Float and returns the result.

suspend fun Flow<Int>.sum(): Long

Sums the elements of this Flow of Int and returns the result.

suspend fun Flow<Long>.sum(): Long

Sums the elements of this Flow of Long and returns the result.

suspend fun Flow<Short>.sum(): Long

Sums the elements of this Flow of Short and returns the result.

fun <T, P> Flow<T>.throttle(strategy: ThrottleStrategy = ThrottleStrategy.Suspend, semaphore: suspend CoroutineScope.() -> AsyncSemaphore<P>): Flow<T>

Throttles the emission of elements from the Flow based on the specified semaphore and strategy.

fun <T> Flow<T>.throttle(elementsPerInterval: Int, interval: Duration, strategy: ThrottleStrategy = ThrottleStrategy.Suspend): Flow<T>

Throttles the emission of elements from the Flow based on the specified elementsPerInterval, interval, and strategy.

fun CoroutineScope.tick(intervalDuration: Duration, f: suspend () -> Unit): Job

suspend fun <T> Flow<T>.toChannel(channel: Channel<T>)

Collects the items emitted by the Flow and sends each one to the given channel.

suspend fun <T> Flow<T>.toList(size: Int): List<T>

Collects the specified number of items size from the Flow into a List.

suspend fun <T> Flow<T>.toList(duration: Duration): List<T>

Collects the items from the Flow into a List within the given duration.

suspend fun <T> Flow<T>.toList(size: Int, duration: Duration): List<T>

Collects the specified number of items size from the Flow into a List within the given duration.

fun unboundedLongFlow(startAt: Long = 0): Flow<Long>

Creates an (almost) infinite Flow that emits sequentially incremented Long numbers starting from the startAt parameter.

fun <T, R> Flow<T>.unorderedFlatMapIterableAsync(concurrency: Int, transform: suspend (T) -> Iterable<R>): Flow<R>

Applies the given transformation function transform to each value of the original Flow and emits the results. The processing of items in the flow is concurrent and limited by concurrency level. Each transformed value is then flattened and emitted individually. As the name suggests, results are emitted without a guarantee of preserving the source order.

fun <T, R, P> Flow<T>.unorderedFlatMapIterableAsync(semaphore: suspend CoroutineScope.() -> AsyncSemaphore<P>, transform: suspend (T) -> Iterable<R>): Flow<R>

Applies the given transformation function transform to each value of the original Flow and emits the results. The concurrency is managed by an AsyncSemaphore created by the semaphore suspending function. Each transformed value is then flattened and emitted individually. As the name suggests, results are emitted without a guarantee of preserving the source order.

fun <T, R> Flow<T>.unorderedMapAsync(concurrency: Int, transform: suspend (T) -> R): Flow<R>

Applies the given transformation function transform to each value of the original Flow and emits the results. The processing of items in the flow is concurrent and limited by concurrency level. As the name suggests, results are emitted without a guarantee of preserving the source order.

fun <T, R, P> Flow<T>.unorderedMapAsync(semaphore: suspend CoroutineScope.() -> AsyncSemaphore<P>, transform: suspend (T) -> R): Flow<R>

Applies the given transformation function transform to each value of the original Flow and emits the results. The concurrency is managed by an AsyncSemaphore created by the semaphore suspending function. As the name suggests, results are emitted without a guarantee of preserving the source order.

fun <T> Flow<T>.unorderedOnEachAsync(concurrency: Int, block: suspend (T) -> Unit): Flow<T>

Applies the given function block to each value of the original Flow and reemits them downstream. The processing of items in the flow is concurrent and limited by concurrency level. As the name suggests, values are reemitted without a guarantee of preserving the source order.

fun <T, P> Flow<T>.unorderedOnEachAsync(semaphore: suspend CoroutineScope.() -> AsyncSemaphore<P>, block: suspend (T) -> Unit): Flow<T>

Applies the given function block to each value of the original Flow and reemits them downstream. The concurrency is managed by an AsyncSemaphore created by the semaphore suspending function. As the name suggests, values are reemitted without a guarantee of preserving the source order.

suspend fun <T> withPromise(f: suspend (CompletableDeferred<T>) -> Unit): T

suspend fun <T> CoroutineScope.withPromise(f: suspend (CompletableDeferred<T>) -> Unit): T

suspend fun <T1, T2, R> zipAsync(first: suspend () -> T1, second: suspend () -> T2, zip: suspend (T1, T2) -> R): R

Executes the two suspend functions asynchronously, then applies the zip function to their results.

suspend fun <T1, T2, T3, R> zipAsync(first: suspend () -> T1, second: suspend () -> T2, third: suspend () -> T3, zip: suspend (T1, T2, T3) -> R): R

Executes the three suspend functions asynchronously, then applies the zip function to their results.

suspend fun <T1, T2, T3, T4, R> zipAsync(first: suspend () -> T1, second: suspend () -> T2, third: suspend () -> T3, fourth: suspend () -> T4, zip: suspend (T1, T2, T3, T4) -> R): R

Executes the four suspend functions asynchronously, then applies the zip function to their results.

suspend fun <T1, T2, T3, T4, T5, R> zipAsync(first: suspend () -> T1, second: suspend () -> T2, third: suspend () -> T3, fourth: suspend () -> T4, fifth: suspend () -> T5, zip: suspend (T1, T2, T3, T4, T5) -> R): R

Executes the five suspend functions asynchronously, then applies the zip function to their results.

suspend fun <T1, T2, T3, T4, T5, T6, R> zipAsync(first: suspend () -> T1, second: suspend () -> T2, third: suspend () -> T3, fourth: suspend () -> T4, fifth: suspend () -> T5, sixth: suspend () -> T6, zip: suspend (T1, T2, T3, T4, T5, T6) -> R): R

Executes the six suspend functions asynchronously, then applies the zip function to their results.

suspend fun <T1, T2, T3, T4, T5, T6, T7, R> zipAsync(first: suspend () -> T1, second: suspend () -> T2, third: suspend () -> T3, fourth: suspend () -> T4, fifth: suspend () -> T5, sixth: suspend () -> T6, seventh: suspend () -> T7, zip: suspend (T1, T2, T3, T4, T5, T6, T7) -> R): R

Executes the seven suspend functions asynchronously, then applies the zip function to their results.
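For the two-argument case, the pattern behind zipAsync can be sketched with coroutineScope and async from kotlinx.coroutines. The name zipAsyncSketch is hypothetical, and the body illustrates the general technique rather than the library's actual source; the higher-arity overloads would follow the same shape with more async calls.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in: start both computations, then combine their results.
suspend fun <T1, T2, R> zipAsyncSketch(
    first: suspend () -> T1,
    second: suspend () -> T2,
    zip: suspend (T1, T2) -> R,
): R = coroutineScope {
    // Both coroutines are started before either result is awaited.
    val a = async { first() }
    val b = async { second() }
    zip(a.await(), b.await())
}

fun main() = runBlocking {
    val sum = zipAsyncSketch(
        first = { delay(100); 1 },
        second = { delay(100); 2 },
    ) { a, b -> a + b }
    println(sum) // 3
}
```

Since the two delays overlap, the call completes in roughly the time of the slower function rather than the sum of both.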