Thanks, Gonzalo, that definitely helped! This also ties into an issue I'd raised with mesos-kafka where the zk path seemed to be ignored. I now see that there is a node which stores the mesos-kafka scheduler config, and the Kafka path must be specified separately, so it is currently /.
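
For anyone else untangling this: the quickest way to see where the brokers actually register is to look in ZooKeeper directly. A rough sketch using the stock zkCli.sh that ships with ZooKeeper (hosts and paths are from my setup):

$ zkCli.sh -server zk01:2181
ls /mesos-kafka              (mesos-kafka scheduler state lives here)
ls /brokers/ids              (broker registrations when the Kafka chroot is "/")
ls /mesos-kafka/brokers/ids  (only present if the brokers were started with chroot /mesos-kafka)

If the Kafka chroot really is "/", the broker ids should show up under /brokers/ids rather than /mesos-kafka/brokers/ids.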
Still not reading events, but definitely looks better in startup log:

16/02/05 11:55:38 INFO kafka.KafkaSource: Kafka source kafka-source-test do started.
16/02/05 11:55:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_mesos04-1454702137146-6cd63609-leader-finder-thread], Starting
16/02/05 11:55:38 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: kafka-source-test: Successfully registered new MBean.
16/02/05 11:55:38 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: kafka-source-test started
16/02/05 11:55:38 INFO utils.VerifiableProperties: Verifying properties
16/02/05 11:55:38 INFO utils.VerifiableProperties: Property client.id is overridden to flume
16/02/05 11:55:38 INFO utils.VerifiableProperties: Property metadata.broker.list is overridden to mesos01:31000,mesos02:31000,mesos08:31000
16/02/05 11:55:38 INFO utils.VerifiableProperties: Property request.timeout.ms is overridden to 30000
16/02/05 11:55:38 INFO client.ClientUtils$: Fetching metadata from broker id:1,host:mesos02,port:31000 with correlation id 0 for 1 topic(s) Set(home_views)
16/02/05 11:55:38 INFO producer.SyncProducer: Connected to mesos02:31000 for producing
16/02/05 11:55:38 INFO producer.SyncProducer: Disconnecting from mesos02:31000
16/02/05 11:55:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-flume_mesos04-1454702137146-6cd63609-0-0], Starting
16/02/05 11:55:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1454702137389] Added fetcher for partitions ArrayBuffer([[home_views,0], initOffset -1 to broker id:0,host:mesos01,port:31000] )

$ curl http://mesos04:34545/metrics | json_pp
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   925    0   925    0     0   7741      0 --:--:-- --:--:-- --:--:--  7773
{
   "CHANNEL.hdfs-channel-kafka" : {
      "ChannelCapacity" : "10",
      "StartTime" : "1454702136681",
      "EventTakeSuccessCount" : "0",
      "ChannelFillPercentage" : "0.0",
      "EventPutAttemptCount" : "0",
      "EventTakeAttemptCount" : "14",
      "StopTime" : "0",
      "ChannelSize" : "0",
      "EventPutSuccessCount" : "0",
      "Type" : "CHANNEL"
   },
   "SOURCE.kafka-source-test" : {
      "AppendBatchReceivedCount" : "0",
      "AppendAcceptedCount" : "0",
      "KafkaEmptyCount" : "0",
      "AppendReceivedCount" : "0",
      "KafkaEventGetTimer" : "18046",
      "EventAcceptedCount" : "0",
      "StartTime" : "1454702138033",
      "StopTime" : "0",
      "KafkaCommitTimer" : "0",
      "Type" : "SOURCE",
      "AppendBatchAcceptedCount" : "0",
      "EventReceivedCount" : "0",
      "OpenConnectionCount" : "0"
   },
   "SINK.hdfs-sink-kafka" : {
      "ConnectionCreatedCount" : "0",
      "EventDrainAttemptCount" : "0",
      "BatchCompleteCount" : "0",
      "StartTime" : "1454702136714",
      "Type" : "SINK",
      "EventDrainSuccessCount" : "0",
      "StopTime" : "0",
      "BatchUnderflowCount" : "0",
      "ConnectionFailedCount" : "0",
      "BatchEmptyCount" : "13",
      "ConnectionClosedCount" : "0"
   }
}
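
To Gonzalo's question below about the console consumer: a cross-check would be to run it both with and without the chroot and see which connect string actually finds the brokers. A rough sketch (assuming the 0.8-era console consumer script on one of the broker hosts; the topic is the one from the current config):

$ kafka-console-consumer.sh --zookeeper zk01:2181 --topic home_views --from-beginning
$ kafka-console-consumer.sh --zookeeper zk01:2181/mesos-kafka --topic home_views --from-beginning

Only one of the two should actually consume, depending on which chroot the brokers registered under.
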
From: Gonzalo Herreros <[email protected]>
Reply-To: <[email protected]>
Date: Thursday, February 4, 2016 at 11:15 PM
To: user <[email protected]>
Subject: Re: KafkaSource not picking up any messages

I'm concerned with the warning "no brokers found when trying to rebalance".
Double check that the path in zookeeper is correct, zk01:2181/mesos-kafka, and it's not the standard /kafka.
When you connect with the kafka-console-consumer, do you specify /mesos-kafka or just zk01:2181?

You can use the zkclient tool to check if there are brokers currently registered under that path for the topic "test".

Regards,
Gonzalo

On 4 February 2016 at 21:16, Justin Ryan <[email protected]> wrote:
> Hiya folks,
>
> I'm setting up a new environment with Kafka, Flume, and HDFS, and have implemented the simplest possible testing configuration I can come up with. It logs successfully configuring and starting the KafkaSource, and with kafka tools I can confirm that messages have been sent, but the JSON Metrics from Flume show 0 messages processed.
>
> Are there any more tools at my disposal to investigate? Any assistance would be greatly appreciated!
>
> My config and log:
>
> # generated by Chef for mesos10, changes will be overwritten
>
> flume1.sources=kafka-source-test
> flume1.channels=hdfs-channel-kafka
> flume1.sinks=hdfs-sink-kafka
> flume1.sources.kafka-source-test.type=org.apache.flume.source.kafka.KafkaSource
> flume1.sources.kafka-source-test.zookeeperConnect=zk01:2181/mesos-kafka
> flume1.sources.kafka-source-test.topic=test
> flume1.sources.kafka-source-test.groupId=flume
> flume1.sources.kafka-source-test.interceptors=i1
> flume1.sources.kafka-source-test.interceptors.i1.type=timestamp
> flume1.sources.kafka-source-test.consumer.timeout.ms=100
> flume1.sources.kafka-source-test.channels=hdfs-channel-kafka
> flume1.channels.hdfs-channel-kafka.type=memory
> flume1.sinks.hdfs-sink-kafka.channel=hdfs-channel-kafka
> flume1.sinks.hdfs-sink-kafka.type=hdfs
> flume1.sinks.hdfs-sink-kafka.hdfs.path=/tmp/kafka/%{topic}/%y-%m-%d
> flume1.sinks.hdfs-sink-kafka.hdfs.rollInterval=5
> flume1.sinks.hdfs-sink-kafka.hdfs.rollCount=0
> flume1.sinks.hdfs-sink-kafka.hdfs.rollSize=0
> flume1.sinks.hdfs-sink-kafka.hdfs.fileType=DataStream
> flume1.channels.hdfs-channel-kafka.capacity=10
> flume1.channels.hdfs-channel-kafka.transactionCapacity=10
>
> Startup log (less incredibly long path lines):
>
> 16/02/04 11:32:07 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
> 16/02/04 11:32:07 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/etc/flume/conf.chef/flume.conf
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Added sinks: hdfs-sink-kafka Agent: flume1
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [flume1]
> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Creating channels
> 16/02/04 11:32:07 INFO channel.DefaultChannelFactory: Creating instance of channel hdfs-channel-kafka type memory
> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Created channel hdfs-channel-kafka
> 16/02/04 11:32:07 INFO source.DefaultSourceFactory: Creating instance of source kafka-source-test, type org.apache.flume.source.kafka.KafkaSource
> 16/02/04 11:32:07 INFO kafka.KafkaSourceUtil: context={ parameters:{interceptors.i1.type=timestamp, zookeeperConnect=zk01:2181/mesos-kafka, channels=hdfs-channel-kafka, groupId=flume, consumer.timeout.ms=100, topic=test, type=org.apache.flume.source.kafka.KafkaSource, interceptors=i1} }
> 16/02/04 11:32:07 INFO sink.DefaultSinkFactory: Creating instance of sink: hdfs-sink-kafka, type: hdfs
> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Channel hdfs-channel-kafka connected to [kafka-source-test, hdfs-sink-kafka]
> 16/02/04 11:32:07 INFO node.Application: Starting new configuration:{ sourceRunners:{kafka-source-test=PollableSourceRunner: { source:org.apache.flume.source.kafka.KafkaSource{name:kafka-source-test,state:IDLE} counterGroup:{ name:null counters:{} } }} sinkRunners:{hdfs-sink-kafka=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2f33f35e counterGroup:{ name:null counters:{} } }} channels:{hdfs-channel-kafka=org.apache.flume.channel.MemoryChannel{name: hdfs-channel-kafka}} }
> 16/02/04 11:32:07 INFO node.Application: Starting Channel hdfs-channel-kafka
> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: hdfs-channel-kafka: Successfully registered new MBean.
> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: hdfs-channel-kafka started
> 16/02/04 11:32:07 INFO node.Application: Starting Sink hdfs-sink-kafka
> 16/02/04 11:32:07 INFO node.Application: Starting Source kafka-source-test
> 16/02/04 11:32:07 INFO kafka.KafkaSource: Starting org.apache.flume.source.kafka.KafkaSource{name:kafka-source-test,state:IDLE}...
> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: hdfs-sink-kafka: Successfully registered new MBean.
> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: hdfs-sink-kafka started
> 16/02/04 11:32:07 INFO mortbay.log: Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
> 16/02/04 11:32:07 INFO mortbay.log: jetty-6.1.26.cloudera.4
> 16/02/04 11:32:07 INFO mortbay.log: Started [email protected]:34545
> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Verifying properties
> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property auto.commit.enable is overridden to false
> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property consumer.timeout.ms is overridden to 10
> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property group.id is overridden to flume
> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property zookeeper.connect is overridden to zk01:2181/mesos-kafka
> 16/02/04 11:32:08 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], Connecting to zookeeper instance at zk01:2181/mesos-kafka
> 16/02/04 11:32:08 INFO zkclient.ZkEventThread: Starting ZkClient event thread.
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.5-946--1, built on 05/18/2015 19:03 GMT
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:host.name=mesos10
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.version=1.8.0_72-internal
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.vendor=Oracle Corporation
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.home=/usr/lib/jvm/java-8-openjdk-amd64/jre
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.io.tmpdir=/tmp
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.compiler=<NA>
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:os.name=Linux
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:os.arch=amd64
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:os.version=3.13.0-63-generic
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:user.name=marathon
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:user.home=/opt/marathon
> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=zk01:2181/mesos-kafka sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@2e1b7b98
> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Opening socket connection to server 10.100.6.251/10.100.6.251:2181. Will not attempt to authenticate using SASL (unknown error)
> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Socket connection established to 10.100.6.251/10.100.6.251:2181, initiating session
> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Session establishment complete on server 10.100.6.251/10.100.6.251:2181, sessionid = 0x152858b1cc07491, negotiated timeout = 6000
> 16/02/04 11:32:08 INFO zkclient.ZkClient: zookeeper state changed (SyncConnected)
> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], begin registering consumer flume_mesos10-1454614328204-ca8a74df in ZK
> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], end registering consumer flume_mesos10-1454614328204-ca8a74df in ZK
> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], starting watcher executor thread for consumer flume_mesos10-1454614328204-ca8a74df
> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], begin rebalancing consumer flume_mesos10-1454614328204-ca8a74df try #0
> 16/02/04 11:32:09 WARN consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], no brokers found when trying to rebalance.
> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], end rebalancing consumer flume_mesos10-1454614328204-ca8a74df try #0
> 16/02/04 11:32:09 INFO kafka.KafkaSource: Kafka source kafka-source-test do started.
> 16/02/04 11:32:09 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: kafka-source-test: Successfully registered new MBean.
> 16/02/04 11:32:09 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: kafka-source-test started
> --
>
> --
> Justin Alan Ryan
> Sr. Systems / Release Engineer
> ZipRealty
