[GitHub] flink pull request: Kafka-3375: Allows Watermark Generation in the...
Github user kl0u commented on the pull request: https://github.com/apache/flink/pull/1795#issuecomment-203373358 No, I opened a new one: https://github.com/apache/flink/pull/1839 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/1795#issuecomment-203368412 @kl0u Has this been merged, or are you going to open a new PR?
Github user kl0u closed the pull request at: https://github.com/apache/flink/pull/1795
Github user kl0u commented on the pull request: https://github.com/apache/flink/pull/1795#issuecomment-197244578 Thanks a lot, @mxm! If it is a problem, I can put the watermark-related code in the FlinkConsumerBase class. I only kept it separate because this way it is clear which code is responsible for which functionality.
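For comparison, the alternative kl0u offers (keeping the watermark logic in the shared consumer base class) could look roughly like the following. This is a hypothetical sketch, not the PR's code: `ConsumerBase`, `Consumer08`, and `withTimestamps` are invented names, and the watermark policy (emit the highest timestamp seen so far) is deliberately simplistic. What it illustrates is that the version-specific consumer keeps the same superclass whether or not watermarking is enabled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

public class ConsumerBaseSketch {

    /** Hypothetical shared base class that owns the optional watermark logic. */
    abstract static class ConsumerBase<T> {
        private ToLongFunction<T> timestampExtractor; // null means watermarking is off
        private long maxTimestamp = Long.MIN_VALUE;
        final List<String> out = new ArrayList<>();

        /** Opt in to watermarking without introducing a new subclass. */
        void withTimestamps(ToLongFunction<T> extractor) {
            this.timestampExtractor = extractor;
        }

        /** Emits a record and, if enabled, a watermark whenever the max timestamp advances. */
        void emit(T record) {
            out.add("record " + record);
            if (timestampExtractor != null) {
                long ts = timestampExtractor.applyAsLong(record);
                if (ts > maxTimestamp) {
                    // Simplistic policy: watermark = highest timestamp so far, no lateness allowance.
                    maxTimestamp = ts;
                    out.add("watermark " + ts);
                }
            }
        }
    }

    /** Hypothetical version-specific consumer: same superclass with or without watermarks. */
    static final class Consumer08<T> extends ConsumerBase<T> {
    }

    public static void main(String[] args) {
        Consumer08<Long> consumer = new Consumer08<>();
        consumer.withTimestamps(Long::longValue);
        consumer.emit(5L);
        consumer.emit(3L);
        consumer.emit(9L);
        System.out.println(consumer.out);
        // [record 5, watermark 5, record 3, record 9, watermark 9]
        System.out.println(consumer.getClass().getSuperclass().getSimpleName()); // ConsumerBase
    }
}
```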
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/1795#issuecomment-197241772 It looks like the change is API-breaking in terms of binary compatibility, because it changes the base class of `FlinkKafkaConsumer08` and `FlinkKafkaConsumer09`. I wonder if that is a problem.
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1795#discussion_r56187319

--- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08WithPeriodicWM.java ---
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * An implementation of a {@link FlinkKafkaComsumerWithWMBase} for Apache Kafka 0.8.x, that emits
+ * watermarks periodically. The user has to provide a {@link AssignerWithPeriodicWatermarks}.
+ * */
+public class FlinkKafkaConsumer08WithPeriodicWM extends FlinkKafkaConsumer08Base implements Triggerable {
+
+    /**
+     * The user-specified methods to extract the timestamps from the records in Kafka, and
+     * to decide when to emit watermarks.
+     */
+    private final AssignerWithPeriodicWatermarks periodicWatermarkAssigner;
+
+    /**
+     * The interval between periodic watermark emissions, as configured via the
+     * {@link ExecutionConfig#getAutoWatermarkInterval()}.
+     */
+    private long watermarkInterval = -1;
+
+    private StreamingRuntimeContext runtime = null;
+
+    private SourceContext srcContext = null;
+
+    /**
+     * Creates a new Kafka streaming source consumer for Kafka 0.8.x
+     *
+     * @param topic
+     *           The name of the topic that should be consumed.
+     * @param valueDeserializer
+     *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+     * @param props
+     *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+     * @param timestampAssigner
+     *           The user-specified methods to extract the timestamps and decide when to emit watermarks.
+     *           This has to implement the {@link AssignerWithPeriodicWatermarks} interface.
+     */
+    public FlinkKafkaConsumer08WithPeriodicWM(String topic,
+            DeserializationSchema valueDeserializer,
+            Properties props,
+            AssignerWithPeriodicWatermarks timestampAssigner) {
+        this(Collections.singletonList(topic), valueDeserializer, props, timestampAssigner);
+    }
+
+    /**
+     * Creates a new Kafka streaming source consumer for Kafka 0.8.x
+     *
+     * This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value
+     * pairs, offsets, and topic names from Kafka.
+     *
+     * @param topic
+     *           The name of the topic that should be consumed.
+     * @param deserializer
+     *           The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+     * @param props
+     *           The properties
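The class in this diff implements `Triggerable` and takes its emission interval from `ExecutionConfig#getAutoWatermarkInterval()`. The self-contained sketch below shows the periodic pattern in isolation: each record only updates the assigner's state, and on each timer tick the source asks the assigner for its current watermark and emits it only if it advances. It assumes a hypothetical `PeriodicAssigner` interface and a bounded-lag policy, and simulates timer ticks with direct `onTimer()` calls; it is neither Flink's API nor the code in the PR.

```java
import java.util.ArrayList;
import java.util.List;

public class PeriodicWatermarkSketch {

    /** Hypothetical stand-in for a periodic assigner (not Flink's interface). */
    interface PeriodicAssigner<T> {
        long extractTimestamp(T element);
        long currentWatermark();
    }

    /** Watermark = highest timestamp seen so far minus a fixed allowed lag. */
    static final class BoundedLagAssigner implements PeriodicAssigner<Long> {
        private final long maxLag;
        private long maxSeen = Long.MIN_VALUE;

        BoundedLagAssigner(long maxLag) {
            this.maxLag = maxLag;
        }

        @Override
        public long extractTimestamp(Long element) {
            maxSeen = Math.max(maxSeen, element);
            return element;
        }

        @Override
        public long currentWatermark() {
            // Avoid underflow before any record has been seen.
            return maxSeen == Long.MIN_VALUE ? Long.MIN_VALUE : maxSeen - maxLag;
        }
    }

    private final PeriodicAssigner<Long> assigner;
    private long lastEmitted = Long.MIN_VALUE;
    final List<Long> emitted = new ArrayList<>();

    PeriodicWatermarkSketch(PeriodicAssigner<Long> assigner) {
        this.assigner = assigner;
    }

    /** Per record: only update the assigner's state; no watermark is emitted here. */
    void onRecord(long timestamp) {
        assigner.extractTimestamp(timestamp);
    }

    /** Per timer tick: emit the current watermark only if it advances. */
    void onTimer() {
        long wm = assigner.currentWatermark();
        if (wm > lastEmitted) {
            lastEmitted = wm;
            emitted.add(wm);
        }
    }

    public static void main(String[] args) {
        PeriodicWatermarkSketch source = new PeriodicWatermarkSketch(new BoundedLagAssigner(5));
        source.onRecord(10);
        source.onRecord(12);
        source.onTimer();      // emits 7
        source.onRecord(11);   // out of order; max stays 12
        source.onTimer();      // watermark unchanged, nothing emitted
        source.onRecord(20);
        source.onTimer();      // emits 15
        System.out.println(source.emitted); // [7, 15]
    }
}
```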
GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/1795

Kafka-3375: Allows Watermark Generation in the Kafka Source.

In a nutshell, this PR allows watermarks to be generated in the Kafka source, based on a user-provided function. This addresses the situation where a Kafka source task reads from multiple partitions, and records become out of order before timestamps can be extracted and watermarks generated. This PR implements two new versions of the FlinkKafkaConsumer for each supported Kafka version (0.8 and 0.9), each taking one additional argument: an AssignerWithPeriodicWatermarks timestamp extractor for the first, and an AssignerWithPunctuatedWatermarks for the second.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink kafka_wm

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1795.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #1795

commit d723d5e210e7e7d4f92424acb7ea177c1836bdc5
Author: kl0u
Date: 2016-03-08T16:35:14Z
Adds the Kafka09 punctuated source.

commit cbdbf3e4eafe59d7f8e8a5be2e7f408399e6e8d3
Author: kl0u
Date: 2016-03-09T11:08:50Z
Adds also the periodic watermark source.

commit c1af0a02dfae407bd95fccc1f716ad6e6deff004
Author: kl0u
Date: 2016-03-09T13:06:04Z
Code cleaning of KafkaConsumer09.

commit 5c4f3309cd1cef39015e0174e17d49fa6c6d8c0c
Author: kl0u
Date: 2016-03-09T16:34:15Z
Code cleaning of KafkaConsumer09.

commit 34eadbc2c3396cc0fda2b6961f350c5c3f567d79
Author: kl0u
Date: 2016-03-09T20:52:13Z
Testing.

commit 454717ccc5e9e78f0f0fd69df5eb7d9821a7942d
Author: kl0u
Date: 2016-03-10T15:04:02Z
Refactor code.

commit e8eac33ac9931cb85a1fdbb2bfc6691e01235d0e
Author: kl0u
Date: 2016-03-14T13:36:17Z
Added functionality for supporting empty partitions.

commit 87e18cba8de4ed58e4676847c0635ad7ebdb4621
Author: kl0u
Date: 2016-03-14T14:03:23Z
Removed guava uses.

commit 091a2e3ff9dc4168fb16b461a86b2086685db424
Author: kl0u
Date: 2016-03-14T14:41:04Z
Fixed a bug in the tests.

commit 0f115405578722153e5bdba2a8c64f3bf5a52b49
Author: kl0u
Date: 2016-03-14T15:35:30Z
Added documentation.

commit 5aa79ba01683370efc90fe4b0cccb7d852c50a2a
Author: kl0u
Date: 2016-03-15T14:13:32Z
Fixed BUG in Periodic timestamp sources.
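The PR description only says that watermarks are generated inside the source because one source task may read several partitions. One common way to combine per-partition watermarks, assumed here rather than taken from the PR, is to track a watermark per partition and let the task emit the minimum, so that a slower partition holds the task's watermark back instead of having its records treated as late. The class and method names are hypothetical. In this sketch, a partition that has not produced a watermark yet keeps the minimum at `Long.MIN_VALUE`; the commit list mentions separate support for empty partitions, which is not modeled here.

```java
import java.util.HashMap;
import java.util.Map;

public class PerPartitionWatermarks {

    // Latest watermark seen per partition assigned to this (hypothetical) source task.
    private final Map<Integer, Long> partitionWatermarks = new HashMap<>();
    private long lastEmitted = Long.MIN_VALUE;

    PerPartitionWatermarks(int... partitions) {
        for (int p : partitions) {
            partitionWatermarks.put(p, Long.MIN_VALUE);
        }
    }

    /**
     * Records a watermark produced for one partition (e.g. by a punctuated
     * assigner) and returns the task-wide watermark to emit, or null if the
     * minimum over all partitions did not advance.
     */
    Long onPartitionWatermark(int partition, long watermark) {
        // A partition's watermark never moves backwards.
        partitionWatermarks.merge(partition, watermark, Long::max);
        long min = Long.MAX_VALUE;
        for (long wm : partitionWatermarks.values()) {
            min = Math.min(min, wm);
        }
        if (min > lastEmitted) {
            lastEmitted = min;
            return min;
        }
        return null;
    }

    public static void main(String[] args) {
        PerPartitionWatermarks task = new PerPartitionWatermarks(0, 1);
        System.out.println(task.onPartitionWatermark(0, 100)); // null: partition 1 has none yet
        System.out.println(task.onPartitionWatermark(1, 80));  // 80
        System.out.println(task.onPartitionWatermark(0, 120)); // null: still bounded by partition 1
        System.out.println(task.onPartitionWatermark(1, 130)); // 120
    }
}
```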