@Conrad, your code should be good to go; I can run it in my local env. There are two points you may want to check: 1) does the topic actually have data in it? You can confirm with the Kafka CLI tool bin/kafka-console-consumer.sh. 2) Is the port in bootstrapServers right? By default it's 9092.
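As a quick sanity check, something like the following reads the topic from the beginning with the stock Kafka CLI (a sketch only; the topic name and broker host/port are the ones from this thread, and the exact flags vary by Kafka version):

```shell
# Consume the topic from the beginning with the console consumer.
# On 0.9.x builds the new-consumer path may additionally need
# --new-consumer, and very old scripts only accept --zookeeper.
bin/kafka-console-consumer.sh \
  --bootstrap-server datanode2-cm1.mis-cds.local:6667 \
  --topic test-http-logs-json \
  --from-beginning

# Separately, verify a broker is actually listening on that port.
nc -vz datanode2-cm1.mis-cds.local 6667
```

If the console consumer prints records but the Beam pipeline stays silent, the problem is on the Beam side; if it prints nothing either, the topic or broker address is the issue.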
On Thu, May 4, 2017 at 9:05 AM, Conrad Crampton <conrad.cramp...@secdata.com> wrote:

> Hi,
>
> New to the group – ‘hello’!
>
> Just starting to look into Beam and I very much like the concepts, but I
> have rather fallen at the first hurdle – that being trying to subscribe to
> a Kafka topic and process the results.
>
> Put simply, the following code doesn’t receive any records (the data is
> going into the topic) – I just get nothing.
>
> I have tried both the direct-runner and flink-runner (using the Quickstart
> as a base for options, mvn profiles etc.)
>
> Code:
>
> Pipeline p = Pipeline.create(options);
> List<String> topics = ImmutableList.of("test-http-logs-json");
>
> PCollection<String> logs = p.apply(KafkaIO.<String, String>read()
>     .withBootstrapServers("datanode2-cm1.mis-cds.local:6667,datanode3-cm1.mis-cds.local:6667,datanode6-cm1.mis-cds.local:6667")
>     .withTopics(topics)
>     // set a Coder for Key and Value
>     .withKeyCoder(StringUtf8Coder.of())
>     .withValueCoder(StringUtf8Coder.of())
>     .withMaxNumRecords(10)
>     .updateConsumerProperties(ImmutableMap.<String, Object>builder()
>         .put("auto.offset.reset", (Object) "earliest")
>         .put("group.id", (Object) "http-logs-beam-json")
>         .put("enable.auto.commit", (Object) "true")
>         .put("receive.buffer.bytes", 1024 * 1024)
>         .build())
>     .withoutMetadata())
>     .apply("Transform", MapElements.via(new SimpleFunction<KV<String, String>, String>() {
>         @Override
>         public String apply(KV<String, String> input) {
>             log.debug("{}", input.getValue());
>             return input.getKey() + " " + input.getValue();
>         }
>     }));
>
> p.run();
>
> Result:
>
> May 04, 2017 5:02:13 PM org.apache.kafka.common.config.AbstractConfig logAll
> INFO: ConsumerConfig values:
>     metric.reporters = []
>     metadata.max.age.ms = 300000
>     value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>     group.id = http-logs-beam-json
>     partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
>     reconnect.backoff.ms = 50
>     sasl.kerberos.ticket.renew.window.factor = 0.8
>     max.partition.fetch.bytes = 1048576
>     bootstrap.servers = [datanode2-cm1.mis-cds.local:6667, datanode3-cm1.mis-cds.local:6667, datanode6-cm1.mis-cds.local:6667]
>     retry.backoff.ms = 100
>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>     sasl.kerberos.service.name = null
>     sasl.kerberos.ticket.renew.jitter = 0.05
>     ssl.keystore.type = JKS
>     ssl.trustmanager.algorithm = PKIX
>     enable.auto.commit = true
>     ssl.key.password = null
>     fetch.max.wait.ms = 500
>     sasl.kerberos.min.time.before.relogin = 60000
>     connections.max.idle.ms = 540000
>     ssl.truststore.password = null
>     session.timeout.ms = 30000
>     metrics.num.samples = 2
>     client.id =
>     ssl.endpoint.identification.algorithm = null
>     key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>     ssl.protocol = TLS
>     check.crcs = true
>     request.timeout.ms = 40000
>     ssl.provider = null
>     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>     ssl.keystore.location = null
>     heartbeat.interval.ms = 3000
>     auto.commit.interval.ms = 5000
>     receive.buffer.bytes = 1048576
>     ssl.cipher.suites = null
>     ssl.truststore.type = JKS
>     security.protocol = PLAINTEXT
>     ssl.truststore.location = null
>     ssl.keystore.password = null
>     ssl.keymanager.algorithm = SunX509
>     metrics.sample.window.ms = 30000
>     fetch.min.bytes = 1
>     send.buffer.bytes = 131072
>     auto.offset.reset = earliest
>
> May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
> INFO: Kafka version : 0.9.0.1
> May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
> INFO: Kafka commitId : 23c69d62a0cabf06
> May 04, 2017 5:02:13 PM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource generateInitialSplits
> INFO: Partitions assigned to split 0 (total 1): test-http-logs-json-0
> May 04, 2017 5:02:14 PM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader start
> INFO: Reader-0: reading from test-http-logs-json-0 starting at offset 0
> May 04, 2017 5:02:14 PM org.apache.kafka.common.config.AbstractConfig logAll
> INFO: ConsumerConfig values:
>     metric.reporters = []
>     metadata.max.age.ms = 300000
>     value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>     group.id = Reader-0_offset_consumer_1029147362_http-logs-beam-json
>     partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
>     reconnect.backoff.ms = 50
>     sasl.kerberos.ticket.renew.window.factor = 0.8
>     max.partition.fetch.bytes = 1048576
>     bootstrap.servers = [datanode2-cm1.mis-cds.local:6667, datanode3-cm1.mis-cds.local:6667, datanode6-cm1.mis-cds.local:6667]
>     retry.backoff.ms = 100
>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>     sasl.kerberos.service.name = null
>     sasl.kerberos.ticket.renew.jitter = 0.05
>     ssl.keystore.type = JKS
>     ssl.trustmanager.algorithm = PKIX
>     enable.auto.commit = false
>     ssl.key.password = null
>     fetch.max.wait.ms = 500
>     sasl.kerberos.min.time.before.relogin = 60000
>     connections.max.idle.ms = 540000
>     ssl.truststore.password = null
>     session.timeout.ms = 30000
>     metrics.num.samples = 2
>     client.id =
>     ssl.endpoint.identification.algorithm = null
>     key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>     ssl.protocol = TLS
>     check.crcs = true
>     request.timeout.ms = 40000
>     ssl.provider = null
>     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>     ssl.keystore.location = null
>     heartbeat.interval.ms = 3000
>     auto.commit.interval.ms = 5000
>     receive.buffer.bytes = 1048576
>     ssl.cipher.suites = null
>     ssl.truststore.type = JKS
>     security.protocol = PLAINTEXT
>     ssl.truststore.location = null
>     ssl.keystore.password = null
>     ssl.keymanager.algorithm = SunX509
>     metrics.sample.window.ms = 30000
>     fetch.min.bytes = 1
>     send.buffer.bytes = 131072
>     auto.offset.reset = earliest
>
> May 04, 2017 5:02:14 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
> INFO: Kafka version : 0.9.0.1
> May 04, 2017 5:02:14 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
> INFO: Kafka commitId : 23c69d62a0cabf06
>
> Any suggestions? I have been on this for over a day now with various
> attempts, but nothing comes through.
>
> When connecting to a different topic (one I subscribe to directly via
> Flink in another application and do get data from), I can set
> "auto.offset.reset" to earliest or latest and see different values for
> the offset, so Kafka appears to be available/visible etc.
>
> Many thanks
> Conrad

-- 
---- 
Mingmin
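When KafkaIO logs "starting at offset 0" but nothing arrives, it can also help to rule Beam out entirely by reading the same topic with a bare KafkaConsumer. The sketch below assumes the kafka-clients 0.9.x jar on the classpath and reuses the broker and topic names from this thread; the class name and throwaway group id are made up for illustration:

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TopicCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Same brokers/port as the Beam pipeline; if this consumer sees
        // nothing either, the problem is upstream of KafkaIO.
        props.put("bootstrap.servers", "datanode2-cm1.mis-cds.local:6667");
        // Fresh, throwaway group id: with an existing group,
        // auto.offset.reset=earliest only applies when that group has no
        // committed offsets, so old records can be silently skipped.
        props.put("group.id", "topic-check-" + System.currentTimeMillis());
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-http-logs-json"));
            // Poll a few times; the first poll mostly does group assignment.
            for (int i = 0; i < 5; i++) {
                ConsumerRecords<String, String> records = consumer.poll(2000);
                for (ConsumerRecord<String, String> r : records) {
                    System.out.printf("offset=%d key=%s value=%s%n",
                            r.offset(), r.key(), r.value());
                }
            }
        }
    }
}
```

The throwaway group id matters here: the pipeline above runs with group.id http-logs-beam-json and enable.auto.commit=true, so if that group previously committed offsets at the end of the topic, "earliest" would have no effect. That is only one possible explanation, not something confirmed in the thread.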