The first two code samples here show different ways of consuming multiple
Kafka partitions, without really explaining why you would use one or the
other.

http://doc.akka.io/docs/akka-stream-kafka/current/consumer.html#source-per-partition

The first uses flatMapMerge:

val done = Consumer
  .committablePartitionedSource(consumerSettings, Subscriptions.topics("topic1"))
  .flatMapMerge(maxPartitions, _._2)
  .via(business)
  .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first.committableOffset)) {
    (batch, elem) => batch.updated(elem.committableOffset)
  }
  .mapAsync(3)(_.commitScaladsl())
  .runWith(Sink.ignore)

and the second runs a per-partition stream producing a future.

// Consumer group represented as Source[(TopicPartition, Source[Messages])]
val consumerGroup =
  Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("topic1"))

// Process each assigned partition separately
consumerGroup
  .map {
    case (topicPartition, source) =>
      source
        .via(business)
        .toMat(Sink.ignore)(Keep.both)
        .run()
  }
  .mapAsyncUnordered(maxPartitions)(_._2)
  .runWith(Sink.ignore)

Can anyone say anything about the performance characteristics or other pros
and cons of these approaches? Also, should there be a custom dispatcher for
the futures in the second one?

We're currently doing something like the second, but using mapAsync to run
the per-partition streams rather than emitting their futures into the
stream. I actually had a bug where the parallelism factor was lower than
the number of partitions, and some partitions didn't get processed. It
strikes me that in a case like this, where the per-partition Futures only
complete at times like a rebalance, Future is a somewhat confusing
abstraction and flatMapMerge is more intuitive. I believe that in example
one all messages in all partitions would still be emitted even if I
erroneously picked too low a value for maxPartitions.
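
To make the mapAsync bug concrete, here is a minimal sketch that tries to
reproduce it without Kafka. It assumes Akka 2.6 or later (so the implicit
ActorSystem supplies the materializer). ParallelismSketch, startedPartitions
and the ticking inner sources are hypothetical stand-ins for the partitioned
source, not our real code and not anything taken from the docs.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.concurrent.TrieMap
import scala.concurrent.Await
import scala.concurrent.duration._

object ParallelismSketch {
  // Returns the ids of the simulated partitions whose inner stream was started.
  def startedPartitions(partitions: Int, parallelism: Int): List[Int] = {
    // Assumption: Akka 2.6+, where an implicit ActorSystem provides the materializer.
    implicit val system: ActorSystem = ActorSystem("parallelism-sketch")
    val started = TrieMap.empty[Int, Unit]

    // Stand-in for committablePartitionedSource: each inner source ticks forever,
    // like a partition that stays assigned until the next rebalance.
    val partitioned = Source(0 until partitions)
      .map(p => p -> Source.tick(0.seconds, 100.millis, p))

    partitioned
      .mapAsync(parallelism) { case (p, source) =>
        started.put(p, ())
        source.runWith(Sink.ignore) // this Future stays pending while the source ticks
      }
      .runWith(Sink.ignore)

    Thread.sleep(1000) // crude: give the outer stream time to start what it can
    Await.result(system.terminate(), 10.seconds)
    started.keys.toList.sorted
  }
}
```

Calling ParallelismSketch.startedPartitions(partitions = 3, parallelism = 2)
should report only the first two partitions as started, because mapAsync
does not pull another element while two futures are still pending. The same
harness could be pointed at flatMapMerge to check whether its breadth
argument really behaves differently.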

Thanks in advance.
