indexFlow
fun <T> ElasticsearchAsyncClient.indexFlow(upstream: Flow<Document<T>>, concurrency: Int = 1, groupStrategy: GroupStrategy = GroupStrategy.TimeWindow(100, 250.milliseconds)): Flow<BulkResponseItem>
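Indexes documents into Elasticsearch asynchronously using bulk requests. It takes a Flow of Document<T>, splits it into chunks according to groupStrategy, and sends each chunk as a bulk request, with at most concurrency requests executing at a time.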
Return
A Flow of BulkResponseItem objects, each representing the result of indexing an individual document.
Example usage:
// Assumes `client` is an already configured ElasticsearchAsyncClient instance.
val documents = listOf(
    Document("index1", "id1", mapOf("field1" to "value1")),
    Document("index1", "id2", mapOf("field1" to "value2")),
    Document("index2", "id3", mapOf("field1" to "value3"))
).asFlow()

// indexFlow is an extension function, so it is called on the client instance.
// collect is a suspending call and must run inside a coroutine.
client.indexFlow(documents, concurrency = 3).collect {
    when (it) {
        is BulkResponseItem.Failure -> println("Indexing failed for item with id ${it.id}: ${it.error}")
        is BulkResponseItem.Success -> println("Indexing succeeded for item with id ${it.id}")
    }
}
Parameters
upstream
A Flow of Document<T> objects to index.
concurrency
Optional parameter that specifies the number of concurrent bulk requests that can be executed at a time. Default value is 1.
groupStrategy
Optional parameter that specifies the GroupStrategy used to split the input stream into chunks. Default value is GroupStrategy.TimeWindow(100, 250.milliseconds).
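To make the idea of splitting the input stream into chunks more concrete, here is a minimal sketch that groups a Flow into lists by item count alone. It is not the library's implementation: `chunkBySize` is a hypothetical helper introduced only for illustration, and it ignores the time-based part of a time-window strategy entirely. It assumes kotlinx.coroutines is on the classpath.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of the library): groups upstream items
// into lists holding at most `size` elements each.
fun <T> Flow<T>.chunkBySize(size: Int): Flow<List<T>> {
    require(size > 0) { "size must be positive" }
    return flow {
        var buffer = ArrayList<T>(size)
        collect { item ->
            buffer.add(item)
            if (buffer.size == size) {
                // Chunk is full: emit it and start a fresh buffer.
                emit(buffer)
                buffer = ArrayList(size)
            }
        }
        // Emit whatever is left once the upstream completes.
        if (buffer.isNotEmpty()) emit(buffer)
    }
}

fun main() = runBlocking {
    val chunks = (1..7).asFlow().chunkBySize(3).toList()
    println(chunks) // [[1, 2, 3], [4, 5, 6], [7]]
}
```

In a bulk-indexing pipeline, each emitted list would correspond to one bulk request. A strategy that also has a time component would presumably flush a partially filled chunk after a timeout, which this sketch leaves out.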