I'd probably prefer to keep it the way it is, unless it's becoming more
like the function without the messageHandler argument.

Right now I have code like this, but I wish it were more similar looking:

    if (parsed.partitions.isEmpty()) {
      JavaPairInputDStream<String, MessageWrapper> kvstream = KafkaUtils
          .createDirectStream(jssc, String.class, MessageWrapper.class,
StringDecoder.class,
              MessageDecoder.class, kafkaArgs(parsed), topicSet);
      requests = kvstream.map((Function<Tuple2<String, MessageWrapper>,
MessageWrapper>) Tuple2::_2);
    } else {
      requests = KafkaUtils.createDirectStream(jssc, String.class,
          MessageWrapper.class, StringDecoder.class, MessageDecoder.class,
MessageWrapper.class,
          kafkaArgs(parsed), parsed.partitions,
(Function<MessageAndMetadata<String, MessageWrapper>,
              MessageWrapper>) MessageAndMetadata::message);
    }

Of course, this is in the Java API so it may not have relevance to what
you're talking about.

Perhaps if both functions (the one with partitions arg and the one without)
returned just ConsumerRecord, I would like that more.

- Alan

On Tue, Mar 8, 2016 at 6:49 AM, Cody Koeninger <c...@koeninger.org> wrote:

> No, looks like you'd have to catch them in the serializer and have the
> serializer return option or something. The new consumer builds a buffer
> full of records, not one at a time.
> On Mar 8, 2016 4:43 AM, "Marius Soutier" <mps....@gmail.com> wrote:
>
>>
>> > On 04.03.2016, at 22:39, Cody Koeninger <c...@koeninger.org> wrote:
>> >
>> > The only other valid use of messageHandler that I can think of is
>> > catching serialization problems on a per-message basis.  But with the
>> > new Kafka consumer library, that doesn't seem feasible anyway, and
>> > could be handled with a custom (de)serializer.
>>
>> What do you mean, that doesn't seem feasible? You mean when using a
>> custom deserializer? Right now I'm catching serialization problems in the
>> message handler, after your proposed change I'd catch them in `map()`.
>>
>>

Reply via email to