Re: KafkaIO nothing received?

2017-05-10 Thread Conrad Crampton
Hi Raghu,
I would like to be running 0.9 Kafka servers, but I can confirm that I am 
running 0.8.2, from the HDP 2.3.0 stack. The confusion arose (I think) because 
the HDP upgrade has a bug in the way Ambari reports the versions of the stack 
components, specifically Kafka, so I have been trying to use the equivalent 
client version. It is all very odd in that I am using a NiFi processor 
(PublishKafka), which is a v0.9 producer and appears to create messages for 
the topic, but I can only use a 0.8 consumer to read from it. Now that I have 
cleared up the fact that I am saddled with a 0.8.2 server, I can move on and 
use the appropriate clients; alas, not Beam for the moment (given that it is 
only compatible with 0.9+). I know we are on 0.8.2 on the server because the 
version is within the jar names for the kafka jars, i.e. kafka_2.10-
So until I can get my HDP upgraded to include a later version of Kafka I am 
going to have to use Flink (or Spark etc.) for my current application, but once 
upgraded I will definitely come back to Beam as it appears to be a great 
product with terrific community support.
Thanks again,
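
For readers who want to repeat the jar-name check described above, here is a minimal sketch. It assumes the conventional kafka_<scalaVersion>-<kafkaVersion>.jar naming; the class name, method name and the sample file name are hypothetical, since the actual jar name in this message is truncated. Vendor builds may append their own suffix to the version, so treat the result as a hint only.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class KafkaJarVersion {
    // Assumed naming convention: kafka_<scalaVersion>-<kafkaVersion>[suffix].jar
    private static final Pattern JAR_NAME =
        Pattern.compile("kafka_(\\d+(?:\\.\\d+)*)-(\\d+(?:\\.\\d+)*).*\\.jar");

    /** Returns the Kafka version embedded in a jar file name, if the name matches. */
    static Optional<String> kafkaVersion(String jarFileName) {
        Matcher m = JAR_NAME.matcher(jarFileName);
        return m.matches() ? Optional.of(m.group(2)) : Optional.empty();
    }

    public static void main(String[] args) {
        // Hypothetical file name, used only for illustration.
        System.out.println(kafkaVersion("kafka_2.10-0.8.2.0.jar").orElse("unknown")); // 0.8.2.0
    }
}
```

Reading the version from the broker's own libraries sidesteps whatever the management UI reports.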

From: Raghu Angadi
Date: Wednesday, 10 May 2017 at 00:20
Subject: Re: KafkaIO nothing received?

Hi Conrad,

The Kafka consumer in Beam is 0.9 or above. Almost certainly you are running 
0.9 or newer servers. I don't think the new 0.9 client can talk to old brokers 
(but 0.9 brokers can talk to older clients). How did you confirm the server 
version? You can check the server log. But I might be mistaken about this 
incompatibility.

Can you post 'jstack' of the application when it is stuck (assuming you are 
running using DirectRunner)?

> Kafka 0.8 requires a zookeeper connect property to be set, but I can’t set 
> this using updateConsumerProperties as the value gets discarded.

KafkaIO does not place any restrictions on ConsumerConfig except for key and 
value deserializers. The message about discarding these would be from Kafka 
consumer itself. I think it ignores configuration settings that it does not 
know about and logs them.


On Mon, May 8, 2017 at 1:57 AM, Conrad Crampton wrote:
Hi Raghu,
Yeah, the job just waits and does nothing. It reports the correct offset (this 
changes when I use ‘earliest’ or ‘latest’), but nothing is received. There are 
definitely messages in the queue. I am using Beam 0.6.
With my other application using Flink, I am using the FlinkKafkaConsumer08 
libraries (and not the FlinkKafkaConsumer09)  as I am sure I had a similar 
problem then i.e. no errors reported and appears to work fine, but nothing 
actually received in the streaming job.
Kafka 0.8 requires a zookeeper connect property to be set, but I can’t set this 
using updateConsumerProperties as the value gets discarded.

From: Raghu Angadi
Date: Thursday, 4 May 2017 at 18:27
Subject: Re: KafkaIO nothing received?


It does not look like there is a version incompatibility. You would see errors 
during initialization otherwise. Log line "INFO: Reader-0: reading from 
test-http-logs-json-0 starting at offset 0" says the Kafka consumer was able to 
connect to the servers.

Does the job just wait? What is the version of Beam you are using? If it is 
just waiting for records, please ensure the topic has messages (using the Kafka 
CLI tools etc). Alternatively, you can try reading from the other topic you 
mentioned that worked fine.


On Thu, May 4, 2017 at 10:07 AM, Conrad Crampton wrote:
Ok, good to know I’m not going totally mad.
I think I may have been running around in circles unnecessarily.
I am using kafka as part of an HDP installation (with Ambari). The Ambari 
interface is reporting my kafka version as and indeed the output 
given previously gives
INFO: Kafka version : (which doesn’t make particular sense). So I have 
ssh’d onto the server and looked at the libs for kafka and they are 
kafka_2.10- so I’m guessing something is not quite right. 
This is incredibly frustrating as it looks like I am trying to connect to v0.9 
kafka but it’s actually v0.8, which clearly is very different wrt/ zookeeper etc.

Re: KafkaIO nothing received?

2017-05-09 Thread Raghu Angadi
Hi Conrad,

The Kafka consumer in Beam is 0.9 or above. Almost certainly you are running
0.9 or newer servers. I don't think the new 0.9 client can talk to old brokers
(but 0.9 brokers can talk to older clients). How did you confirm the server
version? You can check the server log. But I might be mistaken about this
incompatibility.
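
As a rough illustration of the rule of thumb stated above (the broker should be at least as new as the client, while newer brokers still accept older clients), the sketch below compares only the release line, i.e. the first two version components. The class and method names are hypothetical, and the rule is the simplification discussed in this thread, not an authoritative compatibility matrix.

```java
class BrokerCompat {
    /** Extracts the release line (first two components) from a dotted version. */
    static int[] releaseLine(String version) {
        String[] parts = version.split("\\.");
        int major = Integer.parseInt(parts[0]);
        int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
        return new int[] {major, minor};
    }

    /** Assumed rule: a client can talk to a broker on the same or a newer release line. */
    static boolean clientCanTalkTo(String clientVersion, String brokerVersion) {
        int[] c = releaseLine(clientVersion);
        int[] b = releaseLine(brokerVersion);
        return b[0] != c[0] ? b[0] > c[0] : b[1] >= c[1];
    }

    public static void main(String[] args) {
        System.out.println(clientCanTalkTo("0.9.0", "0.8.2")); // false
        System.out.println(clientCanTalkTo("0.8.2", "0.9.0")); // true
    }
}
```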

Can you post 'jstack' of the application when it is stuck (assuming you are
running using DirectRunner)?
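
The usual way to get this is to run the JDK's jstack tool against the process id of the stuck JVM. If attaching a tool is awkward, a similar dump can be produced from inside the application; the following is a minimal sketch using only the standard library. The class name is hypothetical, and the output format is simplified compared with what jstack prints.

```java
import java.util.Map;

class ThreadDump {
    /** Builds a plain-text dump of every live thread and its current stack. */
    static String dump() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
            Thread t = e.getKey();
            sb.append('"').append(t.getName()).append("\" state=")
              .append(t.getState()).append('\n');
            for (StackTraceElement frame : e.getValue()) {
                sb.append("    at ").append(frame).append('\n');
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(dump());
    }
}
```

Calling dump() from a timer or a shutdown hook while the pipeline appears to hang shows which threads are blocked and where.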

> Kafka 0.8 requires a zookeeper connect property to be set, but I can’t
> set this using updateConsumerProperties as the value gets discarded.

KafkaIO does not place any restrictions on ConsumerConfig except for key
and value deserializers. The message about discarding these would be from
Kafka consumer itself. I think it ignores configuration settings that it
does not know about and logs them.


On Mon, May 8, 2017 at 1:57 AM, Conrad Crampton wrote:

> Hi Raghu,
> Yeah, the job just waits and does nothing. It reports the correct offset
> (this changes when I use ‘earliest’ or ‘latest’), but nothing is received.
> There are definitely messages in the queue. I am using Beam 0.6.
> With my other application using Flink, I am using the FlinkKafkaConsumer08
> libraries (and not the FlinkKafkaConsumer09)  as I am sure I had a similar
> problem then i.e. no errors reported and appears to work fine, but nothing
> actually received in the streaming job.
> Kafka 0.8 requires a zookeeper connect property to be set, but I can’t set
> this using updateConsumerProperties as the value gets discarded.
> Thanks
> Conrad
> From: Raghu Angadi
> Date: Thursday, 4 May 2017 at 18:27
> Subject: Re: KafkaIO nothing received?
> Conrad,
> It does not look like there is a version incompatibility. You would see
> errors during initialization otherwise. Log line "INFO: Reader-0:
> reading from test-http-logs-json-0 starting at offset 0" says the Kafka
> consumer was able to connect to the servers.
> Does the job just wait? What is the version of Beam you
> are using? If it is just waiting for records, please ensure the topic has
> messages (using the Kafka CLI tools etc). Alternatively, you can try
> reading from the other topic you mentioned that worked fine.
> Raghu.
> On Thu, May 4, 2017 at 10:07 AM, Conrad Crampton wrote:
> Hi,
> Ok, good to know I’m not going totally mad.
> I think I may have been running around in circles unnecessarily.
> I am using kafka as part of an HDP installation (with Ambari). The Ambari
> interface is reporting my kafka version as and indeed the output
> given previously gives
> INFO: Kafka version : (which doesn’t make particular sense). So I
> have ssh’d onto the server and looked at the libs for kafka and they are
> kafka_2.10- so I’m guessing something is not quite
> right. This is incredibly frustrating as it looks like I am trying to
> connect to v0.9 kafka but it’s actually v0.8, which clearly is very
> different wrt/ zookeeper etc. This is also backed up by the Kafka
> command-line tools, which all ask for mandatory
> zookeeper options that shouldn’t be necessary as far as I understand it
> for v0.9.
> I am currently looking at
> 63bce07d8c6cc5e610ad24e915e2585fef582567/runners/flink/
> examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/
> to see if I can use this code somehow to use Beam
> with Kafka v0.8. I am really hoping to as I have no option to upgrade
> currently and I really like the abstraction of Beam.
> Thanks
> Conrad
> From: Mingmin Xu
> Date: Thursday, 4 May 2017 at 17:59
> Subject: Re: KafkaIO nothing received?
> @Conrad,
> Your code should be good to go, I can run it in my local env. There are two
> points you may want to check:
> 1). does the topic have data there, you can confirm with kafka cli '
> *bin/*';
> 2). is the port in bootstrapServers right? By default it's 9092.

Re: KafkaIO nothing received?

2017-05-04 Thread Mingmin Xu

Your code should be good to go, I can run it in my local env. There are two
points you may want to check:
1). does the topic have data there, you can confirm with kafka cli '
2). is the port in bootstrapServers right? By default it's 9092.
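
For the second point, a quick way to rule out a wrong host or port is to try a plain TCP connection to each entry in the bootstrap list. The sketch below is an illustration only: the class and method names are hypothetical, and falling back to 9092 when an entry has no port is an assumption of this sketch (Kafka itself expects host:port). A successful connection only shows that something is listening there, not that it is a Kafka broker of a compatible version.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

class BootstrapCheck {
    static final int DEFAULT_PORT = 9092; // Kafka's default listener port

    /** Splits a comma-separated "host:port" list into unresolved addresses. */
    static List<InetSocketAddress> parse(String bootstrapServers) {
        List<InetSocketAddress> out = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String s = entry.trim();
            if (s.isEmpty()) continue;
            int colon = s.lastIndexOf(':');
            String host = colon < 0 ? s : s.substring(0, colon);
            int port = colon < 0 ? DEFAULT_PORT : Integer.parseInt(s.substring(colon + 1));
            out.add(InetSocketAddress.createUnresolved(host, port));
        }
        return out;
    }

    /** Returns true if a TCP connection can be opened within the timeout. */
    static boolean reachable(InetSocketAddress addr, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(addr.getHostString(), addr.getPort()), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the same string that is given to withBootstrapServers(...).
        for (InetSocketAddress addr : parse(args.length > 0 ? args[0] : "localhost:9092")) {
            System.out.println(addr.getHostString() + ":" + addr.getPort()
                + " reachable=" + reachable(addr, 2000));
        }
    }
}
```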

On Thu, May 4, 2017 at 9:05 AM, Conrad Crampton wrote:

> Hi,
> New to the group – ‘hello’!
> Just starting to look into Beam and I very much like the concepts, but
> have rather fallen at the first hurdle – that being trying to subscribe to
> a kafka topic and process results.
> Very simply, the following code doesn’t receive any records (the data
> is going into the queue); I just get nothing.
> I have tried on both direct-runner and flink-runner (using the Quickstart
> as a base for options, mvn profile etc.)
> Code:
> Pipeline p = Pipeline.create(options);
> List<String> topics = ImmutableList.of("test-http-logs-json");
> PCollection<String> logs = p.apply(KafkaIO.<String, String>read()
>     .withBootstrapServers(
>         "datanode2-cm1.mis-cds.local:6667,datanode3-cm1.mis-cds.local:6667,datanode6-cm1.mis-cds.local:6667")
>     .withTopics(topics)
>     .withKeyCoder(StringUtf8Coder.of())
>     .withValueCoder(StringUtf8Coder.of())
>     .withMaxNumRecords(10)
>     .updateConsumerProperties(ImmutableMap.<String, Object>builder()
>         .put("auto.offset.reset", (Object) "earliest")
>         .put(" ", (Object) "http-logs-beam-json")
>         .put("", (Object) "true")
>         .put("receive.buffer.bytes", 1024 * 1024)
>         .build())
>     // set a Coder for Key and Value
>     .withoutMetadata())
>     .apply("Transform ", MapElements.via(
>         new SimpleFunction<KV<String, String>, String>() {
>             @Override
>             public String apply(KV<String, String> input) {
>                 log.debug("{}", input.getValue());
>                 return input.getKey() + " " + input.getValue();
>             }
>         }));
> Result:
> May 04, 2017 5:02:13 PM org.apache.kafka.common.config.AbstractConfig
> logAll
> INFO: ConsumerConfig values:
> metric.reporters = []
> = 30
> value.deserializer = class org.apache.kafka.common.serialization.
> ByteArrayDeserializer
> = http-logs-beam-json
> partition.assignment.strategy = [org.apache.kafka.clients.
> consumer.RangeAssignor]
> = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 1048576
> bootstrap.servers = [datanode2-cm1.mis-cds.local:6667,
> datanode3-cm1.mis-cds.local:6667, datanode6-cm1.mis-cds.local:6667]
> = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.keystore.type = JKS
> ssl.trustmanager.algorithm = PKIX
> = true
> ssl.key.password = null
> = 500
> sasl.kerberos.min.time.before.relogin = 6
> = 54
> ssl.truststore.password = null
> = 3
> metrics.num.samples = 2
> =
> ssl.endpoint.identification.algorithm = null
> key.deserializer = class org.apache.kafka.common.serialization.
> ByteArrayDeserializer
> ssl.protocol = TLS
> check.crcs = true
> = 4
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.keystore.location = null
> = 3000
> = 5000
> receive.buffer.bytes = 1048576
> ssl.cipher.suites = null
> ssl.truststore.type = JKS
> security.protocol = PLAINTEXT
> ssl.truststore.location = null
> ssl.keystore.password = null
> ssl.keymanager.algorithm = SunX509
> = 3
> fetch.min.bytes = 1
> send.buffer.bytes = 131072
> auto.offset.reset = earliest
> May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo
> INFO: Kafka version :
> May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo
> INFO: Kafka commitId : 23c69d62a0cabf06
> May 04, 2017 5:02:13 PM 
> generateInitialSplits
> INFO: Partitions assigned to split 0 (total 1): test-http-logs-json-0
> May 04, 2017 5:02:13 PM org.apache.kafka.common.config.AbstractConfig
> logAll
> INFO: ConsumerConfig