Joe,

I am using the Scala 2.8.2 build of the Kafka jar, built from the latest
beta1-candidate1 tag.

The code in my original message (quoted below) should be all you need to
reproduce the issue.  I'll create a JIRA ticket.
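
As an aside, here is a minimal sketch of bounding the join so that the test
fails after a timeout instead of hanging. It is not part of the original
test: the class name BoundedJoinSketch, the helper joinWithin, and the
10-second timeout are hypothetical, and the worker thread is only a stand-in
for the consumer thread. It uses the JDK's Thread.join(long) and does not
touch any Kafka API.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class BoundedJoinSketch {

    // Hypothetical helper: waits up to timeoutMs (must be > 0, because
    // join(0) waits forever) and reports whether the thread finished.
    public static boolean joinWithin(Thread t, long timeoutMs) {
        try {
            t.join(timeoutMs);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and treat this as "not finished".
            Thread.currentThread().interrupt();
            return false;
        }
        return !t.isAlive();
    }

    public static void main(String[] args) {
        final AtomicBoolean success = new AtomicBoolean(false);

        // Stand-in for the consumer thread in the test; the real one would
        // block in iter.hasNext() until a message arrives.
        Thread consumeThread = new Thread(new Runnable() {
            public void run() {
                success.set(true);
            }
        });
        // Daemon, so a stuck thread cannot keep the JVM alive after the test.
        consumeThread.setDaemon(true);
        consumeThread.start();

        // Assumed timeout of 10 seconds; tune to the environment.
        boolean finished = joinWithin(consumeThread, 10000L);
        System.out.println("finished=" + finished + " success=" + success.get());
    }
}
```

In the test, the unbounded consumeThread.join() would become
assertTrue(joinWithin(consumeThread, 10000L)), so a consumer that never
receives a message shows up as a failure rather than a hang.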

Thanks,

Jason


On Sun, Jun 16, 2013 at 8:32 PM, Joe Stein <crypt...@gmail.com> wrote:

> I have been running tests on Scala 2.9.2 and 2.8.2, off the
> beta1-candidate1 release tag, without any issues:
>
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=refs/tags/0.8.0-beta1
>
> Binaries will be built from the source file available for download here:
> http://people.apache.org/~joestein/kafka-0.8.0-beta1-candidate1/
>
> It could be that the failing test is interacting with a local file that is
> already there. If so, the problem may not be reproducible unless the test
> has failed once first and left data behind that was not cleaned up. I
> don't know.
>
> If you have an issue, can you put together the steps to reproduce it, or
> whatever detailed information you can provide, and then please open a
> JIRA ticket at https://issues.apache.org/jira/browse/KAFKA
>
> Thanks!
>
>
> On Sun, Jun 16, 2013 at 11:14 PM, Jason Rosenberg <j...@squareup.com>
> wrote:
>
> > Yep,
> >
> > The configs are good.  And my apps are working fine too.  It seems to be
> > an issue only with this test (and a few others like it).
> >
> > Jason
> >
> >
> > On Sun, Jun 16, 2013 at 3:02 PM, Eric Sites <eric.si...@threattrack.com>
> > wrote:
> >
> > > Jason,
> > >
> > >
> > > Did you update your config file with the new names of the ZooKeeper
> > > settings?
> > >
> > > zk.connect was renamed to zookeeper.connect.
> > >
> > > You should check all of the settings because other setting names have
> > > changed as well.
> > >
> > > Cheers,
> > > Eric Sites
> > >
> > > On 6/16/13 5:14 PM, "Jason Rosenberg" <j...@squareup.com> wrote:
> > >
> > > >I've started having problems with the latest version of the 0.8 branch.
> > > >The test below has started failing.  It was working fine with a prior
> > > >version of 0.8, going back to Apr 30
> > > >(sha 988d4d8e65a14390abd748318a64e281e4a37c19).
> > > >
> > > >I haven't figured out when exactly it started failing, but I saw it with
> > > >a version on Jun 9 (sha ddb7947c05583ea317e8f994f07b83bf6d5213c3) and now
> > > >also with the latest (sha 23acbd309f5e17de71db46cb6f1a60c8d38ea4e4).
> > > >
> > > >The test code is essentially this (assume a zk server is running at
> > > >'zkConnect', and a kafka broker is running with a metadata port at
> > > >'port'):
> > > >
> > > >    Properties pProps = new Properties();
> > > >    pProps.put("metadata.broker.list", "localhost:" + port);
> > > >    pProps.put("serializer.class", "kafka.serializer.StringEncoder");
> > > >    ProducerConfig pConfig = new ProducerConfig(pProps);
> > > >    Producer<Integer, String> producer =
> > > >        new Producer<Integer, String>(pConfig);
> > > >    KeyedMessage<Integer, String> data =
> > > >        new KeyedMessage<Integer, String>("test-topic", "test-message");
> > > >    producer.send(data);
> > > >    producer.close();
> > > >
> > > >    Properties cProps = new Properties();
> > > >    cProps.put("zookeeper.connect", zkConnect);
> > > >    cProps.put("group.id", "group1");
> > > >    ConsumerConfig consumerConfig = new ConsumerConfig(cProps);
> > > >    ConsumerConnector consumerConnector =
> > > >        Consumer.createJavaConsumerConnector(consumerConfig);
> > > >
> > > >    Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =
> > > >        consumerConnector.createMessageStreams(ImmutableMap.of("test-topic", 1));
> > > >    List<KafkaStream<byte[], byte[]>> streams =
> > > >        topicMessageStreams.get("test-topic");
> > > >    final KafkaStream<byte[], byte[]> stream = streams.get(0);
> > > >    final ConsumerIterator<byte[], byte[]> iter = stream.iterator();
> > > >
> > > >    // run in a separate thread
> > > >    final AtomicBoolean success = new AtomicBoolean(false);
> > > >    Thread consumeThread = new Thread(new Runnable() {
> > > >      public void run() {
> > > >        while (iter.hasNext()) {
> > > >          byte[] msg = iter.next().message();
> > > >          String msgStr = new String(msg);
> > > >          success.set(msgStr.equals("test-message"));
> > > >          break;
> > > >        }
> > > >      }
> > > >    });
> > > >
> > > >    consumeThread.start();
> > > >    // this now hangs with the latest code
> > > >    consumeThread.join();
> > > >
> > > >    consumerConnector.shutdown();
> > > >    assertTrue(success.get());
> > > >
> > > >The output looks like this:
> > > >
> > > >912 [main] WARN kafka.producer.BrokerPartitionInfo  - Error while fetching
> > > >metadata [{TopicMetadata for topic test-topic ->
> > > >No partition metadata for topic test-topic due to
> > > >kafka.common.LeaderNotAvailableException}] for topic [test-topic]: class
> > > >kafka.common.LeaderNotAvailableException
> > > >922 [main] WARN kafka.producer.BrokerPartitionInfo  - Error while fetching
> > > >metadata [{TopicMetadata for topic test-topic ->
> > > >No partition metadata for topic test-topic due to
> > > >kafka.common.LeaderNotAvailableException}] for topic [test-topic]: class
> > > >kafka.common.LeaderNotAvailableException
> > > >923 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to
> > > >collate messages by topic, partition due to: Failed to fetch topic metadata
> > > >for topic: test-topic
> > > >980 [kafka-request-handler-2] WARN kafka.server.HighwaterMarkCheckpoint  -
> > > >No highwatermark file is found. Returning 0 as the highwatermark for
> > > >partition [test-topic,0]
> > > >
> > > >The consumer never receives a message, and so the test hangs....
> > > >
> > > >As I said, this test worked fine with an older version of the branch,
> > > >although even then it would output exceptions about LeaderNotAvailable,
> > > >etc.
> > > >
> > > >Thoughts?
> > > >
> > > >Jason
> > >
> > >
> >
>
>
>
> --
>
> /*
> Joe Stein
> http://www.linkedin.com/in/charmalloc
> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> */
>
