Joe,

I am using the 2.8.2 build of the kafka jar, from the latest beta1-candidate1 tag.
The code above should be all you need to reproduce the issue. I'll create a
JIRA ticket.

Thanks,

Jason


On Sun, Jun 16, 2013 at 8:32 PM, Joe Stein <crypt...@gmail.com> wrote:

> I have been running tests on 2.9.2 and 2.8.2 without any issues off the
> beta1-candidate1 release tag
>
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=refs/tags/0.8.0-beta1
>
> Binaries will be built from the source file available from download here :
> http://people.apache.org/~joestein/kafka-0.8.0-beta1-candidate1/
>
> It could be that test failing is interacting with a local file already
> there and causing issues or such type of thing not reproducible maybe
> without failing the test first to cause the data not cleaned up or
> something, dunno.
>
> If you have an issue can you put the steps to reproduce it together or
> whatever amount of detailed information you can provide and then please
> open up a JIRA ticket https://issues.apache.org/jira/browse/KAFKA
>
> Thanks!
>
>
> On Sun, Jun 16, 2013 at 11:14 PM, Jason Rosenberg <j...@squareup.com>
> wrote:
>
> > Yep,
> >
> > The configs are good. And my apps are working fine too. It seems only to
> > be an issue with this test (and a few others like it).
> >
> > Jason
> >
> >
> > On Sun, Jun 16, 2013 at 3:02 PM, Eric Sites <eric.si...@threattrack.com>
> > wrote:
> >
> > > Jason,
> > >
> > > Did you update your config file with the new name of the zookeeper
> > > settings:
> > >
> > > It was renamed from zk.connect to zookeeper.connect.
> > >
> > > You should check all of the settings because other setting names have
> > > changed as well.
> > >
> > > Cheers,
> > > Eric Sites
> > >
> > > On 6/16/13 5:14 PM, "Jason Rosenberg" <j...@squareup.com> wrote:
> > >
> > > > I've started having problems with the latest version of the 0.8 branch.
> > > > The test below has started failing. It was working fine with a prior
> > > > version of 0.8, going back to Apr 30
> > > > (sha 988d4d8e65a14390abd748318a64e281e4a37c19).
> > > >
> > > > I haven't figured out when exactly it started failing, but I saw it with a
> > > > version on Jun 9 (sha ddb7947c05583ea317e8f994f07b83bf6d5213c3) and now
> > > > also with the latest (sha 23acbd309f5e17de71db46cb6f1a60c8d38ea4e4).
> > > >
> > > > The test code is essentially this (assume a zk server is running with
> > > > 'zkConnect', and a kafka broker running with a metadata port at 'port':
> > > >
> > > >     Properties pProps = new Properties();
> > > >     pProps.put("metadata.broker.list", "localhost:" + port);
> > > >     pProps.put("serializer.class", "kafka.serializer.StringEncoder");
> > > >     ProducerConfig pConfig = new ProducerConfig(pProps);
> > > >     Producer<Integer, String> producer = new Producer<Integer, String>(pConfig);
> > > >     KeyedMessage<Integer, String> data =
> > > >         new KeyedMessage<Integer, String>("test-topic", "test-message");
> > > >     producer.send(data);
> > > >     producer.close();
> > > >
> > > >     Properties cProps = new Properties();
> > > >     cProps.put("zookeeper.connect", zkConnect);
> > > >     cProps.put("group.id", "group1");
> > > >     ConsumerConfig consumerConfig = new ConsumerConfig(cProps);
> > > >     ConsumerConnector consumerConnector =
> > > >         Consumer.createJavaConsumerConnector(consumerConfig);
> > > >
> > > >     Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =
> > > >         consumerConnector.createMessageStreams(ImmutableMap.of("test-topic", 1));
> > > >     List<KafkaStream<byte[], byte[]>> streams =
> > > >         topicMessageStreams.get("test-topic");
> > > >     final KafkaStream<byte[], byte[]> stream = streams.get(0);
> > > >     final ConsumerIterator<byte[], byte[]> iter = stream.iterator();
> > > >
> > > >     // run in a separate thread
> > > >     final AtomicBoolean success = new AtomicBoolean(false);
> > > >     Thread consumeThread = new Thread(new Runnable() {
> > > >       public void run() {
> > > >         while (iter.hasNext()) {
> > > >           byte[] msg = iter.next().message();
> > > >           String msgStr = new String(msg);
> > > >           success.set(msgStr.equals("test-message"));
> > > >           break;
> > > >         }
> > > >       }
> > > >     });
> > > >
> > > >     consumeThread.start();
> > > >     // this now hangs with the latest code
> > > >     consumeThread.join();
> > > >
> > > >     consumerConnector.shutdown();
> > > >     assertTrue(success.get());
> > > >
> > > > The output looks like this:
> > > >
> > > > 912 [main] WARN kafka.producer.BrokerPartitionInfo - Error while fetching
> > > > metadata [{TopicMetadata for topic test-topic ->
> > > > No partition metadata for topic test-topic due to
> > > > kafka.common.LeaderNotAvailableException}] for topic [test-topic]: class
> > > > kafka.common.LeaderNotAvailableException
> > > > 922 [main] WARN kafka.producer.BrokerPartitionInfo - Error while fetching
> > > > metadata [{TopicMetadata for topic test-topic ->
> > > > No partition metadata for topic test-topic due to
> > > > kafka.common.LeaderNotAvailableException}] for topic [test-topic]: class
> > > > kafka.common.LeaderNotAvailableException
> > > > 923 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to
> > > > collate messages by topic, partition due to: Failed to fetch topic metadata
> > > > for topic: test-topic
> > > > 980 [kafka-request-handler-2] WARN kafka.server.HighwaterMarkCheckpoint -
> > > > No highwatermark file is found. Returning 0 as the highwatermark for
> > > > partition [test-topic,0]
> > > >
> > > > The consumer never receives a message, and so the test hangs....
> > > >
> > > > This test worked fine as I said with an older version of the branch, but it
> > > > would output exceptions about LeaderNotAvailable, etc...
> > > >
> > > > Thoughts?
> > > >
> > > > Jason
> > > >
> > >
> >
>
> --
> /*
> Joe Stein
> http://www.linkedin.com/in/charmalloc
> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> */
>
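
A side note on the quoted test: because it calls consumeThread.join() with no timeout, a message that never arrives hangs the whole run instead of failing it. Below is a minimal sketch, using only the JDK, of how that wait could be bounded. It does not address why the message is missing. The BlockingQueue is a stand-in for the KafkaStream iterator, and the class name BoundedJoinSketch and helper consumeOne are hypothetical names, not part of Kafka.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class BoundedJoinSketch {

    // Hypothetical helper: waits up to timeoutMs (must be > 0, since
    // join(0) waits forever) for one message equal to `expected`.
    public static boolean consumeOne(final BlockingQueue<String> source,
                                     final String expected,
                                     long timeoutMs) throws InterruptedException {
        final AtomicBoolean success = new AtomicBoolean(false);
        Thread consumeThread = new Thread(new Runnable() {
            public void run() {
                try {
                    // Blocks until a message is available, much like
                    // iter.hasNext() does in the quoted test.
                    String msg = source.take();
                    success.set(msg.equals(expected));
                } catch (InterruptedException e) {
                    // Interrupted after the timeout: give up quietly.
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumeThread.setDaemon(true);
        consumeThread.start();

        // Bounded wait instead of an unconditional join().
        consumeThread.join(timeoutMs);
        if (consumeThread.isAlive()) {
            consumeThread.interrupt();
        }
        return success.get();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> delivered = new LinkedBlockingQueue<String>();
        delivered.put("test-message");
        System.out.println(consumeOne(delivered, "test-message", 1000));

        // Nothing is ever produced here, so the wait times out.
        BlockingQueue<String> empty = new LinkedBlockingQueue<String>();
        System.out.println(consumeOne(empty, "test-message", 200));
    }
}
```

In the real test the same idea would apply to the existing consumeThread: call join with a timeout, then assert on success.get(), so a lost message shows up as a failed assertion rather than a stuck build.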