[GitHub] flink pull request: Kafka-3375: Allows Watermark Generation in the...

2016-03-30 Thread kl0u
Github user kl0u commented on the pull request:

https://github.com/apache/flink/pull/1795#issuecomment-203373358
  
No, I opened a new one. It is this one:
https://github.com/apache/flink/pull/1839


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Kafka-3375: Allows Watermark Generation in the...

2016-03-30 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1795#issuecomment-203368412
  
@kl0u Has this been merged, or are you going to open a new PR?




[GitHub] flink pull request: Kafka-3375: Allows Watermark Generation in the...

2016-03-30 Thread kl0u
Github user kl0u closed the pull request at:

https://github.com/apache/flink/pull/1795




[GitHub] flink pull request: Kafka-3375: Allows Watermark Generation in the...

2016-03-16 Thread kl0u
Github user kl0u commented on the pull request:

https://github.com/apache/flink/pull/1795#issuecomment-197244578
  
Thanks a lot, @mxm!
If it is a problem, I can put the watermark-related code in the
FlinkConsumerBase class.
It is just that, this way, it is clear which code is responsible for which
functionality.




[GitHub] flink pull request: Kafka-3375: Allows Watermark Generation in the...

2016-03-16 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1795#issuecomment-197241772
  
It looks like the change is API-breaking in terms of binary compatibility 
because it changes the base class of the `FlinkKafkaConsumer08` and 
`FlinkKafkaConsumer09`. I wonder if that is a problem.




[GitHub] flink pull request: Kafka-3375: Allows Watermark Generation in the...

2016-03-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1795#discussion_r56187319
  
--- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08WithPeriodicWM.java ---
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * An implementation of a {@link FlinkKafkaComsumerWithWMBase} for Apache Kafka 0.8.x, that emits
+ * watermarks periodically. The user has to provide a {@link AssignerWithPeriodicWatermarks}.
+ * */
+public class FlinkKafkaConsumer08WithPeriodicWM extends FlinkKafkaConsumer08Base implements Triggerable {
+
+   /**
+    * The user-specified methods to extract the timestamps from the records in Kafka, and
+    * to decide when to emit watermarks.
+    */
+   private final AssignerWithPeriodicWatermarks periodicWatermarkAssigner;
+
+   /**
+    * The interval between periodic watermark emissions, as configured via the
+    * {@link ExecutionConfig#getAutoWatermarkInterval()}.
+    */
+   private long watermarkInterval = -1;
+
+   private StreamingRuntimeContext runtime = null;
+
+   private SourceContext srcContext = null;
+
+   /**
+    * Creates a new Kafka streaming source consumer for Kafka 0.8.x
+    *
+    * @param topic
+    *           The name of the topic that should be consumed.
+    * @param valueDeserializer
+    *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+    * @param props
+    *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+    * @param timestampAssigner
+    *           The user-specified methods to extract the timestamps and decide when to emit watermarks.
+    *           This has to implement the {@link AssignerWithPeriodicWatermarks} interface.
+    */
+   public FlinkKafkaConsumer08WithPeriodicWM(String topic,
+           DeserializationSchema valueDeserializer,
+           Properties props,
+           AssignerWithPeriodicWatermarks timestampAssigner) {
+       this(Collections.singletonList(topic), valueDeserializer, props, timestampAssigner);
+   }
+
+   /**
+    * Creates a new Kafka streaming source consumer for Kafka 0.8.x
+    *
+    * This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value
+    * pairs, offsets, and topic names from Kafka.
+    *
+    * @param topic
+    *           The name of the topic that should be consumed.
+    * @param deserializer
+    *           The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+    * @param props
+    *           The properties 

[GitHub] flink pull request: Kafka-3375: Allows Watermark Generation in the...

2016-03-15 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/1795

Kafka-3375: Allows Watermark Generation in the Kafka Source.

In a nutshell, this PR allows watermarks to be generated in the Kafka
source, based on a user-provided function. This addresses the situation where
a Kafka source has multiple partitions per source task, and records become out
of order before timestamps can be extracted and watermarks can be generated.
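To make the problem concrete: when one source task reads several partitions, a watermark derived from the merged record stream can run ahead of a slower partition and turn its records into late data. One common way to avoid this is to track a watermark per partition and let the task emit only the minimum across its partitions. The following is a minimal, Flink-independent sketch of that idea, assuming each partition's watermark is already known (for example, produced by a user-provided assigner). The class `PerPartitionWatermarks` and its methods are hypothetical and are not the code from this PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/**
 * Hypothetical sketch (not the code from this PR): a source task that reads several
 * Kafka partitions keeps one watermark per partition and only emits the minimum.
 */
final class PerPartitionWatermarks {

    // Highest watermark reported so far for each partition assigned to this task.
    private final Map<Integer, Long> partitionWatermarks = new HashMap<>();

    // Last watermark emitted downstream; emitted watermarks never move backwards.
    private long emittedWatermark = Long.MIN_VALUE;

    PerPartitionWatermarks(int... partitions) {
        for (int partition : partitions) {
            partitionWatermarks.put(partition, Long.MIN_VALUE);
        }
    }

    /**
     * Records a watermark produced for one partition and returns the watermark
     * the task may emit, or empty if the combined watermark did not advance.
     */
    OptionalLong onPartitionWatermark(int partition, long watermark) {
        // A partition's own watermark is also kept monotonic.
        partitionWatermarks.merge(partition, watermark, Long::max);

        long min = Long.MAX_VALUE;
        for (long wm : partitionWatermarks.values()) {
            min = Math.min(min, wm);
        }
        if (min > emittedWatermark) {
            emittedWatermark = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        PerPartitionWatermarks tracker = new PerPartitionWatermarks(0, 1);
        // Partition 0 is at 100, but partition 1 has reported nothing yet: no emission.
        System.out.println(tracker.onPartitionWatermark(0, 100));
        // Partition 1 reaches 50, so the task may now emit 50.
        System.out.println(tracker.onPartitionWatermark(1, 50));
        // Partition 1 jumps to 120; the slower partition 0 now bounds the result at 100.
        System.out.println(tracker.onPartitionWatermark(1, 120));
    }
}
```

With this design, an idle or empty partition would hold the task's watermark back indefinitely. The commit list of this PR includes a change for supporting empty partitions, but how it handles them is not shown in this thread.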

This PR implements two new versions of the FlinkKafkaConsumer for each of the
supported Kafka versions (0.8 and 0.9). Each takes one additional argument:
the first an AssignerWithPeriodicWatermarks timestamp extractor, the second an
AssignerWithPunctuatedWatermarks.
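The two assigner styles differ in when the source asks for a watermark. A periodic assigner is polled for its current watermark on a timer (in the diff quoted earlier in this thread, the interval comes from `ExecutionConfig#getAutoWatermarkInterval()`), while a punctuated assigner is consulted after every record and decides whether that record triggers a watermark. Below is a Flink-independent sketch of that difference. `PeriodicAssigner`, `PunctuatedAssigner`, `Event`, `BoundedLateness`, and `MarkerDriven` are hypothetical simplifications; the real Flink interfaces return `Watermark` objects and have different signatures.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class AssignerSketch {

    /** Hypothetical record: an event-time timestamp plus a flag for "marker" records. */
    static final class Event {
        final long timestamp;
        final boolean marker;

        Event(long timestamp, boolean marker) {
            this.timestamp = timestamp;
            this.marker = marker;
        }
    }

    /** Stand-in for the periodic style: the source polls getCurrentWatermark() on a timer. */
    interface PeriodicAssigner<T> {
        long extractTimestamp(T element);
        long getCurrentWatermark();
    }

    /** Stand-in for the punctuated style: every record may trigger a watermark. */
    interface PunctuatedAssigner<T> {
        long extractTimestamp(T element);
        /** Returns the next watermark, or Long.MIN_VALUE if this record triggers none. */
        long checkAndGetNextWatermark(T element, long extractedTimestamp);
    }

    /** Periodic example: the watermark trails the highest timestamp seen by a fixed bound. */
    static final class BoundedLateness implements PeriodicAssigner<Event> {
        private final long maxLateness;
        private long maxTimestamp;

        BoundedLateness(long maxLateness) {
            this.maxLateness = maxLateness;
            // Start low enough that the initial watermark is Long.MIN_VALUE without underflow.
            this.maxTimestamp = Long.MIN_VALUE + maxLateness;
        }

        @Override
        public long extractTimestamp(Event e) {
            maxTimestamp = Math.max(maxTimestamp, e.timestamp);
            return e.timestamp;
        }

        @Override
        public long getCurrentWatermark() {
            return maxTimestamp - maxLateness;
        }
    }

    /** Punctuated example: only marker records produce a watermark. */
    static final class MarkerDriven implements PunctuatedAssigner<Event> {
        @Override
        public long extractTimestamp(Event e) {
            return e.timestamp;
        }

        @Override
        public long checkAndGetNextWatermark(Event e, long extractedTimestamp) {
            return e.marker ? extractedTimestamp : Long.MIN_VALUE;
        }
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(10, false), new Event(30, false), new Event(20, true), new Event(40, false));

        // Periodic: simulate the timer firing after every second record.
        BoundedLateness periodic = new BoundedLateness(15);
        List<Long> periodicWatermarks = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            periodic.extractTimestamp(events.get(i));
            if (i % 2 == 1) {
                periodicWatermarks.add(periodic.getCurrentWatermark());
            }
        }

        // Punctuated: ask the assigner after every record.
        MarkerDriven punctuated = new MarkerDriven();
        List<Long> punctuatedWatermarks = new ArrayList<>();
        for (Event e : events) {
            long wm = punctuated.checkAndGetNextWatermark(e, punctuated.extractTimestamp(e));
            if (wm != Long.MIN_VALUE) {
                punctuatedWatermarks.add(wm);
            }
        }

        System.out.println("periodic:   " + periodicWatermarks);
        System.out.println("punctuated: " + punctuatedWatermarks);
    }
}
```

With a bound of 15 and the timer simulated after every second record, the periodic assigner produces 15 and then 25, so the out-of-order record with timestamp 20 is still ahead of the watermark. The punctuated assigner emits a single watermark, 20, at the marker record.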

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink kafka_wm

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1795.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1795


commit d723d5e210e7e7d4f92424acb7ea177c1836bdc5
Author: kl0u 
Date:   2016-03-08T16:35:14Z

Adds the Kafka09 punctuated source.

commit cbdbf3e4eafe59d7f8e8a5be2e7f408399e6e8d3
Author: kl0u 
Date:   2016-03-09T11:08:50Z

Adds also the periodic watermark source.

commit c1af0a02dfae407bd95fccc1f716ad6e6deff004
Author: kl0u 
Date:   2016-03-09T13:06:04Z

Code cleaning of KafkaConsumer09.

commit 5c4f3309cd1cef39015e0174e17d49fa6c6d8c0c
Author: kl0u 
Date:   2016-03-09T16:34:15Z

Code cleaning of KafkaConsumer09.

commit 34eadbc2c3396cc0fda2b6961f350c5c3f567d79
Author: kl0u 
Date:   2016-03-09T20:52:13Z

Testing.

commit 454717ccc5e9e78f0f0fd69df5eb7d9821a7942d
Author: kl0u 
Date:   2016-03-10T15:04:02Z

Refactor code.

commit e8eac33ac9931cb85a1fdbb2bfc6691e01235d0e
Author: kl0u 
Date:   2016-03-14T13:36:17Z

Added functionality for supporting empty partitions.

commit 87e18cba8de4ed58e4676847c0635ad7ebdb4621
Author: kl0u 
Date:   2016-03-14T14:03:23Z

Removed guava uses.

commit 091a2e3ff9dc4168fb16b461a86b2086685db424
Author: kl0u 
Date:   2016-03-14T14:41:04Z

Fixed a bug in the tests.

commit 0f115405578722153e5bdba2a8c64f3bf5a52b49
Author: kl0u 
Date:   2016-03-14T15:35:30Z

Added documentation.

commit 5aa79ba01683370efc90fe4b0cccb7d852c50a2a
Author: kl0u 
Date:   2016-03-15T14:13:32Z

Fixed BUG in Periodic timestamp sources.



