Re: Issue Faced: Not able to get the consumer offsets from Kafka when using Flink with Flink-Kafka Connector
Thanks for the prompt reply.

On Tue, Feb 7, 2017 at 10:38 AM, Robert Metzger wrote:
> Hi Mahesh,
>
> this is a known limitation of Apache Kafka:
> https://www.mail-archive.com/us...@kafka.apache.org/msg22595.html
> You could implement a tool that manually retrieves the latest offset
> for the group from the __consumer_offsets topic.
Re: Issue Faced: Not able to get the consumer offsets from Kafka when using Flink with Flink-Kafka Connector
Hi Mahesh,

this is a known limitation of Apache Kafka: https://www.mail-archive.com/users@kafka.apache.org/msg22595.html
You could implement a tool that manually retrieves the latest offset for the group from the __consumer_offsets topic.

On Tue, Feb 7, 2017 at 6:24 PM, MAHESH KUMAR wrote:
> Issue Faced: Not able to get the consumer offsets from Kafka when using
> Flink with Flink-Kafka Connector
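A tool of the kind suggested above would consume the raw records of the __consumer_offsets topic and decode them itself. The following is only a minimal sketch of the decoding step, assuming the offset-commit record layout used by 0.10.x brokers: a key made of an int16 version (0 or 1), the group, the topic and an int32 partition, and a value that starts with an int16 version followed by the int64 offset, with strings stored as an int16 length plus UTF-8 bytes. The class and method names (OffsetKeyDecoder, decodeKey, decodeOffset, encodeKey) are hypothetical, and the layout should be checked against the broker version in use.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: decodes offset-commit records read from __consumer_offsets.
// The byte layout below is an assumption based on 0.10.x brokers.
class OffsetKeyDecoder {

    /** Which group committed an offset for which topic partition. */
    static final class OffsetKey {
        final String group;
        final String topic;
        final int partition;

        OffsetKey(String group, String topic, int partition) {
            this.group = group;
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Strings are stored as an int16 length followed by that many UTF-8 bytes.
    static String readString(ByteBuffer buf) {
        short length = buf.getShort();
        byte[] bytes = new byte[length];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Returns null for keys that are not offset commits
    // (key version 2 is assumed to be a group metadata record).
    static OffsetKey decodeKey(byte[] key) {
        ByteBuffer buf = ByteBuffer.wrap(key); // big-endian by default
        short version = buf.getShort();
        if (version < 0 || version > 1) {
            return null;
        }
        String group = readString(buf);
        String topic = readString(buf);
        int partition = buf.getInt();
        return new OffsetKey(group, topic, partition);
    }

    // The committed offset is the first field after the int16 value version.
    static long decodeOffset(byte[] value) {
        ByteBuffer buf = ByteBuffer.wrap(value);
        buf.getShort(); // skip the value schema version
        return buf.getLong();
    }

    // Builds a key in the assumed layout; used here only to demonstrate decoding.
    static byte[] encodeKey(String group, String topic, int partition) {
        byte[] g = group.getBytes(StandardCharsets.UTF_8);
        byte[] t = topic.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(2 + 2 + g.length + 2 + t.length + 4);
        buf.putShort((short) 1)
           .putShort((short) g.length).put(g)
           .putShort((short) t.length).put(t)
           .putInt(partition);
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] key = encodeKey("test", "testIn1", 0);
        byte[] value = ByteBuffer.allocate(10).putShort((short) 1).putLong(42L).array();

        OffsetKey decoded = decodeKey(key);
        System.out.println(decoded.group + "/" + decoded.topic + "-" + decoded.partition
                + " -> " + decodeOffset(value));
    }
}
```

In a real tool the key and value bytes would come from a consumer reading __consumer_offsets with ByteArrayDeserializer, keeping only the records whose decoded group equals the group of interest and remembering the most recent offset per partition.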
Issue Faced: Not able to get the consumer offsets from Kafka when using Flink with Flink-Kafka Connector
Hi Team,

Kindly let me know if I am doing something wrong.

Kafka Version - kafka_2.11-0.10.1.1
Flink Version - flink-1.2.0
Using the latest Kafka Connector - FlinkKafkaConsumer010 - flink-connector-kafka-0.10_2.11

Issue Faced: Not able to get the consumer offsets from Kafka when using Flink with Flink-Kafka Connector

$ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
console-consumer-19886
console-consumer-89637
$

It does not show the consumer group "test".

For a group that does not exist, the message is as follows:

$ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test1 --describe
Consumer group `test1` does not exist.
$

For the "test" group, the error message is as follows:

$ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test --describe
Error while executing consumer group command Group test with protocol type '' is not a valid consumer group
java.lang.IllegalArgumentException: Group test with protocol type '' is not a valid consumer group
        at kafka.admin.AdminClient.describeConsumerGroup(AdminClient.scala:152)
        at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:308)
        at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describe(ConsumerGroupCommand.scala:89)
        at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describe(ConsumerGroupCommand.scala:296)
        at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:68)
        at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
$

The error is from AdminClient.scala (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminClient.scala):

    if (metadata.state != "Dead" && metadata.state != "Empty" && metadata.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
      throw new IllegalArgumentException(s"Consumer Group $groupId with protocol type '${metadata.protocolType}' is not a valid consumer group")

Code:

    import java.util.Properties;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class KafkaFlinkOutput {
        private static final String LOCAL_ZOOKEEPER_HOST = "localhost:2181";
        private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
        private static final String CONSUMER_GROUP = "test";

        public KafkaFlinkOutput() {
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Kafka connection settings and the group.id used for offset commits
            Properties kafkaProps = new Properties();
            kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
            kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
            kafkaProps.setProperty("group.id", CONSUMER_GROUP);
            kafkaProps.setProperty("auto.offset.reset", "latest");

            // Checkpoint every second
            env.enableCheckpointing(1000L);

            // Read topic "testIn1" as plain strings and print each record
            FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("testIn1", new SimpleStringSchema(), kafkaProps);
            DataStreamSource<String> consumerData = env.addSource(consumer);
            consumerData.print();
            System.out.println("Streaming Kafka in Flink");
            env.execute("Starting now!");
        }
    }

Debug Logs that show that Kafka Connector does commit to Kafka:

2017-02-07 09:52:38,851 INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 30
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [localhost:9092]
        ssl.keystore.type = JKS
        enable.auto.commit = true
        sasl.mechanism = GSSAPI
        interceptor.classes = null
        exclude.internal.topics = true
        ssl.truststore.password = null
        client.id =
        ssl.endpoint.identification.algorithm = null
        max.poll.records = 2147483647
        check.crcs = true
        request.timeout.ms = 4
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 65536
        ssl.truststore.type = JKS
        ssl.truststore.location = null
        ssl.keystore.password = null
        fetch.min.bytes = 1
        send.buffer.bytes = 131072
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        group.id = test
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
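To make the AdminClient check quoted in this message easier to follow, here is a small Java restatement of the same condition. It is a sketch, not Kafka's code: the class and method names are hypothetical, "consumer" is the value of ConsumerProtocol.PROTOCOL_TYPE in the Kafka clients library, and "Stable" is only a placeholder for a state that is neither "Dead" nor "Empty", which the reported exception implies for the "test" group. The empty protocol type is most likely a consequence of the Flink connector committing offsets without joining the group through Kafka's consumer group protocol.

```java
// Hypothetical Java restatement of the validation in AdminClient.scala.
class ConsumerGroupCheck {

    // Value of ConsumerProtocol.PROTOCOL_TYPE in the Kafka clients library.
    static final String CONSUMER_PROTOCOL_TYPE = "consumer";

    // Throws for a group that is live (not Dead, not Empty) but was not
    // registered through the consumer group protocol.
    static void requireValidConsumerGroup(String groupId, String state, String protocolType) {
        if (!state.equals("Dead")
                && !state.equals("Empty")
                && !protocolType.equals(CONSUMER_PROTOCOL_TYPE)) {
            throw new IllegalArgumentException("Consumer Group " + groupId
                    + " with protocol type '" + protocolType
                    + "' is not a valid consumer group");
        }
    }

    public static void main(String[] args) {
        // A group created by a regular subscribing consumer passes the check.
        requireValidConsumerGroup("console-consumer-19886", "Stable", "consumer");

        // A group that only has committed offsets and an empty protocol type
        // is rejected. "Stable" is a placeholder state, see the note above.
        try {
            requireValidConsumerGroup("test", "Stable", "");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under these assumptions the committed offsets can still be present on the broker even though --describe refuses to show the group, which is consistent with the debug logs above.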