Flink 1.3 - Checkpointing failing

2017-06-02 Thread MAHESH KUMAR
Hi Team,

We have some test cases written using StreamingMultipleProgramsTestBase.
They were working fine in version 1.2, but in version 1.3 we get the error
below. The CheckpointCoordinator appears to fail after this error, and
checkpointing no longer occurs.

I came across this bug: https://issues.apache.org/jira/browse/FLINK-5462 .
It looks somewhat similar, but I am not sure it is the same issue.

2017-06-02 16:11:07,048  INFO | flink-akka.actor.default-dispatcher-3 |
org.apache.flink.runtime.executiongraph.ExecutionGraph  | Could not restart
the job pipeline_message_auditor (f54182ae17352efb9aa40667c283ce09) because
the restart strategy prevented it.
org.apache.flink.streaming.runtime.tasks.AsynchronousException:
java.lang.Exception: Could not materialize checkpoint 1 for operator
TriggerWindow(TumblingProcessingTimeWindows(4000),
ReducingStateDescriptor{serializer=com.oracle.ci.flink.streaming.MessageAuditorStreamingHelper$$anonfun$1$$anon$7$$anon$2@e42b922d,
reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@59623f44},
ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java:301)) ->
(Map -> Sink: auditor_out-kafkaSink, Map -> Sink: auditor_expire-kafkaSink)
(1/1).
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:963)
~[flink-streaming-java_2.11-1.3.0.jar:1.3.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
~[na:1.8.0_112]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_112]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
~[na:1.8.0_112]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
~[na:1.8.0_112]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_112]
Caused by: java.lang.Exception: Could not materialize checkpoint 1 for
operator TriggerWindow(TumblingProcessingTimeWindows(4000),
ReducingStateDescriptor{serializer=com.oracle.ci.flink.streaming.MessageAuditorStreamingHelper$$anonfun$1$$anon$7$$anon$2@e42b922d,
reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@59623f44},
ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java:301)) ->
(Map -> Sink: auditor_out-kafkaSink, Map -> Sink: auditor_expire-kafkaSink)
(1/1).
... 6 common frames omitted
Caused by: java.util.concurrent.ExecutionException:
java.lang.UnsupportedOperationException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
~[na:1.8.0_112]
at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[na:1.8.0_112]
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
~[flink-core-1.3.0.jar:1.3.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:893)
~[flink-streaming-java_2.11-1.3.0.jar:1.3.0]
... 5 common frames omitted
Suppressed: java.lang.Exception: Could not properly cancel managed keyed
state future.
at
org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
~[flink-streaming-java_2.11-1.3.0.jar:1.3.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1018)
~[flink-streaming-java_2.11-1.3.0.jar:1.3.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:957)
~[flink-streaming-java_2.11-1.3.0.jar:1.3.0]
... 5 common frames omitted
Caused by: java.util.concurrent.ExecutionException:
java.lang.UnsupportedOperationException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
at
org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
at
org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
... 7 common frames omitted
Caused by: java.lang.UnsupportedOperationException: null
at
org.apache.flink.api.scala.typeutils.TraversableSerializer.snapshotConfiguration(TraversableSerializer.scala:155)
at
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
at
org.apache.flink.api.java.typeutils.runtime.TupleSerializerConfigSnapshot.<init>(TupleSerializerConfigSnapshot.java:45)
at
org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.snapshotConfiguration(TupleSerializerBase.java:132)
at
org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.snapshotConfiguration(TupleSerializerBase.java:39)
at
org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.snapshot(RegisteredKeyedBackendStateMetaInfo.java:71)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeKVStateMetaData(RocksDBKeyedStateBackend.java:591)
at

Flink - Iteration and Backpressure

2017-05-26 Thread MAHESH KUMAR
Hi Team,

I am trying to build an audit-like system where I read messages from "n"
Kafka queues, key them by a unique key, and then reduce them to a single
message. If a message has passed through all "n" Kafka queues within a
window of "m" hours/days, it has succeeded; otherwise it has expired.

I can get this working in my test case, but not with millions of messages.
Very few messages reach the success stage of the iteration, while a huge
number of messages are fed back into the iteration. This creates back
pressure, so the job stops reading from the Kafka queues. Since no new
messages are read, the messages inside the window can no longer succeed;
they keep cycling through the iteration forever and eventually expire even
though they should have succeeded.

I read about the network buffers which, once full, create back pressure and
stop the sources from reading further messages. The system is supposed to
be a lightweight audit system, and the audit messages it creates are very
small. Is it possible to increase the buffer size to avoid back pressure?
Is there an alternative solution to this issue?
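
For reference, the only knob I have found so far is the total number of
network buffers. A rough sketch of what I was considering for the local/test
setup (taskmanager.network.numberOfBuffers is the configuration key I found;
8192 is just an example value, and for a real cluster the key would go into
flink-conf.yaml instead):

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

// Sketch only: give the local (test) environment more network buffers than
// the default, hoping to delay back pressure. 8192 is an arbitrary example.
val conf = new Configuration()
conf.setInteger("taskmanager.network.numberOfBuffers", 8192)

// Java environment factory that accepts a custom Configuration.
val env = StreamExecutionEnvironment.createLocalEnvironment(4, conf)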

The code looks like this:

val unionInputStream = kafka1.union(kafka2, kafka3, kafka4)

// AuditMessage, keyFunction, reduceFunction, splitFunction, windowSize and
// the sinks are placeholders for the actual types/functions used in the job.
def audit(input: DataStream[AuditMessage]) = {
  val reducedStream = input
    .keyBy(keyFunction)
    .window(TumblingProcessingTimeWindows.of(windowSize))
    .reduce(reduceFunction)
  val splitStreams = reducedStream.split(splitFunction)
  splitStreams.select("success").addSink(terminalSink)
  splitStreams.select("expire").addSink(expireSink)
  // (feedback stream, forward output) as expected by iterate()
  (splitStreams.select("replay"), splitStreams.select("success"))
}

unionInputStream.iterate(audit(_))
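
For context, the helper functions are roughly of the following shape (a
simplified sketch with hypothetical names and types, not the exact production
code): each audit record carries the set of queues it has been seen on, the
reduce step merges those sets, and the split step decides whether the merged
record has seen all queues (success), has been around too long (expire), or
needs another pass (replay).

// Simplified sketch only; AuditMessage and the thresholds below are
// hypothetical stand-ins for the real types and values used in the job.
case class AuditMessage(id: String, seenQueues: Set[String], firstSeen: Long)

val allQueues = Set("kafka1", "kafka2", "kafka3", "kafka4") // the "n" queues
val maxAgeMillis = 24 * 60 * 60 * 1000L                     // the "m" hours/days

// Key each record by its unique message id.
val keyFunction = (msg: AuditMessage) => msg.id

// Merge the sets of queues on which the message has been seen so far.
val reduceFunction = (a: AuditMessage, b: AuditMessage) =>
  a.copy(seenQueues = a.seenQueues ++ b.seenQueues,
         firstSeen = math.min(a.firstSeen, b.firstSeen))

// Decide where the reduced record goes: success, expire, or back into the
// iteration as replay.
val splitFunction = (msg: AuditMessage) =>
  if (allQueues.subsetOf(msg.seenQueues)) Seq("success")
  else if (System.currentTimeMillis() - msg.firstSeen > maxAgeMillis) Seq("expire")
  else Seq("replay")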



Thanks and Regards,
Mahesh


Flink - Writing Test Case for the Datastream

2017-03-09 Thread MAHESH KUMAR
Hi Team,

I am trying to write test cases to check whether the job executes as
desired, using the Flink test util. I am trying to do end-to-end testing
where Flink reads from a Kafka topic, does some processing, and then writes
the output to another Kafka topic. My objective is to read the message from
the output topic and check that it matches the expected message.

I have ZooKeeper and Kafka configured for the test. When I start the Flink
job, it never terminates since its source is a Kafka source. Is there a way
to run a job for a specific interval of time, or how do I go about testing
this scenario? Is there any documentation/example for running test cases
such as these?

My code currently looks something like this:

class StreamingMultipleTest extends StreamingMultipleProgramsTestBase {

  @Before def initialize(): Unit = {
    // Start Kafka and ZooKeeper.
    // Call the run method of the Flink class - FlinkClass.run() - which
    // contains the env.execute().

    // My code does not execute any further, since the previous call never
    // returns.
  }

  @Test def test1(): Unit = {
    // Check whether the output topic of the Kafka queue contains the
    // expected message (assert statement).
  }

  @After def closeServices(): Unit = {
    // Stop ZooKeeper and Kafka.
  }
}
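
One workaround I am considering is to start the job from a background thread
so that the blocking env.execute() call does not stall the @Before method. A
rough sketch (FlinkClass.run() is the blocking call from the skeleton above;
the thread handling is my own idea, not something from the Flink test
utilities):

  // Sketch only: run the blocking job submission in a daemon thread so
  // @Before can return and the tests can run.
  private var jobThread: Thread = _

  @Before def initialize(): Unit = {
    // Start Kafka and ZooKeeper first, then submit the job in the background.
    jobThread = new Thread(new Runnable {
      override def run(): Unit = FlinkClass.run() // contains env.execute()
    })
    jobThread.setDaemon(true)
    jobThread.start()
  }

The test method would then poll the output topic with a plain Kafka consumer
and assert on the messages it sees.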


Thanks and Regards,
Mahesh

-- 

Mahesh Kumar Ravindranathan
Data Streaming Engineer
Oracle Marketing Cloud - Social Platform
Contact No:+1(720)492-4445


Re: Issue Faced: Not able to get the consumer offsets from Kafka when using Flink with Flink-Kafka Connector

2017-02-07 Thread MAHESH KUMAR
Thanks for the prompt reply
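
For anyone else hitting this, here is the rough shape of the tool Robert
suggests below, reading the offsets committed under group.id "test" directly
with a plain Kafka consumer. A sketch under my assumptions (topic testIn1
with 8 partitions on a localhost broker, matching the setup quoted below;
committed() returns null when nothing has been committed for a partition):

import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object CommittedOffsetsTool {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("group.id", "test")
    props.setProperty("key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    try {
      // Ask the broker for the committed offset of each partition of testIn1.
      for (partition <- 0 until 8) {
        val committed = consumer.committed(new TopicPartition("testIn1", partition))
        val offset = if (committed == null) "none" else committed.offset().toString
        println(s"testIn1-$partition committed offset: $offset")
      }
    } finally {
      consumer.close()
    }
  }
}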

On Tue, Feb 7, 2017 at 10:38 AM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi Mahesh,
>
> this is a known limitation of Apache Kafka:
> https://www.mail-archive.com/us...@kafka.apache.org/msg22595.html
> You could implement a tool that is manually retrieving the latest offset
> for the group from the __offsets topic.
>
> On Tue, Feb 7, 2017 at 6:24 PM, MAHESH KUMAR <r.mahesh.kumar@gmail.com
> > wrote:
>
>> Hi Team,
>>
>> Kindly let me know if I am doing something wrong.
>>
>> Kafka Version - kafka_2.11-0.10.1.1
>> Flink Version - flink-1.2.0
>> Using the latest Kafka Connector - FlinkKafkaConsumer010 -
>> flink-connector-kafka-0.10_2.11
>>
>> Issue Faced: Not able to get the consumer offsets from Kafka when using
>> Flink with Flink-Kafka Connector
>>
>> $ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh
>> --bootstrap-server localhost:9092  --list
>> console-consumer-19886
>> console-consumer-89637
>> $
>>
>> It does not show the consumer group "test"
>>
>> For a group that does not exist, the message is as follows:
>>
>> $ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh
>> --bootstrap-server localhost:9092 --group test1 --describe
>> Consumer group `test1` does not exist.
>> $
>>
>> For the "test" group the error message is as follows
>>
>> $ /work/kafka_2.11-0.10.1.1/bin/kafka-consumer-groups.sh
>> --bootstrap-server localhost:9092 --group test --describe
>> Error while executing consumer group command Group test with protocol
>> type '' is not a valid consumer group
>> java.lang.IllegalArgumentException: Group test with protocol type '' is
>> not a valid consumer group
>> at kafka.admin.AdminClient.describeConsumerGroup(AdminClient.scala:152)
>> at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:308)
>> at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describe(ConsumerGroupCommand.scala:89)
>> at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describe(ConsumerGroupCommand.scala:296)
>> at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:68)
>> at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
>> $
>>
>> The error is from AdminClient.scala
>> (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminClient.scala)
>>
>> if (metadata.state != "Dead" && metadata.state != "Empty" && metadata.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
>>   throw new IllegalArgumentException(s"Consumer Group $groupId with protocol type '${metadata.protocolType}' is not a valid consumer group")
>>
>> Code:
>>
>> import java.util.Properties;
>> import org.apache.flink.streaming.api.datastream.DataStreamSource;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>>
>> public class KafkaFlinkOutput {
>> private static final String LOCAL_ZOOKEEPER_HOST = "localhost:2181";
>> private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
>> private static final String CONSUMER_GROUP = "test";
>>
>> public KafkaFlinkOutput() {
>> }
>>
>> public static void main(String[] args) throws Exception {
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> Properties kafkaProps = new Properties();
>> kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
>> kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
>> kafkaProps.setProperty("group.id", "test");
>> kafkaProps.setProperty("auto.offset.reset", "latest");
>> env.enableCheckpointing(1000L);
>> FlinkKafkaConsumer010<String> consumer = new
>> FlinkKafkaConsumer010<>("testIn1", new SimpleStringSchema(), kafkaProps);
>> DataStreamSource<String> consumerData = env.addSource(consumer);
>> consumerData.print();
>> System.out.println("Streaming Kafka in Flink");
>> env.execute("Starting now!");
>> }
>> }
>>
>> Debug Logs that

Issue Faced: Not able to get the consumer offsets from Kafka when using Flink with Flink-Kafka Connector

2017-02-07 Thread MAHESH KUMAR
rtition testIn1-7
2017-02-07 09:53:38,738 DEBUG
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator
 - Group test committed offset 52292 for partition testIn1-3
2017-02-07 09:53:38,738 DEBUG
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator
 - Group test committed offset 17438 for partition testIn1-6
2017-02-07 09:53:38,738 DEBUG
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator
 - Group test committed offset 0 for partition testIn1-2
2017-02-07 09:53:43,524 DEBUG org.apache.flink.runtime.taskmanager.TaskManager
 - Sending heartbeat to JobManager
2017-02-07 09:53:43,730 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator
- Triggering checkpoint 13 @ 1486486423730

-- 

Mahesh Kumar Ravindranathan
Data Streaming Engineer
Oracle Marketing Cloud - Social Platform
Contact No:+1(720)492-4445