My preliminary testing suggests that the flatMapMerge version will *not*
work if the breadth value (maxPartitions in the example) is less than the
actual number of partitions.
I don't understand why all partition sources wouldn't continue to emit and
be merged.
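
For what it's worth, here is a minimal sketch of the kind of test I mean. It
does not use Kafka at all: the partition sources are replaced by hypothetical
Source.tick sources that never complete, and BreadthDemo, partition and
observedIds are names I made up for illustration. It assumes Akka 2.5, where
an ActorMaterializer is still required. As far as I can tell from the
flatMapMerge documentation, at most `breadth` substreams are consumed at any
one time, so with a breadth of 2 and three never-completing sources the third
source should never be subscribed to.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object BreadthDemo {
  // Hypothetical stand-in for one partition's source:
  // it emits its own id every 100 ms and never completes.
  private def partition(id: Int) = Source.tick(0.millis, 100.millis, id)

  // Runs the merged stream for one second and returns the set of
  // partition ids that actually emitted at least one element.
  def observedIds(breadth: Int, partitions: Int = 3): Set[Int] = {
    implicit val system: ActorSystem = ActorSystem("breadth-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val seen = Source(0 until partitions)
        .flatMapMerge(breadth, partition)
        .takeWithin(1.second)
        .runWith(Sink.seq)
      Await.result(seen, 5.seconds).toSet
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    println(s"breadth = 2: ${observedIds(2)}")
    println(s"breadth = 3: ${observedIds(3)}")
  }
}
```

If that reading is right, the breadth = 2 run only ever reports two of the
three ids, which matches what I am seeing with the real partition sources.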

On Mon, Sep 11, 2017 at 4:27 PM, Richard Rodseth <rrods...@gmail.com> wrote:

> The first two code samples here show different ways of consuming multiple
> Kafka partitions,
> without really explaining why you would use one or the other.
>
> http://doc.akka.io/docs/akka-stream-kafka/current/consumer.html#source-per-partition
>
> The first uses flatMapMerge:
>
> val done = Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("topic1"))
>   .flatMapMerge(maxPartitions, _._2)
>   .via(business)
>   .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first.committableOffset)) { (batch, elem) =>
>     batch.updated(elem.committableOffset)
>   }
>   .mapAsync(3)(_.commitScaladsl())
>   .runWith(Sink.ignore)
>
> and the second runs a per-partition stream producing a future.
>
> // Consumer group represented as Source[(TopicPartition, Source[Messages])]
> val consumerGroup =
>   Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics("topic1"))
>
> // Process each assigned partition separately
> consumerGroup.map {
>   case (topicPartition, source) =>
>     source
>       .via(business)
>       .toMat(Sink.ignore)(Keep.both)
>       .run()
> }
>   .mapAsyncUnordered(maxPartitions)(_._2)
>   .runWith(Sink.ignore)
>
> Can anyone say anything about the performance characteristics or other
> pros and cons of these approaches? Also, should there be a custom
> dispatcher for the futures in the second one?
>
> We're currently doing something like the second, but using mapAsync rather
> than emitting the futures (from running the streams-per-partition) into the
> stream. I actually had a bug where I used a parallelism factor less than the
> number of partitions, and some partitions didn't get processed. It strikes
> me that in a case like this, where the per-partition Futures only complete
> at times like a rebalance, Future is a somewhat confusing abstraction and
> flatMapMerge is somewhat more intuitive. I believe in
> example one all messages in all partitions would still be emitted even if I
> erroneously picked too low a value for maxPartitions.
>
> Thanks in advance.
>
