Flink 1.3 - Checkpointing failing
Hi Team,

We have some test cases written using StreamingMultipleProgramsTestBase. They worked fine in version 1.2, but in version 1.3 we get the error below. It seems that the CheckpointCoordinator fails after this error and checkpointing no longer occurs. I came across this bug: https://issues.apache.org/jira/browse/FLINK-5462 . It looks somewhat similar, but I am not exactly sure.

2017-06-02 16:11:07,048 INFO | flink-akka.actor.default-dispatcher-3 | org.apache.flink.runtime.executiongraph.ExecutionGraph | Could not restart the job pipeline_message_auditor (f54182ae17352efb9aa40667c283ce09) because the restart strategy prevented it.
org.apache.flink.streaming.runtime.tasks.AsynchronousException: java.lang.Exception: Could not materialize checkpoint 1 for operator TriggerWindow(TumblingProcessingTimeWindows(4000), ReducingStateDescriptor{serializer=com.oracle.ci.flink.streaming.MessageAuditorStreamingHelper$$anonfun$1$$anon$7$$anon$2@e42b922d, reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@59623f44}, ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java:301)) -> (Map -> Sink: auditor_out-kafkaSink, Map -> Sink: auditor_expire-kafkaSink) (1/1).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:963) ~[flink-streaming-java_2.11-1.3.0.jar:1.3.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_112]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_112]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_112]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_112]
    at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_112]
Caused by: java.lang.Exception: Could not materialize checkpoint 1 for operator TriggerWindow(TumblingProcessingTimeWindows(4000), ReducingStateDescriptor{serializer=com.oracle.ci.flink.streaming.MessageAuditorStreamingHelper$$anonfun$1$$anon$7$$anon$2@e42b922d, reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@59623f44}, ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java:301)) -> (Map -> Sink: auditor_out-kafkaSink, Map -> Sink: auditor_expire-kafkaSink) (1/1).
    ... 6 common frames omitted
Caused by: java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException
    at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[na:1.8.0_112]
    at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[na:1.8.0_112]
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) ~[flink-core-1.3.0.jar:1.3.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:893) ~[flink-streaming-java_2.11-1.3.0.jar:1.3.0]
    ... 5 common frames omitted
    Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90) ~[flink-streaming-java_2.11-1.3.0.jar:1.3.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1018) ~[flink-streaming-java_2.11-1.3.0.jar:1.3.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:957) ~[flink-streaming-java_2.11-1.3.0.jar:1.3.0]
        ... 5 common frames omitted
    Caused by: java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
        ... 7 common frames omitted
Caused by: java.lang.UnsupportedOperationException: null
    at org.apache.flink.api.scala.typeutils.TraversableSerializer.snapshotConfiguration(TraversableSerializer.scala:155)
    at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializerConfigSnapshot.<init>(TupleSerializerConfigSnapshot.java:45)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.snapshotConfiguration(TupleSerializerBase.java:132)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.snapshotConfiguration(TupleSerializerBase.java:39)
    at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.snapshot(RegisteredKeyedBackendStateMetaInfo.java:71)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeKVStateMetaData(RocksDBKeyedStateBackend.java:591)
    at
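The innermost frame points at TraversableSerializer.snapshotConfiguration, which throws UnsupportedOperationException in 1.3.0, so a checkpoint fails as soon as the keyed window state's type contains a Scala Traversable (Seq, List, Map, ...). A minimal sketch of the kind of job that would exercise this path - the AuditRecord type and its fields are hypothetical, not taken from the failing pipeline:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time

    object TraversableStateRepro {
      // Hypothetical record: the Seq field is handled by the Scala API's
      // TraversableSerializer, whose snapshotConfiguration is unimplemented
      // in 1.3.0 and throws when the checkpoint is materialized.
      case class AuditRecord(key: String, hops: Seq[String])

      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.enableCheckpointing(1000L)

        env
          .fromElements(AuditRecord("m1", Seq("kafka1")), AuditRecord("m1", Seq("kafka2")))
          .keyBy(_.key)
          .timeWindow(Time.seconds(4))
          // The reducing window state is snapshotted with the record's serializer,
          // which includes the Seq serializer that cannot snapshot its configuration.
          .reduce((a, b) => AuditRecord(a.key, a.hops ++ b.hops))
          .print()

        env.execute("traversable-state-repro")
      }
    }

One way to sidestep the snapshot path while staying on 1.3.0 would be to keep no Traversable-backed fields in checkpointed state, e.g. hops: Array[String] instead of Seq[String].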
Flink - Iteration and Backpressure
Hi Team,

I am trying to build an audit-like system where I read messages from "n" Kafka queues, key them by a unique key, and then reduce them to a single message. If a message has passed through all "n" Kafka queues within a window time of "m" hours/days, it has succeeded; otherwise it has expired.

I can get this working in my test case, but not when there are millions of messages: only very few messages reach the success stage of the iteration, and a huge number of messages are sent back into it. This creates back pressure, so no more messages are read from the Kafka queues. Since no new messages are read, the messages inside the window can no longer succeed; they keep going through the iteration forever and eventually expire, although they should have succeeded.

I read about the network buffers which, when full, create back pressure and stop further reads. The system is supposed to be a lightweight audit system, and the audit messages it creates are very small. Is it possible to increase the size of the buffers to avoid back pressure? Is there an alternative solution to this issue?

The code looks like this:

    val unionInputStream = union(kafka1, kafka2, kafka3, kafka4)

    def audit(stream: DataStream[Message]) = {
      val reducedStream = stream
        .keyBy(keyFunction)
        .window(TumblingProcessingTimeWindows.of(windowSize))
        .reduce(reduceFunction)
      val splitStreams = reducedStream.split(splitFunction)
      splitStreams.select("success").addSink(terminalSink)
      splitStreams.select("expire").addSink(expireSink)
      (splitStreams.select("replay"), splitStreams.select("success"))
    }

    unionInputStream.iterate(audit(_))

Thanks and Regards,
Mahesh
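For context: the feedback edge of an iteration shares the network buffer pool with the regular inputs, so once the replay stream backs up, the sources stall and the loop can starve itself; enlarging the pool (taskmanager.network.numberOfBuffers in flink-conf.yaml) only delays this. A sketch of an iteration-free alternative that keeps per-key progress in state and uses a processing-time timer for the expiry - the AuditMsg type, the source count n, and the sinks are hypothetical stand-ins for the pipeline above:

    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector

    // Hypothetical message: `key` is the unique id, `source` names the Kafka queue it came from.
    case class AuditMsg(key: String, source: String)

    // Emits the key on the main output once it was seen on all `n` sources;
    // emits it on the `expired` side output if the timeout fires first.
    class AuditFn(n: Int, timeoutMs: Long, expired: OutputTag[String])
        extends ProcessFunction[AuditMsg, String] {

      // The key is stored alongside the seen sources because the 1.3
      // ProcessFunction does not expose the current key inside onTimer.
      lazy val seen: ValueState[(String, Set[String])] =
        getRuntimeContext.getState(new ValueStateDescriptor(
          "seen", createTypeInformation[(String, Set[String])]))

      override def processElement(msg: AuditMsg,
          ctx: ProcessFunction[AuditMsg, String]#Context,
          out: Collector[String]): Unit = {
        val (key, sources) = Option(seen.value()).getOrElse {
          // First message for this key: start the expiry clock.
          ctx.timerService().registerProcessingTimeTimer(
            ctx.timerService().currentProcessingTime() + timeoutMs)
          (msg.key, Set.empty[String])
        }
        val updated = sources + msg.source
        if (updated.size >= n) { out.collect(key); seen.clear() }
        else seen.update((key, updated))
      }

      override def onTimer(ts: Long,
          ctx: ProcessFunction[AuditMsg, String]#OnTimerContext,
          out: Collector[String]): Unit = {
        val cur = seen.value()
        if (cur != null) { ctx.output(expired, cur._1); seen.clear() }
      }
    }

Wired up roughly as: val expiredTag = OutputTag[String]("expired"); val done = unionInputStream.keyBy(_.key).process(new AuditFn(4, timeoutMs, expiredTag)); done.addSink(terminalSink); done.getSideOutput(expiredTag).addSink(expireSink). No message ever re-enters the topology, so back pressure only reflects real processing load.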
Flink - Writing Test Case for the Datastream
Hi Team,

I am trying to write test cases that check whether the job executes as desired, using the Flink test utilities. I want to do an end-to-end test where Flink reads from a Kafka topic, does some processing, and then writes the output to another Kafka topic. My objective is to read the message from the output topic and check whether it matches the expected message. I have Zookeeper and Kafka configured for the test. When I start the Flink job, it never terminates since its source is a Kafka source. Is there a way to run a job for a specific interval of time, or how do I go about testing this scenario? Is there any documentation/example for running test cases such as these?

My code currently looks something like this:

    class StreamingMultipleTest extends StreamingMultipleProgramsTestBase {

      @Before def initialize(): Unit = {
        // Start Kafka and Zookeeper.
        // Call the run method of the Flink class - FlinkClass.run() -
        // which contains env.execute(). My code does not execute any
        // further since this call never returns.
      }

      @Test def Test1(): Unit = {
        // Check whether the output topic of the Kafka queue contains
        // the expected message - assert statement.
      }

      @After def closeServices(): Unit = {
        // Stop Zookeeper and Kafka.
      }
    }

Thanks and Regards,
Mahesh

--
Mahesh Kumar Ravindranathan
Data Streaming Engineer
Oracle Marketing Cloud - Social Platform
Contact No: +1(720)492-4445
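One way to make such a job terminate on its own is to make the Kafka source finite: the DeserializationSchema's isEndOfStream hook stops the consumer once a sentinel record is read, so env.execute() returns and the assertions can run afterwards. A sketch, assuming a plain string topic and a hypothetical sentinel value:

    import org.apache.flink.streaming.util.serialization.SimpleStringSchema

    // Ends the otherwise unbounded Kafka source when the sentinel record
    // arrives, letting env.execute() return inside a test.
    class BoundedStringSchema(sentinel: String) extends SimpleStringSchema {
      override def isEndOfStream(next: String): Boolean = next == sentinel
    }

In initialize(), the test would then produce its input messages followed by the sentinel (e.g. "END-OF-TEST") to the input topic and build the source as new FlinkKafkaConsumer010[String](inputTopic, new BoundedStringSchema("END-OF-TEST"), kafkaProps); once execute() returns, Test1 can consume the output topic and assert on it. An alternative is to call FlinkClass.run() in a separate thread and stop the job after the assertions.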
Re: Issue Faced: Not able to get the consumer offsets from Kafka when using Flink with Flink-Kafka Connector
Thanks for the prompt reply.

On Tue, Feb 7, 2017 at 10:38 AM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi Mahesh,
>
> this is a known limitation of Apache Kafka:
> https://www.mail-archive.com/us...@kafka.apache.org/msg22595.html
> You could implement a tool that is manually retrieving the latest offset
> for the group from the __offsets topic.
>
> On Tue, Feb 7, 2017 at 6:24 PM, MAHESH KUMAR <r.mahesh.kumar@gmail.com> wrote:
>
>> Hi Team,
>>
>> Kindly let me know if I am doing something wrong.
>>
>> Kafka Version - kafka_2.11-0.10.1.1
>> Flink Version - flink-1.2.0
>> Using the latest Kafka Connector - FlinkKafkaConsumer010 - flink-connector-kafka-0.10_2.11
>>
>> Issue Faced: Not able to get the consumer offsets from Kafka when using
>> Flink with Flink-Kafka Connector
>>
>> $ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
>> console-consumer-19886
>> console-consumer-89637
>> $
>>
>> It does not show the consumer group "test".
>>
>> For a group that does not exist, the message is as follows:
>>
>> $ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test1 --describe
>> Consumer group `test1` does not exist.
>> $
>>
>> For the "test" group the error message is as follows:
>>
>> $ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test --describe
>> Error while executing consumer group command Group test with protocol type '' is not a valid consumer group
>> java.lang.IllegalArgumentException: Group test with protocol type '' is not a valid consumer group
>>     at kafka.admin.AdminClient.describeConsumerGroup(AdminClient.scala:152)
>>     at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:308)
>>     at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describe(ConsumerGroupCommand.scala:89)
>>     at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describe(ConsumerGroupCommand.scala:296)
>>     at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:68)
>>     at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
>> $
>>
>> The error is from AdminClient.scala (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminClient.scala):
>>
>> if (metadata.state != "Dead" && metadata.state != "Empty" && metadata.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
>>     throw new IllegalArgumentException(s"Consumer Group $groupId with protocol type '${metadata.protocolType}' is not a valid consumer group")
>>
>> Code:
>>
>> import java.util.Properties;
>> import org.apache.flink.streaming.api.datastream.DataStreamSource;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>>
>> public class KafkaFlinkOutput {
>>     private static final String LOCAL_ZOOKEEPER_HOST = "localhost:2181";
>>     private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
>>     private static final String CONSUMER_GROUP = "test";
>>
>>     public KafkaFlinkOutput() {
>>     }
>>
>>     public static void main(String[] args) throws Exception {
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         Properties kafkaProps = new Properties();
>>         kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
>>         kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
>>         kafkaProps.setProperty("group.id", "test");
>>         kafkaProps.setProperty("auto.offset.reset", "latest");
>>         env.enableCheckpointing(1000L);
>>         FlinkKafkaConsumer010 consumer = new FlinkKafkaConsumer010("testIn1", new SimpleStringSchema(), kafkaProps);
>>         DataStreamSource consumerData = env.addSource(consumer);
>>         consumerData.print();
>>         System.out.println("Streaming Kafka in Flink");
>>         env.execute("Starting now!");
>>     }
>> }
>>
>> Debug Logs that
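In the spirit of Robert's suggestion, the committed offsets can also be read back programmatically instead of via the console tool: KafkaConsumer.committed() fetches them directly and does not go through the group-describe path that fails above. A sketch, reusing the testIn1 topic and test group from the code above:

    import java.util.Properties
    import scala.collection.JavaConverters._
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition

    object ShowCommittedOffsets {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.setProperty("bootstrap.servers", "localhost:9092")
        props.setProperty("group.id", "test") // the group Flink commits offsets for
        props.setProperty("key.deserializer",
          "org.apache.kafka.common.serialization.StringDeserializer")
        props.setProperty("value.deserializer",
          "org.apache.kafka.common.serialization.StringDeserializer")

        val consumer = new KafkaConsumer[String, String](props)
        try {
          // Ask for the committed offset of every partition of the topic.
          for (info <- consumer.partitionsFor("testIn1").asScala) {
            val tp = new TopicPartition("testIn1", info.partition())
            val committed = consumer.committed(tp) // null if nothing committed yet
            println(s"$tp -> ${Option(committed).map(_.offset()).getOrElse("none")}")
          }
        } finally {
          consumer.close()
        }
      }
    }

Note that with checkpointing enabled, Flink commits these offsets only when a checkpoint completes, which matches the ConsumerCoordinator entries in the debug log below.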
Issue Faced: Not able to get the consumer offsets from Kafka when using Flink with Flink-Kafka Connector
... partition testIn1-7
2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Group test committed offset 52292 for partition testIn1-3
2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Group test committed offset 17438 for partition testIn1-6
2017-02-07 09:53:38,738 DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Group test committed offset 0 for partition testIn1-2
2017-02-07 09:53:43,524 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
2017-02-07 09:53:43,730 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 13 @ 1486486423730

--
Mahesh Kumar Ravindranathan
Data Streaming Engineer
Oracle Marketing Cloud - Social Platform
Contact No: +1(720)492-4445