acknowledgmentMessageFlow

fun <Error class: unknown class>.acknowledgmentMessageFlow(queue: SqsQueue, upstream: Flow<MessageAcknowledgment<Acknowledgment>>, concurrency: Int = 1, groupStrategy: GroupStrategy = GroupStrategy.TimeWindow(10, 250.milliseconds)): Flow<MessageAcknowledgmentResult>

Creates a flow that processes acknowledgments for messages in an Amazon Simple Queue Service (SQS) queue.

This function takes an upstream flow of MessageAcknowledgment objects and handles each one according to its acknowledgment type: ChangeMessageVisibility, Delete, or Ignore. Acknowledgments are grouped according to groupStrategy and processed concurrently, at the level set by concurrency.
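As a rough illustration of the per-type handling described above, the following sketch plans which SQS batch operation each acknowledgment in a group would map to. It is not the library's implementation: Ack, MessageAck, operationFor, and planRequests are hypothetical names introduced for this example, and the use of batch operations is an assumption. The only external fact relied on is that the SQS DeleteMessageBatch and ChangeMessageVisibilityBatch operations accept at most 10 entries per request.

```kotlin
// Hypothetical stand-ins for the library's acknowledgment types; the real
// definitions may differ.
sealed interface Ack
object Delete : Ack
object Ignore : Ack
data class ChangeMessageVisibility(val timeoutSeconds: Int) : Ack

data class MessageAck(val messageId: String, val ack: Ack)

// The SQS operation an acknowledgment type would map to (an assumption);
// null means no request is sent for that message.
fun operationFor(ack: Ack): String? = when (ack) {
    is Delete -> "DeleteMessageBatch"
    is ChangeMessageVisibility -> "ChangeMessageVisibilityBatch"
    is Ignore -> null
}

// Plans the batch requests for one group of acknowledgments. SQS batch
// operations accept at most 10 entries, so larger groups are split.
fun planRequests(
    group: List<MessageAck>,
    maxBatch: Int = 10,
): List<Pair<String, List<String>>> =
    group
        .mapNotNull { m -> operationFor(m.ack)?.let { op -> op to m.messageId } }
        .groupBy({ it.first }, { it.second })
        .flatMap { (op, ids) -> ids.chunked(maxBatch).map { batch -> op to batch } }

fun main() {
    val group = listOf(
        MessageAck("m1", Delete),
        MessageAck("m2", ChangeMessageVisibility(30)),
        MessageAck("m3", Ignore),
        MessageAck("m4", Delete),
    )
    // Ignored messages produce no request; the rest are batched per operation.
    planRequests(group).forEach { (op, ids) -> println("$op -> $ids") }
}
```

In this model an Ignore acknowledgment never reaches SQS, which matches the idea that the message is simply left to reappear once its visibility timeout expires.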

Return

A Flow of MessageAcknowledgmentResult objects, each containing the message, its acknowledgment, and the response.

Example usage:

val sqsClient: SqsClient = ...
val messageAcknowledgments = flowOf(
    MessageAcknowledgment(Message(...), Delete),
    MessageAcknowledgment(Message(...), ChangeMessageVisibility(30)),
    MessageAcknowledgment(Message(...), Ignore)
)

sqsClient
    .acknowledgmentMessageFlow(SqsQueue.name("myqueue"), messageAcknowledgments)
    .collect { result ->
        println(
            """Processed message:
            ${result.message.messageId()},
            acknowledgment: ${result.acknowledgment},
            response: ${result.response}"""
        )
    }

Parameters

queue

A reference to the SQS queue whose messages are acknowledged.

upstream

A Flow of MessageAcknowledgment objects.

concurrency

The level of concurrency for processing acknowledgments. Defaults to 1.

groupStrategy

The strategy to use when chunking acknowledgments for processing. Defaults to GroupStrategy.TimeWindow(10, 250.milliseconds).
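To make the chunking idea concrete, here is a minimal sketch of a size-or-time window, assuming that the two arguments of GroupStrategy.TimeWindow are a maximum chunk size and a maximum window duration. That reading is an assumption, not something this page states. Timed and windowed are hypothetical names, and the function works on a list with recorded arrival times rather than on a live Flow.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical model of a size-or-time window; not the library's GroupStrategy.
data class Timed<T>(val arrival: Duration, val value: T)

// Groups items so that a chunk closes when it holds maxSize items or when an
// item arrives at least `window` after the chunk was opened. A live
// implementation would close the chunk on a timer; this simulation only
// decides which items end up together.
fun <T> windowed(items: List<Timed<T>>, maxSize: Int, window: Duration): List<List<T>> {
    val chunks = mutableListOf<List<T>>()
    var current = mutableListOf<T>()
    var openedAt = Duration.ZERO
    for (item in items) {
        // Close the open chunk if its window expired before this arrival.
        if (current.isNotEmpty() && item.arrival - openedAt >= window) {
            chunks.add(current)
            current = mutableListOf()
        }
        if (current.isEmpty()) openedAt = item.arrival
        current.add(item.value)
        // Close the chunk as soon as it is full.
        if (current.size == maxSize) {
            chunks.add(current)
            current = mutableListOf()
        }
    }
    if (current.isNotEmpty()) chunks.add(current)
    return chunks
}

fun main() {
    val arrivals = listOf(
        Timed(0.milliseconds, "a"),
        Timed(100.milliseconds, "b"),
        Timed(300.milliseconds, "c"),
        Timed(320.milliseconds, "d"),
    )
    // "c" arrives after the 250 ms window opened by "a" has expired,
    // so it starts a new chunk.
    println(windowed(arrivals, maxSize = 10, window = 250.milliseconds))
}
```

Under this assumption, a larger window trades acknowledgment latency for fewer, fuller chunks, while a smaller one does the opposite.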