Re: Issue Faced: Not able to get the consumer offsets from Kafka when using Flink with Flink-Kafka Connector

2017-02-07 Thread MAHESH KUMAR
Thanks for the prompt reply.

On Tue, Feb 7, 2017 at 10:38 AM, Robert Metzger wrote:

> Hi Mahesh,
>
> this is a known limitation of Apache Kafka:
> https://www.mail-archive.com/users@kafka.apache.org/msg22595.html
> You could implement a tool that manually retrieves the latest offset
> for the group from the __consumer_offsets topic.
>
> On Tue, Feb 7, 2017 at 6:24 PM, MAHESH KUMAR wrote:
>
>> [original message trimmed; it appears in full below]

Re: Issue Faced: Not able to get the consumer offsets from Kafka when using Flink with Flink-Kafka Connector

2017-02-07 Thread Robert Metzger
Hi Mahesh,

this is a known limitation of Apache Kafka:
https://www.mail-archive.com/users@kafka.apache.org/msg22595.html
Flink's consumer assigns its partitions manually instead of subscribing
through the group coordinator, so the group is stored on the broker with an
empty protocol type, and kafka-consumer-groups.sh refuses to describe it.
You could implement a tool that manually retrieves the latest offset for the
group from the __consumer_offsets topic.
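
A minimal sketch of such a tool, assuming the plain kafka-clients 0.10.1
Java API and a known topic and partition (the topic "testIn1", partition 0,
and group "test" below match the job in the quoted message; the class name
GroupOffsetCheck is just for illustration). Instead of parsing
__consumer_offsets directly, KafkaConsumer#committed(TopicPartition) asks
the group coordinator for the group's last committed offset:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class GroupOffsetCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        // the group whose committed offsets we want to inspect
        props.setProperty("group.id", "test");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("testIn1", 0);
            // committed() fetches the last offset this group committed for the
            // partition from the coordinator; null means nothing committed yet
            OffsetAndMetadata committed = consumer.committed(tp);
            System.out.println(tp + " -> "
                    + (committed == null ? "no committed offset" : committed.offset()));
        }
    }
}

The consumer never calls subscribe() or poll() here, so it does not join the
group itself; it only asks the coordinator for the committed offset.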

On Tue, Feb 7, 2017 at 6:24 PM, MAHESH KUMAR wrote:

> [original message trimmed; it appears in full below]

Issue Faced: Not able to get the consumer offsets from Kafka when using Flink with Flink-Kafka Connector

2017-02-07 Thread MAHESH KUMAR
Hi Team,

Kindly let me know if I am doing something wrong.

Kafka Version - kafka_2.11-0.10.1.1
Flink Version - flink-1.2.0
Using the latest Kafka Connector - FlinkKafkaConsumer010 - flink-connector-kafka-0.10_2.11

Issue Faced: Not able to get the consumer offsets from Kafka when using
Flink with Flink-Kafka Connector

$ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
console-consumer-19886
console-consumer-89637
$

It does not show the consumer group "test".

For a group that does not exist, the message is as follows:

$ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test1 --describe
Consumer group `test1` does not exist.
$

For the "test" group the error message is as follows

$ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test --describe
Error while executing consumer group command Group test with protocol type '' is not a valid consumer group
java.lang.IllegalArgumentException: Group test with protocol type '' is not a valid consumer group
at kafka.admin.AdminClient.describeConsumerGroup(AdminClient.scala:152)
at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:308)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describe(ConsumerGroupCommand.scala:89)
at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describe(ConsumerGroupCommand.scala:296)
at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:68)
at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
$

The error comes from AdminClient.scala
(https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminClient.scala):

if (metadata.state != "Dead" && metadata.state != "Empty" && metadata.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
  throw new IllegalArgumentException(s"Consumer Group $groupId with protocol type '${metadata.protocolType}' is not a valid consumer group")

Code:

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaFlinkOutput {
    private static final String LOCAL_ZOOKEEPER_HOST = "localhost:2181";
    private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
    private static final String CONSUMER_GROUP = "test";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
        kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
        kafkaProps.setProperty("group.id", CONSUMER_GROUP);
        kafkaProps.setProperty("auto.offset.reset", "latest");

        // with checkpointing enabled, the connector commits offsets back to
        // Kafka when a checkpoint completes
        env.enableCheckpointing(1000L);

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("testIn1", new SimpleStringSchema(), kafkaProps);
        DataStreamSource<String> consumerData = env.addSource(consumer);
        consumerData.print();

        System.out.println("Streaming Kafka in Flink");
        env.execute("Starting now!");
    }
}

Debug logs that show that the Kafka connector does commit offsets to Kafka:

2017-02-07 09:52:38,851 INFO  org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 30
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [localhost:9092]
ssl.keystore.type = JKS
enable.auto.commit = true
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 4
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id = test
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05