@Conrad,

Your code should be good to go; I can run it in my local env. There are two
points you may want to check:
1) Does the topic actually have data in it? You can confirm with the Kafka
CLI tool bin/kafka-console-consumer.sh (see the sketch below this list);
2) Is the port in bootstrapServers right? By default Kafka listens on 9092.
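
For example, with the 0.9.x CLI tools something like the following should
work. The hostnames and topic are taken from your code; the zookeeper address
is a placeholder, and the flags differ across Kafka versions, so treat these
as sketches rather than exact commands:

  # does the topic actually have data? this should print any existing records
  bin/kafka-console-consumer.sh --new-consumer \
      --bootstrap-server datanode2-cm1.mis-cds.local:6667 \
      --topic test-http-logs-json --from-beginning

  # is the broker reachable, and does the topic exist with the expected partitions?
  bin/kafka-topics.sh --zookeeper <your-zk-host>:2181 \
      --describe --topic test-http-logs-json

If the console consumer prints records, the topic side is fine and the
problem is on the Beam side.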



On Thu, May 4, 2017 at 9:05 AM, Conrad Crampton <conrad.cramp...@secdata.com> wrote:

> Hi,
>
> New to the group – ‘hello’!
>
>
>
> Just starting to look into Beam and I very much like the concepts, but I
> have rather fallen at the first hurdle: subscribing to a Kafka topic and
> processing the results.
>
> Very simply, the following code doesn't receive any records (the data
> is going into the queue); I just get nothing.
>
> I have tried both the direct-runner and flink-runner (using the Quickstart
> as a base for the options, mvn profile etc.).
>
>
>
> Code
>
>
>
> Pipeline p = Pipeline.create(options);
> List<String> topics = ImmutableList.of("test-http-logs-json");
>
> PCollection<String> logs = p.apply(KafkaIO.<String, String>read()
>         .withBootstrapServers("datanode2-cm1.mis-cds.local:6667,datanode3-cm1.mis-cds.local:6667,datanode6-cm1.mis-cds.local:6667")
>         .withTopics(topics)
>         // set a Coder for Key and Value
>         .withKeyCoder(StringUtf8Coder.of())
>         .withValueCoder(StringUtf8Coder.of())
>         .withMaxNumRecords(10)
>         .updateConsumerProperties(ImmutableMap.<String, Object>builder()
>                 .put("auto.offset.reset", (Object) "earliest")
>                 .put("group.id", (Object) "http-logs-beam-json")
>                 .put("enable.auto.commit", (Object) "true")
>                 .put("receive.buffer.bytes", 1024 * 1024)
>                 .build())
>         .withoutMetadata())
>         .apply("Transform", MapElements.via(new SimpleFunction<KV<String, String>, String>() {
>             @Override
>             public String apply(KV<String, String> input) {
>                 log.debug("{}", input.getValue());
>                 return input.getKey() + " " + input.getValue();
>             }
>         }));
>
> p.run();
>
>
>
>
>
> Result:
>
> May 04, 2017 5:02:13 PM org.apache.kafka.common.config.AbstractConfig logAll
> INFO: ConsumerConfig values:
>         metric.reporters = []
>         metadata.max.age.ms = 300000
>         value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>         group.id = http-logs-beam-json
>         partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
>         reconnect.backoff.ms = 50
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>         max.partition.fetch.bytes = 1048576
>         bootstrap.servers = [datanode2-cm1.mis-cds.local:6667, datanode3-cm1.mis-cds.local:6667, datanode6-cm1.mis-cds.local:6667]
>         retry.backoff.ms = 100
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>         sasl.kerberos.service.name = null
>         sasl.kerberos.ticket.renew.jitter = 0.05
>         ssl.keystore.type = JKS
>         ssl.trustmanager.algorithm = PKIX
>         enable.auto.commit = true
>         ssl.key.password = null
>         fetch.max.wait.ms = 500
>         sasl.kerberos.min.time.before.relogin = 60000
>         connections.max.idle.ms = 540000
>         ssl.truststore.password = null
>         session.timeout.ms = 30000
>         metrics.num.samples = 2
>         client.id =
>         ssl.endpoint.identification.algorithm = null
>         key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>         ssl.protocol = TLS
>         check.crcs = true
>         request.timeout.ms = 40000
>         ssl.provider = null
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>         ssl.keystore.location = null
>         heartbeat.interval.ms = 3000
>         auto.commit.interval.ms = 5000
>         receive.buffer.bytes = 1048576
>         ssl.cipher.suites = null
>         ssl.truststore.type = JKS
>         security.protocol = PLAINTEXT
>         ssl.truststore.location = null
>         ssl.keystore.password = null
>         ssl.keymanager.algorithm = SunX509
>         metrics.sample.window.ms = 30000
>         fetch.min.bytes = 1
>         send.buffer.bytes = 131072
>         auto.offset.reset = earliest
>
> May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
> INFO: Kafka version : 0.9.0.1
> May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
> INFO: Kafka commitId : 23c69d62a0cabf06
>
> May 04, 2017 5:02:13 PM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource generateInitialSplits
> INFO: Partitions assigned to split 0 (total 1): test-http-logs-json-0
>
> [snip: second ConsumerConfig dump and Kafka version/commitId lines, identical to the above]
>
> May 04, 2017 5:02:14 PM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader start
> INFO: Reader-0: reading from test-http-logs-json-0 starting at offset 0
>
> May 04, 2017 5:02:14 PM org.apache.kafka.common.config.AbstractConfig logAll
> INFO: ConsumerConfig values:
>         group.id = Reader-0_offset_consumer_1029147362_http-logs-beam-json
>         enable.auto.commit = false
> [snip: all other values, and the Kafka version/commitId lines, identical to the first dump]
>
>
>
>
>
> Any suggestions? I have been at this for over a day now with various
> attempts, but nothing comes through.
>
> When connecting to a different topic (one I subscribe to directly via Flink
> in another application and get data from), I can set
> .put("auto.offset.reset", ...) to earliest and to latest and see different
> values for the offset, so Kafka appears to be available/visible etc.
>
>
>
> Many thanks
>
> Conrad
>
>
>



-- 
----
Mingmin
