Thanks, could you attach your patch to the jira?

Jun
On Wed, Jun 5, 2013 at 9:37 AM, Andrew Milkowski <[email protected]> wrote:
> Jun, thanks for the response
>
> 1. new jira: https://issues.apache.org/jira/browse/KAFKA-933
> 2. locally I patched DataGenerator:
>
>     public void run() throws Exception {
>         List<KeyedMessage<Integer, String>> list =
>                 new ArrayList<KeyedMessage<Integer, String>>();
>         for (int i = 0; i < 50; i++) {
>             Long timestamp = RANDOM.nextLong();
>             if (timestamp < 0) timestamp = -timestamp;
>             String messageStr = timestamp.toString();
>             log.info("creating message: " + messageStr);
>             list.add(new KeyedMessage<Integer, String>(topic, null, messageStr));
>         }
>
>         log.info("send " + list.size() + " " + topic + " count events to " + uri);
>         producer.send(list);
>         producer.close();
>         generateOffsets();
>     }
>
> I have a separate request, having to do with upgrading the Kafka Hadoop
> APIs; I will follow up in a separate email to @users.
>
> thanks!
>
>
> On Thu, May 30, 2013 at 11:50 PM, Jun Rao <[email protected]> wrote:
>
> > This seems to be a bug. We should send the message string instead of the
> > Message object in DataGenerator. Could you file a jira?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Thu, May 30, 2013 at 1:47 PM, Andrew Milkowski <[email protected]> wrote:
> >
> > > Hi,
> > >
> > > Working off the git master codebase
> > >
> > > and following the instructions at
> > >
> > > https://github.com/apache/kafka/blob/trunk/contrib/hadoop-consumer/README
> > > https://github.com/apache/kafka
> > >
> > > when running
> > >
> > >     ./run-class.sh kafka.etl.impl.DataGenerator test/test.properties
> > >
> > > an exception is thrown:
> > >
> > > Exception in thread "main" java.lang.ClassCastException:
> > > kafka.message.Message cannot be cast to [B
> > >     at kafka.serializer.DefaultEncoder.toBytes(Encoder.scala:34)
> > >     at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:129)
> > >     at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:124)
> > >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> > >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> > >     at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> > >     at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
> > >     at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
> > >     at scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
> > >     at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> > >     at scala.collection.JavaConversions$JListWrapper.map(JavaConversions.scala:615)
> > >     at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:124)
> > >     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:54)
> > >     at kafka.producer.Producer.send(Producer.scala:74)
> > >     at kafka.javaapi.producer.Producer.send(Producer.scala:41)
> > >
> > > please advise, thank you!
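[Editor's note: for readers who hit the same exception, here is a minimal, Kafka-free sketch of what goes wrong. In this version of Kafka, DefaultEncoder casts the producer payload to byte[], so handing it a kafka.message.Message object fails at the JVM level with "cannot be cast to [B". The Object below merely stands in for a Message instance; the class and method names are illustrative, not Kafka APIs.]

```java
public class CastFailureDemo {
    // Same unchecked cast DefaultEncoder.toBytes performs on its input.
    static byte[] toBytes(Object payload) {
        return (byte[]) payload;
    }

    public static void main(String[] args) {
        // A string payload works once converted to bytes, which is what
        // sending the message string (as in the patch above) achieves.
        byte[] ok = toBytes("1370443038".getBytes());
        System.out.println("bytes sent: " + ok.length);

        try {
            toBytes(new Object()); // stands in for kafka.message.Message
        } catch (ClassCastException e) {
            System.out.println("cast failed as in the report");
        }
    }
}
```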
