I see - yes that is a limitation. You could get around that by doing
something similar - i.e., use reflection to create the MyDeciderThingy
object. Or you could check in the partition call if the
MyDeciderThingy object is null and if so create it.

Better yet, you can try using the new producer if that is an option
for you since partitioning happens outside.

On Thu, Oct 30, 2014 at 11:46:44AM -0700, Rajiv Kurian wrote:
> Yes I am using the old producer. When I use the producer I do something
> like this:
> 
> Properties config = new Properties();
> 
> // put other stuff in the config.
> 
> // Put the partitioner class that kafka will instantiate.
> 
> config.put("partitioner.class", partitionerClass.getName());
> 
> return new Producer<K, M>(new ProducerConfig(config));
> 
> So I assume that the kafka code uses reflection to instantiate the
> partitioner object. My partitioner object looks like this:
> 
> public class MyPartitioner implements Partitioner<MyStuff> {
> 
>   MyDeciderThingy thingy;
> 
>     public RawBusIdPartitioner(VerifiableProperties props) {}
> 
> 
>     @Override
> 
>     public int partition(MyStuff stuff, int numPartitions) {
> 
>         return thingy.decide(stuff);
> 
>     }
> 
> }
> 
> The problem is I  don't know how to pass a MyDeciderThingy to my
> Partitioner object given Kafka instantiates it.
> 
> Thanks!
> 
> On Thu, Oct 30, 2014 at 11:34 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
> 
> > Not sure I follow - you just need to extend the Partitioner trait. You
> > don't _have_ to use that specific constructor.
> >
> > It is slightly different with the new producer, but looks like you are
> > on the old producer.
> >
> > On Thu, Oct 30, 2014 at 11:16:28AM -0700, Rajiv Kurian wrote:
> > > Actually I figured out what the problem was. My producer was using a
> > > partitioner which was causing a null pointer exception. This actually
> > > raises another question for me. I want some state in my partitioner and
> > the
> > > only constructor that Kafka seems to use is this one:
> > >
> > > public MyPartitioner(VerifiableProperties props) {}
> > >
> > > How do I inject an object here that I can use to decide what partition my
> > > messages should go to? As an ugly hack I am using a static variable from
> > > somewhere else.
> > >
> > > On Thu, Oct 30, 2014 at 10:25 AM, Rajiv Kurian <ra...@signalfuse.com>
> > wrote:
> > >
> > > > Yes I see a ton of WARN messages on the broker logs of this form:
> > > >
> > > > 2014-10-30T17:21:54.281Z WARN  [kafka-request-handler-6            ]
> > > > [state.change.logger                 ]: Broker 0 received invalid
> > > > LeaderAndIsr request with correlation id 158 from controller 0 epoch
> > 29083
> > > > with an older leader epoch 5 for partition [myTopic, 56], current
> > leader
> > > > epoch is 5
> > > >
> > > > 2014-10-30T17:21:54.282Z WARN  [kafka-request-handler-6            ]
> > > > [state.change.logger                 ]: Broker 0 received invalid
> > > > LeaderAndIsr request with correlation id 158 from controller 0 epoch
> > 29083
> > > > with an older leader epoch 5 for partition myTopic,385], current leader
> > > > epoch is 5
> > > >
> > > > 2014-10-30T17:21:54.283Z WARN  [kafka-request-handler-6            ]
> > > > [state.change.logger                 ]: Broker 0 received invalid
> > > > LeaderAndIsr request with correlation id 158 from controller 0 epoch
> > 29083
> > > > with an older leader epoch 5 for partition [myTopic,684], current
> > leader
> > > > epoch is 5
> > > >
> > > > 2014-10-30T17:21:54.283Z WARN  [kafka-request-handler-6            ]
> > > > [state.change.logger                 ]: Broker 0 received invalid
> > > > LeaderAndIsr request with correlation id 158 from controller 0 epoch
> > 29083
> > > > with an older leader epoch 5 for partition [myTopic,1002], current
> > leader
> > > > epoch is
> > > >
> > > > On Thu, Oct 30, 2014 at 10:03 AM, Joel Koshy <jjkosh...@gmail.com>
> > wrote:
> > > >
> > > >> Do you see any errors on the broker logs? Can you check the broker's
> > > >> public access logs and see if there are topic metadata requests coming
> > > >> in from the producer?
> > > >>
> > > >> On Wed, Oct 29, 2014 at 07:15:15PM -0700, Rajiv Kurian wrote:
> > > >> > I don't see anything else that is relevant. I traced the first of
> > these
> > > >> > error messages to figure out the ordering. It actually goes
> > something
> > > >> like
> > > >> > this:
> > > >> >
> > > >> > 2014-10-30T01:51:32.400Z ERROR [ProducerSendThread-                ]
> > > >> > [k.producer.async.DefaultEventHandler] {}: Failed to collate
> > messages by
> > > >> > topic, partition due to: null
> > > >> >
> > > >> > 2014-10-30T01:51:34.082Z ERROR [ProducerSendThread-                ]
> > > >> > [k.producer.async.DefaultEventHandler] {}: Failed to collate
> > messages by
> > > >> > topic, partition due to: null
> > > >> >
> > > >> > 2014-10-30T01:51:34.422Z ERROR [ProducerSendThread-                ]
> > > >> > [k.producer.async.DefaultEventHandler] {}: Failed to collate
> > messages by
> > > >> > topic, partition due to: null
> > > >> >
> > > >> > 2014-10-30T01:51:34.664Z ERROR [ProducerSendThread-                ]
> > > >> > [k.producer.async.DefaultEventHandler] {}: Failed to collate
> > messages by
> > > >> > topic, partition due to: null
> > > >> >
> > > >> > 2014-10-30T01:51:34.902Z ERROR [ProducerSendThread-                ]
> > > >> > [k.producer.async.DefaultEventHandler] {}: Failed to send requests
> > for
> > > >> > topics myTopic with correlation ids in [0,8]
> > > >> >
> > > >> > 2014-10-30T01:51:35.007Z ERROR [ProducerSendThread-                ]
> > > >> > [k.producer.async.ProducerSendThread ] {}: Error in handling batch
> > of 1
> > > >> > events
> > > >> >
> > > >> > kafka.common.FailedToSendMessageException: Failed to send messages
> > > >> after 3
> > > >> > tries.
> > > >> >
> > > >> > at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
> > > >> > ~[kafka_2.10-0.8.0.jar:0.8.0]
> > > >> >
> > > >> > at kafka.producer.async.ProducerSendThread.tryToHandle(Unknown
> > Source)
> > > >> > [kafka_2.10-0.8.0.jar:0.8.0]
> > > >> >
> > > >> > at
> > > >> >
> > > >>
> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown
> > > >> > Source) [kafka_2.10-0.8.0.jar:0.8.0]
> > > >> >
> > > >> > at
> > > >> >
> > > >>
> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown
> > > >> > Source) [kafka_2.10-0.8.0.jar:0.8.0]
> > > >> >
> > > >> > at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> > > >> > [scala-library-2.10.1.jar:na]
> > > >> >
> > > >> > at kafka.producer.async.ProducerSendThread.processEvents(Unknown
> > Source)
> > > >> > [kafka_2.10-0.8.0.jar:0.8.0]
> > > >> >
> > > >> > at kafka.producer.async.ProducerSendThread.run(Unknown Source)
> > > >> > [kafka_2.10-0.8.0.jar:0.8.0]
> > > >> >
> > > >> > Thanks!
> > > >> >
> > > >> > On Wed, Oct 29, 2014 at 7:02 PM, Rajiv Kurian <ra...@signalfuse.com
> > >
> > > >> wrote:
> > > >> >
> > > >> > > This pattern seems to repeat:
> > > >> > >
> > > >> > > 2014-10-30T01:54:46.004Z ERROR [ProducerSendThread-
> >   ]
> > > >> > > [k.producer.async.DefaultEventHandler] {}: Failed to send
> > requests for
> > > >> > > topics myTopic with correlation ids in [1729,1736]
> > > >> > >
> > > >> > > 2014-10-30T01:54:46.008Z ERROR [ProducerSendThread-
> >   ]
> > > >> > > [k.producer.async.ProducerSendThread ] {}: Error in handling
> > batch of
> > > >> 4
> > > >> > > events
> > > >> > >
> > > >> > > kafka.common.FailedToSendMessageException: Failed to send messages
> > > >> after 3
> > > >> > > tries.
> > > >> > >
> > > >> > > at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
> > > >> > > ~[kafka_2.10-0.8.0.jar:0.8.0]
> > > >> > >
> > > >> > > at kafka.producer.async.ProducerSendThread.tryToHandle(Unknown
> > Source)
> > > >> > > [kafka_2.10-0.8.0.jar:0.8.0]
> > > >> > >
> > > >> > > at
> > > >> > >
> > > >>
> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown
> > > >> > > Source) [kafka_2.10-0.8.0.jar:0.8.0]
> > > >> > >
> > > >> > > at
> > > >> > >
> > > >>
> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown
> > > >> > > Source) [kafka_2.10-0.8.0.jar:0.8.0]
> > > >> > >
> > > >> > > at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> > > >> > > [scala-library-2.10.1.jar:na]
> > > >> > >
> > > >> > > at kafka.producer.async.ProducerSendThread.processEvents(Unknown
> > > >> Source)
> > > >> > > [kafka_2.10-0.8.0.jar:0.8.0]
> > > >> > >
> > > >> > > at kafka.producer.async.ProducerSendThread.run(Unknown Source)
> > > >> > > [kafka_2.10-0.8.0.jar:0.8.0]
> > > >> > >
> > > >> > > 2014-10-30T01:54:46.025Z ERROR [ProducerSendThread-
> >   ]
> > > >> > > [k.producer.async.DefaultEventHandler] {}: Failed to collate
> > messages
> > > >> by
> > > >> > > topic, partition due to: null
> > > >> > >
> > > >> > > 2014-10-30T01:54:46.174Z ERROR [ProducerSendThread-
> >   ]
> > > >> > > [k.producer.async.DefaultEventHandler] {}: Failed to collate
> > messages
> > > >> by
> > > >> > > topic, partition due to: null
> > > >> > >
> > > >> > > 2014-10-30T01:54:46.356Z ERROR [ProducerSendThread-
> >   ]
> > > >> > > [k.producer.async.DefaultEventHandler] {}: Failed to collate
> > messages
> > > >> by
> > > >> > > topic, partition due to: null
> > > >> > >
> > > >> > > 2014-10-30T01:54:46.644Z ERROR [ProducerSendThread-
> >   ]
> > > >> > > [k.producer.async.DefaultEventHandler] {}: Failed to collate
> > messages
> > > >> by
> > > >> > > topic, partition due to: null
> > > >> > >
> > > >> > > On Wed, Oct 29, 2014 at 6:57 PM, Jun Rao <jun...@gmail.com>
> > wrote:
> > > >> > >
> > > >> > >> The log before that will show you the cause of the error. Could
> > you
> > > >> dig
> > > >> > >> that out?
> > > >> > >>
> > > >> > >> Thanks,
> > > >> > >>
> > > >> > >> Jun
> > > >> > >>
> > > >> > >> On Wed, Oct 29, 2014 at 6:43 PM, Rajiv Kurian <
> > ra...@signalfuse.com>
> > > >> > >> wrote:
> > > >> > >>
> > > >> > >> > I keep seeing these errors in my code that is just trying to
> > send
> > > >> some
> > > >> > >> data
> > > >> > >> > using an AsyncProducer:
> > > >> > >> >
> > > >> > >> > kafka.common.FailedToSendMessageException: Failed to send
> > messages
> > > >> > >> after 3
> > > >> > >> > tries.
> > > >> > >> >
> > > >> > >> > at kafka.producer.async.DefaultEventHandler.handle(Unknown
> > Source)
> > > >> > >> > ~[kafka_2.10-0.8.0.jar:0.8.0]
> > > >> > >> >
> > > >> > >> > at kafka.producer.async.ProducerSendThread.tryToHandle(Unknown
> > > >> Source)
> > > >> > >> > [kafka_2.10-0.8.0.jar:0.8.0]
> > > >> > >> >
> > > >> > >> > at
> > > >> > >> >
> > > >> > >> >
> > > >> > >>
> > > >>
> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown
> > > >> > >> > Source) [kafka_2.10-0.8.0.jar:0.8.0]
> > > >> > >> >
> > > >> > >> > at
> > > >> > >> >
> > > >> > >> >
> > > >> > >>
> > > >>
> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown
> > > >> > >> > Source) [kafka_2.10-0.8.0.jar:0.8.0]
> > > >> > >> >
> > > >> > >> > at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> > > >> > >> > [scala-library-2.10.1.jar:na]
> > > >> > >> >
> > > >> > >> > at
> > kafka.producer.async.ProducerSendThread.processEvents(Unknown
> > > >> Source)
> > > >> > >> > [kafka_2.10-0.8.0.jar:0.8.0]
> > > >> > >> >
> > > >> > >> > at kafka.producer.async.ProducerSendThread.run(Unknown Source)
> > > >> > >> > [kafka_2.10-0.8.0.jar:0.8.0]
> > > >> > >> >
> > > >> > >> > 2014-10-30T01:40:45.176Z ERROR [ProducerSendThread-
> > > >> ]
> > > >> > >> > [k.producer.async.DefaultEventHandler] {}: Failed to collate
> > > >> messages by
> > > >> > >> > topic, partition due to: null
> > > >> > >> >
> > > >> > >> > 2014-10-30T01:40:45.506Z ERROR [ProducerSendThread-
> > > >> ]
> > > >> > >> > [k.producer.async.DefaultEventHandler] {}: Failed to collate
> > > >> messages by
> > > >> > >> > topic, partition due to: null
> > > >> > >> >
> > > >> > >> > 2014-10-30T01:40:45.647Z ERROR [ProducerSendThread-
> > > >> ]
> > > >> > >> > [k.producer.async.DefaultEventHandler] {}: Failed to collate
> > > >> messages by
> > > >> > >> > topic, partition due to: null
> > > >> > >> >
> > > >> > >> > 2014-10-30T01:40:45.772Z ERROR [ProducerSendThread-
> > > >> ]
> > > >> > >> > [k.producer.async.DefaultEventHandler] {}: Failed to collate
> > > >> messages by
> > > >> > >> > topic, partition due to: null
> > > >> > >> >
> > > >> > >> > 2014-10-30T01:40:45.890Z ERROR [ProducerSendThread-
> > > >> ]
> > > >> > >> > [k.producer.async.DefaultEventHandler] {}: Failed to send
> > requests
> > > >> for
> > > >> > >> > topics myTopic with correlation ids in [169,176]
> > > >> > >> >
> > > >> > >> > 2014-10-30T01:40:45.892Z ERROR [ProducerSendThread-
> > > >> ]
> > > >> > >> > [k.producer.async.ProducerSendThread ] {}: Error in handling
> > batch
> > > >> of 29
> > > >> > >> > events
> > > >> > >> >
> > > >> > >> >
> > > >> > >> > I created the topic before starting using bin/kafka-topics.sh.
> > I
> > > >> checked
> > > >> > >> > zookeeper and seems like the topic was indeed created. Any
> > ideas?
> > > >> > >> >
> > > >> > >>
> > > >> > >
> > > >> > >
> > > >>
> >

Reply via email to