Hi,
New to the group – hello!
I'm just starting to look into Beam and very much like the concepts, but I have
fallen at the first hurdle: subscribing to a Kafka topic and processing the
records. Put simply, the code below doesn't receive any records (the data is
definitely going into the topic) – I just get nothing.
I have tried both the direct runner and the Flink runner (using the Quickstart
as a base for the options, mvn profiles etc.).
Code
// Imports used by this snippet:
import java.util.List;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

Pipeline p = Pipeline.create(options);

List<String> topics = ImmutableList.of("test-http-logs-json");

PCollection<String> logs = p.apply(KafkaIO.<String, String>read()
    .withBootstrapServers("datanode2-cm1.mis-cds.local:6667,datanode3-cm1.mis-cds.local:6667,datanode6-cm1.mis-cds.local:6667")
    .withTopics(topics)
    // coders for key and value
    .withKeyCoder(StringUtf8Coder.of())
    .withValueCoder(StringUtf8Coder.of())
    // bound the otherwise-unbounded read at 10 records
    .withMaxNumRecords(10)
    .updateConsumerProperties(ImmutableMap.<String, Object>builder()
        .put("auto.offset.reset", "earliest")
        .put("group.id", "http-logs-beam-json")
        .put("enable.auto.commit", "true")
        .put("receive.buffer.bytes", 1024 * 1024)
        .build())
    // drop Kafka metadata, yielding KV<key, value>
    .withoutMetadata())
    .apply("Transform", MapElements.via(new SimpleFunction<KV<String, String>, String>() {
      @Override
      public String apply(KV<String, String> input) {
        log.debug("{}", input.getValue());
        return input.getKey() + " " + input.getValue();
      }
    }));

p.run();
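One thing I still mean to rule out is the logging configuration simply
swallowing the log.debug output. A minimal sketch of the variant I'd try –
printing straight to stdout and blocking until the bounded read finishes
(this would replace the final p.run() above):

// Extra imports: org.apache.beam.sdk.transforms.ParDo, org.apache.beam.sdk.transforms.DoFn
logs.apply("Print", ParDo.of(new DoFn<String, Void>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // Write each record straight to stdout so no log level can filter it out.
    System.out.println(c.element());
  }
}));
// Block until the bounded (withMaxNumRecords) pipeline finishes.
p.run().waitUntilFinish();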
Result:
May 04, 2017 5:02:13 PM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id = http-logs-beam-json
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [datanode2-cm1.mis-cds.local:6667, datanode3-cm1.mis-cds.local:6667, datanode6-cm1.mis-cds.local:6667]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
enable.auto.commit = true
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = null
session.timeout.ms = 30000
metrics.num.samples = 2
client.id =
ssl.endpoint.identification.algorithm = null
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
ssl.protocol = TLS
check.crcs = true
request.timeout.ms = 40000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 1048576
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXT
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
fetch.min.bytes = 1
send.buffer.bytes = 131072
auto.offset.reset = earliest
May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version : 0.9.0.1
May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : 23c69d62a0cabf06
May 04, 2017 5:02:13 PM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource generateInitialSplits
INFO: Partitions assigned to split 0 (total 1): test-http-logs-json-0
May 04, 2017 5:02:13 PM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ConsumerConfig values:
[...identical to the ConsumerConfig dump above, followed by the same Kafka version/commitId lines...]
May 04, 2017 5:02:14 PM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader start
INFO: Reader-0: reading from test-http-logs-json-0 starting at offset 0
May 04, 2017 5:02:14 PM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ConsumerConfig values:
[...same as above except the following two entries:]
group.id = Reader-0_offset_consumer_1029147362_http-logs-beam-json
enable.auto.commit = false
[...followed by the same Kafka version/commitId lines...]
Any suggestions? I have been on this for over a day now with various attempts,
but nothing comes through.
When I connect to a different topic (one I subscribe to directly via Flink in
another application, and from which I do get data), I can switch
auto.offset.reset between "earliest" and "latest" and see different starting
offsets in the log, so Kafka itself appears to be reachable and visible.
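For reference, below is the sort of standalone check I can run with the plain
Kafka 0.9 consumer API to confirm the topic is readable outside Beam entirely –
it prints the partition's end offset and then polls for records. (A sketch
only: the group.id is a throwaway made up for the test, and I'm assuming the
single partition the Beam log reported above.)

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "datanode2-cm1.mis-cds.local:6667");
props.put("group.id", "http-logs-beam-json-check");  // throwaway group, just for this test
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
TopicPartition tp = new TopicPartition("test-http-logs-json", 0);

// How far does the partition go? If this prints 0, nothing has actually landed.
consumer.assign(Collections.singletonList(tp));
consumer.seekToEnd(tp);
System.out.println("end offset: " + consumer.position(tp));

// Rewind to the start and try to read some records.
consumer.seekToBeginning(tp);
ConsumerRecords<String, String> records = consumer.poll(10000);
for (ConsumerRecord<String, String> record : records) {
  System.out.printf("offset=%d key=%s value=%s%n", record.offset(), record.key(), record.value());
}
consumer.close();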
Many thanks
Conrad