[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656137#comment-16656137 ]

vinoyang commented on FLINK-8500:

Hi, if the modern Kafka connector also has this issue, shall we remove *Kafka010Fetcher* from the title of the issue?

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Sub-task
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: image-2018-01-30-14-58-58-167.png,
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios,
> this is useful!

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16602584#comment-16602584 ]

ASF GitHub Bot commented on FLINK-8500:

alexeyt820 commented on issue #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#issuecomment-418235101

FYI, I have added a `deserialize` method with a “Record” parameter to my PR.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: image-2018-01-30-14-58-58-167.png,
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios,
> this is useful!
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16597466#comment-16597466 ]

ASF GitHub Bot commented on FLINK-8500:

alexeyt820 commented on issue #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#issuecomment-417324038

@aljoscha, yes, I can do it. Should we still try to preserve compatibility with the existing interface by using a _default_ method, or just break it for good? What do you think?
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16597240#comment-16597240 ]

ASF GitHub Bot commented on FLINK-8500:

aljoscha commented on issue #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#issuecomment-417254164

@alexeyt820 Yes, we also considered that but were reluctant because it's a bigger change. Now, though, I think we probably have to do it. Do you want to try that out in your PR?
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596715#comment-16596715 ]

ASF GitHub Bot commented on FLINK-8500:

alexeyt820 commented on issue #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#issuecomment-417063770

@aljoscha, wouldn't it be even better to pass `deserialize` a parameter object that wraps the Kafka `ConsumerRecord`, instead of a flat list of arguments (messageKey, message, topic, partition, offset, timestamp, headers, ...)? It feels like a more extensible approach: any further changes to the parameter object would be transparent to clients.
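A minimal sketch of the parameter-object idea, assuming a hypothetical `KafkaRecordView` class and `RecordDeserializationSchema` interface (illustrations only, not Flink or Kafka APIs). The schema receives a single object, so adding a field such as headers later would not change the method signature that implementations override.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical parameter object carrying the metadata of one Kafka record.
// New fields (e.g. headers) could be added here without touching the
// signature that schema implementations override.
final class KafkaRecordView {
    final byte[] key;
    final byte[] value;
    final String topic;
    final int partition;
    final long offset;
    final long timestamp;

    KafkaRecordView(byte[] key, byte[] value, String topic,
                    int partition, long offset, long timestamp) {
        this.key = key;
        this.value = value;
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
    }
}

// Hypothetical schema interface: one parameter object instead of a flat argument list.
interface RecordDeserializationSchema<T> {
    T deserialize(KafkaRecordView record);
}

// Example implementation: decodes the value as UTF-8 and prefixes the record timestamp.
final class TimestampedStringSchema implements RecordDeserializationSchema<String> {
    @Override
    public String deserialize(KafkaRecordView record) {
        String value = record.value == null
                ? null
                : new String(record.value, StandardCharsets.UTF_8);
        return record.timestamp + ":" + value;
    }
}
```

For example, a record with value `"hello"` and timestamp `1000` deserializes to `"1000:hello"`; the implementation reads only the fields it needs.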
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16590215#comment-16590215 ]

ASF GitHub Bot commented on FLINK-8500:

FredTing commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r212309444

## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
## @@ -45,6 +45,22 @@
 */
 T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;

+ /**
+* Deserializes the byte message.
+*
+* @param messageKey the key as a byte array (null if no key has been set).
+* @param message The message, as a byte array (null if the message was empty or deleted).
+* @param partition The partition the message has originated from.
+* @param offset the offset of the message in the original source (for example the Kafka offset).
+* @param timestamp the timestamp of the consumer record
+* @param timestampType The timestamp type, could be NO_TIMESTAMP, CREATE_TIME or INGEST_TIME.
+*
+* @return The deserialized message as an object (null if the message cannot be deserialized).
+*/
+ default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException {

Review comment:
I've pushed the changes as we discussed above.
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16590017#comment-16590017 ]

ASF GitHub Bot commented on FLINK-8500:

tzulitai commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r212252904

## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
## @@ -45,6 +45,22 @@
+ default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException {

Review comment:
I would be OK with proceeding with the above.
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16589904#comment-16589904 ]

ASF GitHub Bot commented on FLINK-8500:

aljoscha commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r21385

## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
## @@ -45,6 +45,22 @@
+ default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException {

Review comment:
Yes, to unblock this, I think we can go with this approach. Basically, the schema becomes this:

```
import java.io.IOException;
import java.io.Serializable;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

@PublicEvolving
public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

    // Old variant: existing implementations override it and keep working unchanged.
    @Deprecated
    default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
        // Reached only if an implementation overrides neither deserialize method.
        throw new RuntimeException("You must implement one of the deserialize methods of KeyedDeserializationSchema.");
    }

    // The only variant Flink code calls; by default it ignores the timestamp and delegates.
    default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp) throws IOException {
        return deserialize(messageKey, message, topic, partition, offset);
    }

    boolean isEndOfStream(T nextElement);
}
```

With this, if you have an existing implementation of `KeyedDeserializationSchema`, it will continue to work without any changes. If you implement a new one, you have to implement one of the two methods; otherwise the exception is thrown. And all Flink code only calls the version that takes the timestamp.

What do you think?
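To check the three cases this proposal claims to handle, here is a self-contained sketch built on a simplified stand-in for the interface: the hypothetical `CompatSchema`, with checked exceptions and the Flink supertypes left out so it runs without Flink. `ConnectorCaller` is likewise hypothetical and plays the role of connector code, which only ever calls the timestamped method.

```java
// Simplified stand-in for the proposed KeyedDeserializationSchema
// (checked exceptions and Flink supertypes omitted).
interface CompatSchema<T> {
    @Deprecated
    default T deserialize(byte[] key, byte[] message, String topic, int partition, long offset) {
        // Only reached when an implementation overrides neither method.
        throw new RuntimeException("Implement one of the deserialize methods.");
    }

    // The variant the connector calls; by default it drops the timestamp and delegates.
    default T deserialize(byte[] key, byte[] message, String topic, int partition, long offset, long timestamp) {
        return deserialize(key, message, topic, partition, offset);
    }
}

// Written against the old interface; needs no changes.
final class LegacySchema implements CompatSchema<String> {
    @Override
    public String deserialize(byte[] key, byte[] message, String topic, int partition, long offset) {
        return topic + "@" + offset;
    }
}

// A new implementation that wants the timestamp overrides only the timestamped variant.
final class TimestampAwareSchema implements CompatSchema<String> {
    @Override
    public String deserialize(byte[] key, byte[] message, String topic, int partition, long offset, long timestamp) {
        return topic + "@" + offset + "#" + timestamp;
    }
}

// Stand-in for connector code, which only calls the timestamped variant.
final class ConnectorCaller {
    static <T> T emit(CompatSchema<T> schema, String topic, long offset, long timestamp) {
        return schema.deserialize(null, new byte[0], topic, 0, offset, timestamp);
    }
}
```

With these definitions, `ConnectorCaller.emit(new LegacySchema(), "t", 5L, 99L)` returns `"t@5"` through the delegating default, `ConnectorCaller.emit(new TimestampAwareSchema(), "t", 5L, 99L)` returns `"t@5#99"`, and an implementation that overrides nothing, such as `new CompatSchema<String>() {}`, compiles but throws on its first call.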
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16589289#comment-16589289 ]

ASF GitHub Bot commented on FLINK-8500:

FredTing commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r212082669

## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
## @@ -45,6 +45,22 @@
+ default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException {

Review comment:
@tzulitai We can remove the non-timestamped version of the `deserialize` method completely. It will break the interface, but if you implemented this method, it is quite easy to migrate to the new one. I only introduced the new `deserialize` method as a default method to make this change backwards compatible.

When backwards compatibility is required, we could mark the non-timestamped version of the `deserialize` method as `@Deprecated` and give it a default implementation that throws a `NotImplementedException` or a `FlinkRuntimeException` with the message "you must implement the deserialize method when implementing the KeyedDeserializationSchema interface".
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587192#comment-16587192 ]

ASF GitHub Bot commented on FLINK-8500:

tzulitai commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r211532678

## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
## @@ -45,6 +45,22 @@
+ default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException {

Review comment:
I'm still not very convinced that this is a good approach. For example, things might be a lot cleaner and more error-proof if we just deprecate the non-timestamped `KeyedDeserializationSchema` in favor of a new interface. Technically, we're trying to achieve the same thing, but things would perhaps be much more understandable.

@FredTing, what do you think? Perhaps @aljoscha would also like to chime in with some thoughts here?
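One way the "deprecate in favor of a new interface" option could look, sketched with hypothetical names: `LegacyKeyedSchema`, `TimestampedKeyedSchema` and `LegacyAdapter` are illustrations, not Flink APIs, and checked exceptions are omitted. Each interface has exactly one abstract method, so the compiler (not a runtime exception) enforces that an implementation provides it, and an adapter lets connector code accept old schemas while only ever calling the timestamped method.

```java
// Hypothetical stand-in for the old, non-timestamped interface (to be deprecated).
interface LegacyKeyedSchema<T> {
    T deserialize(byte[] key, byte[] message, String topic, int partition, long offset);
}

// Hypothetical new interface: the timestamped method is abstract,
// so forgetting to implement it is a compile-time error.
interface TimestampedKeyedSchema<T> {
    T deserialize(byte[] key, byte[] message, String topic, int partition, long offset, long timestamp);
}

// Adapter so connector code can accept old schemas but only call the new interface.
final class LegacyAdapter<T> implements TimestampedKeyedSchema<T> {
    private final LegacyKeyedSchema<T> delegate;

    LegacyAdapter(LegacyKeyedSchema<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T deserialize(byte[] key, byte[] message, String topic, int partition, long offset, long timestamp) {
        // The legacy schema has no use for the timestamp, so it is dropped here.
        return delegate.deserialize(key, message, topic, partition, offset);
    }
}
```

Since both interfaces are unrelated types, a consumer constructor could offer one overload per interface and wrap legacy instances in the adapter internally; that split is a design assumption of this sketch.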
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16585692#comment-16585692 ]

ASF GitHub Bot commented on FLINK-8500:

FredTing commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r211197298

## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
## @@ -45,6 +45,22 @@
+ default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException {

Review comment:
@tzulitai Yes, this would also be an issue with our long-term solution, and we should look at it carefully. For instance, when messages on Kafka are encrypted, we need to decrypt them. How they are encrypted is determined by the producer (out of our control), but we should be able to decrypt these messages. If some Kafka metadata is used for the encryption, we need it during decryption too.

> I'm not sure how this would work. Could you elaborate a bit more on this?

When a custom implementation of `KeyedDeserializationSchema` is created, at least one of the `deserialize` methods must be implemented (why else create a custom implementation?). When both `deserialize` methods have a default implementation, the compiler no longer complains if you forget to implement one. If the default is an empty method, you won't get any complaints at runtime either, but you won't receive any records. By throwing an exception with a clear message saying that you forgot to override one of the `deserialize` methods, you get an error message at runtime. Once either `deserialize` method is overridden, the default that throws the exception is no longer called. Since the non-timestamped method is currently abstract, all existing implementations already override it.
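A small sketch of the "empty method" failure mode described above, assuming (hypothetically) that both `deserialize` methods had non-throwing defaults, with the legacy one returning `null`. `LenientSchema`, `ForgetfulLenientSchema` and `LenientDemo` are illustrative names, not Flink code; the loop skips `null` results, following the Javadoc convention quoted in the diff that `null` means the message could not be deserialized.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in where both methods have non-throwing defaults.
interface LenientSchema<T> {
    // "Empty" legacy default: silently produces no value.
    default T deserialize(byte[] key, byte[] message, String topic, int partition, long offset) {
        return null;
    }

    default T deserialize(byte[] key, byte[] message, String topic, int partition, long offset, long timestamp) {
        return deserialize(key, message, topic, partition, offset);
    }
}

// Forgot to override either method; the compiler does not complain.
final class ForgetfulLenientSchema implements LenientSchema<String> {}

final class LenientDemo {
    // Mimics a consumer loop that skips null results (null = "could not be deserialized").
    static List<String> consume(LenientSchema<String> schema, int recordCount) {
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < recordCount; i++) {
            String value = schema.deserialize(null, new byte[] {1}, "topic", 0, i, 1000L + i);
            if (value != null) {
                emitted.add(value);
            }
        }
        return emitted;
    }
}
```

Here `LenientDemo.consume(new ForgetfulLenientSchema(), 3)` returns an empty list without any error, so every record is silently dropped; a default that throws would instead surface the mistake on the first record.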
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16585659#comment-16585659 ]

ASF GitHub Bot commented on FLINK-8500:

tzulitai commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r211187832

## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
## @@ -45,6 +45,22 @@
+ default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException {

Review comment:
@FredTing Your first concern makes sense. Though, wouldn't this also be a shortcoming of our long-term solution, where we want to have separate interfaces for "deserialization" and "enrichment"?

```
I have no problem with the first alternative, but I think we are better off when we throw an exception with a message explaining that you must implement/override one of the deserialize methods.
```

I'm not sure how this would work. Could you elaborate a bit more on this?
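A rough sketch of the "separate interfaces for deserialization and enrichment" idea, with every name hypothetical (`ValueDeserializer`, `RecordEnricher`, `Event`, `DeserializeThenEnrich`). The enrichment step returns a new value rather than mutating the deserialized one, so immutable result types remain usable. Note that in this split the deserializer never sees the Kafka metadata, which is exactly the shortcoming under discussion for cases such as decryption that depend on it.

```java
// Hypothetical: turns raw bytes into a value, with no access to Kafka metadata.
interface ValueDeserializer<T> {
    T deserialize(byte[] message);
}

// Hypothetical: derives a new value from a deserialized one plus Kafka metadata.
interface RecordEnricher<T, R> {
    R enrich(T value, String topic, int partition, long offset, long timestamp);
}

// Immutable result type: enrichment builds a new instance instead of mutating.
final class Event {
    final String payload;
    final long timestamp;

    Event(String payload, long timestamp) {
        this.payload = payload;
        this.timestamp = timestamp;
    }
}

// Composes the two steps the way a connector might call them.
final class DeserializeThenEnrich<T, R> {
    private final ValueDeserializer<T> deserializer;
    private final RecordEnricher<T, R> enricher;

    DeserializeThenEnrich(ValueDeserializer<T> deserializer, RecordEnricher<T, R> enricher) {
        this.deserializer = deserializer;
        this.enricher = enricher;
    }

    R process(byte[] message, String topic, int partition, long offset, long timestamp) {
        return enricher.enrich(deserializer.deserialize(message), topic, partition, offset, timestamp);
    }
}
```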
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16583865#comment-16583865 ]

ASF GitHub Bot commented on FLINK-8500:

FredTing commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r210898792

## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
## @@ -45,6 +45,22 @@
+ default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException {

Review comment:
I have a small problem with the second alternative. When I implement this interface in Scala, I do not want to use mutable objects. I think the `setTimestamp` method forces the `T deserializedRecord` to be mutable (at least its timestamp field).

I have no problem with the first alternative, but I think we are better off throwing an exception with a message explaining that you must implement/override one of the `deserialize` methods.
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16583407#comment-16583407 ]

ASF GitHub Bot commented on FLINK-8500:
---

tzulitai commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r210810967

## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
##
@@ -45,6 +45,22 @@
     */
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
 
+   /**
+    * Deserializes the byte message.
+    *
+    * @param messageKey the key as a byte array (null if no key has been set).
+    * @param message The message, as a byte array (null if the message was empty or deleted).
+    * @param partition The partition the message has originated from.
+    * @param offset the offset of the message in the original source (for example the Kafka offset).
+    * @param timestamp the timestamp of the consumer record
+    * @param timestampType The timestamp type, could be NO_TIMESTAMP, CREATE_TIME or INGEST_TIME.
+    *
+    * @return The deserialized message as an object (null if the message cannot be deserialized).
+    */
+   default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException {

Review comment:
This would mean that, if users want to access timestamps, they would have to implement both `deserialize` methods, correct? This feels a bit awkward.

I can think of two possible approaches to address this:
1. Let the original non-timestamped `deserialize` method also be default, but with an empty implementation. That is still not nice, though, since both `deserialize` methods would then be default, and the second `deserialize` method relies on the first one.
2. Instead of adding a new `deserialize` method with the timestamp, we add a `setTimestamp` method that is only ever called in Kafka 0.10+, with the signature `void setTimestamp(T deserializedRecord, long timestamp)`. This might also better match what we've discussed in the JIRA ticket about having separate interfaces for "deserialization" and "enrichment".

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: image-2018-01-30-14-58-58-167.png,
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios,
> this is useful!
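The reviewer's second option, a separate timestamp hook, can be sketched as follows. This is a minimal, self-contained illustration built on stated assumptions. `TimestampAwareSchema`, `Event`, `EventSchema` and `FetcherSketch` are hypothetical names. The interface is a local stand-in, not Flink's `KeyedDeserializationSchema`. The no-op default for `setTimestamp` is an assumption about how the hook could remain optional.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Local stand-in for KeyedDeserializationSchema plus the reviewer's option 2 hook.
// Hypothetical sketch, not Flink's actual interface.
interface TimestampAwareSchema<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException;

    // Option 2: would only be called by a Kafka 0.10+ fetcher. The no-op default
    // (an assumption) keeps the hook optional for schemas that ignore timestamps.
    default void setTimestamp(T deserializedRecord, long timestamp) {
    }
}

// Hypothetical mutable user record; Long.MIN_VALUE marks "no timestamp".
final class Event {
    final String payload;
    long timestamp = Long.MIN_VALUE;

    Event(String payload) {
        this.payload = payload;
    }
}

final class EventSchema implements TimestampAwareSchema<Event> {
    @Override
    public Event deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        return new Event(new String(message, StandardCharsets.UTF_8));
    }

    @Override
    public void setTimestamp(Event deserializedRecord, long timestamp) {
        deserializedRecord.timestamp = timestamp;
    }
}

// Hypothetical fetcher step: deserialize first, then enrich only if the client has timestamps.
final class FetcherSketch {
    static <T> T emit(TimestampAwareSchema<T> schema, byte[] value, long timestamp,
                      boolean clientHasTimestamps) throws IOException {
        T record = schema.deserialize(null, value, "events", 0, 0L);
        if (clientHasTimestamps) {
            schema.setTimestamp(record, timestamp);
        }
        return record;
    }
}
```

With this design, schemas that never need the timestamp stay untouched. However, because `setTimestamp` returns `void`, it only works for record types that can be mutated after deserialization.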
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16583405#comment-16583405 ]

ASF GitHub Bot commented on FLINK-8500:
---

tzulitai commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r210810317

## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
##
@@ -54,4 +70,12 @@
     * @return True, if the element signals end of stream, false otherwise.
     */
    boolean isEndOfStream(T nextElement);
+
+   /**
+    * The TimestampType is introduced in the kafka clients 0.10+. This interface is also used for the Kafka connector 0.9

Review comment:
Capital k for Kafka

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: image-2018-01-30-14-58-58-167.png,
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios,
> this is useful!
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16583406#comment-16583406 ]

ASF GitHub Bot commented on FLINK-8500:
---

tzulitai commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r210809870

## File path: docs/dev/connectors/kafka.md
##
@@ -153,7 +153,10 @@ produced Java/Scala type to Flink's type system. Users that implement a vanilla
 to implement the `getProducedType(...)` method themselves.
 
 For accessing both the key and value of the Kafka message, the `KeyedDeserializationSchema` has
-the following deserialize method ` T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)`.
+the following deserialize methods ` T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)` and
+` T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType)`.
+The first exists for backward compatibility reasons, for kafka 0.10+ consumers the second is preferred because it
+also gives access to the kafka timestamp.

Review comment:
Capital 'k' for Kafka

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: image-2018-01-30-14-58-58-167.png,
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios,
> this is useful!
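The documentation change reviewed above describes two `deserialize` overloads, where the timestamped one is preferred for Kafka 0.10+ consumers. A user-side implementation might look like the sketch below. Several names here are assumptions. `KeyedSchemaSketch` and `TimestampedStringSchema` are hypothetical. `KafkaTimestampType` is a local enum that only mirrors the constants of Kafka's `org.apache.kafka.common.record.TimestampType`. The interface shape follows the PR under review, not a released Flink API.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Local stand-in mirroring the constants of Kafka's TimestampType enum.
enum KafkaTimestampType { NO_TIMESTAMP_TYPE, CREATE_TIME, LOG_APPEND_TIME }

// Stand-in for the interface shape proposed in PR #6105 (not a released Flink API).
interface KeyedSchemaSketch<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException;

    default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset,
                          long timestamp, KafkaTimestampType timestampType) throws IOException {
        return deserialize(messageKey, message, topic, partition, offset);
    }
}

// A user schema that wants the Kafka timestamp: it overrides the 7-argument method
// but must still implement the abstract 5-argument one (the awkwardness raised in review).
final class TimestampedStringSchema implements KeyedSchemaSketch<String> {
    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        // Reached only by pre-0.10 consumers, which have no record timestamp.
        return deserialize(messageKey, message, topic, partition, offset,
                Long.MIN_VALUE, KafkaTimestampType.NO_TIMESTAMP_TYPE);
    }

    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset,
                              long timestamp, KafkaTimestampType timestampType) {
        String value = new String(message, StandardCharsets.UTF_8);
        // Append the timestamp only when the broker/client actually supplied one.
        return timestampType == KafkaTimestampType.NO_TIMESTAMP_TYPE ? value : value + "@" + timestamp;
    }
}
```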
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579276#comment-16579276 ]

Fred Teunissen commented on FLINK-8500:
---

I rebased this PR onto the latest master branch yesterday evening.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Fix For: 1.7.0
>
> Attachments: image-2018-01-30-14-58-58-167.png,
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios,
> this is useful!
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16496136#comment-16496136 ]

yanxiaobin commented on FLINK-8500:
---

[~tzulitai] thanks, great.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png,
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios,
> this is useful!
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16496068#comment-16496068 ]

Tzu-Li (Gordon) Tai commented on FLINK-8500:

[~Backlight] Yes, we're currently going for a minimal incremental fix, so that at least there is a way around the problem. In the long run, the ideal approach is the one discussed in the comments of this PR: [https://github.com/apache/flink/pull/5958].

Given that [~FredTing]'s latest PR is a "short-term solution", I think it is definitely OK to target it for a merge into the 1.5.x series.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png,
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios,
> this is useful!
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16496006#comment-16496006 ]

yanxiaobin commented on FLINK-8500:
---

For now, this solves the current problems, but in the long run there will still be some limitations in supporting future Kafka versions. By the way, can we fix this problem in the 1.5.x series?

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png,
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios,
> this is useful!
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16495589#comment-16495589 ]

ASF GitHub Bot commented on FLINK-8500:
---

GitHub user FredTing opened a pull request:

https://github.com/apache/flink/pull/6105

[FLINK-8500] Get the timestamp of the Kafka message from kafka consumer

## What is the purpose of the change

This pull request makes the Kafka timestamp and timestampType available during message deserialisation, so that they can be used in the business logic processing.

## Brief change log

- *added a new default method with Timestamp/TimestampType parameters to the interface `KeyedDeserializationSchema`*

## Verifying this change

This change is already covered by existing tests, such as Kafka Consumer tests and JSONKeyValueDeserializationSchemaTest.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no)
- The serializers: (yes / no / **don't know**)
- The runtime per-record code paths (performance sensitive): (**yes** / no / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (**yes** / no)
- If yes, how is the feature documented? (not applicable / **docs** / **JavaDocs** / not documented)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/FredTing/flink FLINK-8500

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6105.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6105

commit a214481f6722e75a298d87333e4981cb87b9c2b9
Author: Fred Teunissen
Date: 2018-05-20T20:45:29Z

[FLINK-8500] Get the timestamp of the Kafka message from kafka consumer

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png,
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios,
> this is useful!
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16495198#comment-16495198 ]

Tzu-Li (Gordon) Tai commented on FLINK-8500:

+1 to proceed with at least an incremental fix to the issue for now.
The discussion of splitting the concerns of deserialization and meta-info enrichment into two interfaces can go into a separate thread.

For a minimal incremental fix, I would prefer this approach:
{code}
default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException
{code}

I think that in the long run we'll still very likely break this up into two interfaces.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png,
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios,
> this is useful!
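The preferred minimal fix relies on a Java 8 default method, which keeps backward compatibility. The sketch below shows why. A fetcher can always call the new 7-argument overload, and a schema written before the change still works, because the default falls back to the old method. All names (`KeyedDeserializationSketch`, `LegacyValueSchema`, `Fetcher010Sketch`, `TsType`) are hypothetical stand-ins. `TsType` only mirrors the constants of Kafka's `TimestampType`.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Mirrors the constants of Kafka's TimestampType; a local stand-in, not the Kafka class.
enum TsType { NO_TIMESTAMP_TYPE, CREATE_TIME, LOG_APPEND_TIME }

interface KeyedDeserializationSketch<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException;

    // The minimal incremental fix: a new default overload that falls back to the old method,
    // so every existing implementation keeps compiling and behaving as before.
    default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset,
                          long timestamp, TsType timestampType) throws IOException {
        return deserialize(messageKey, message, topic, partition, offset);
    }
}

// A schema written before the change: it only knows the 5-argument method.
final class LegacyValueSchema implements KeyedDeserializationSketch<String> {
    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        return topic + ":" + new String(message, StandardCharsets.UTF_8);
    }
}

// Hypothetical 0.10+ fetcher loop that always calls the 7-argument overload.
final class Fetcher010Sketch {
    static <T> List<T> emitAll(KeyedDeserializationSketch<T> schema, String topic, List<byte[]> values,
                               long baseTimestamp) throws IOException {
        List<T> out = new ArrayList<>();
        long offset = 0;
        for (byte[] value : values) {
            out.add(schema.deserialize(null, value, topic, 0, offset, baseTimestamp + offset, TsType.CREATE_TIME));
            offset++;
        }
        return out;
    }
}
```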
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493109#comment-16493109 ]

Fred Teunissen commented on FLINK-8500:
---

@[~aljoscha], [~tzulitai],
If you like, I can make the PR with a new default {{deserialize}} method on the interface {{KeyedDeserializationSchema}} with either the signature
{{default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException ...}}
or the signature
{{default T deserialize(ConsumerRecord record) throws IOException ...}}

I have no preference; both have their pros and cons.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png,
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios,
> this is useful!
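The second candidate signature passes the whole consumer record to the schema. Here is a sketch of that variant under explicit assumptions. `RecordSketch` is a minimal local stand-in whose accessor names follow Kafka's `ConsumerRecord`, but it is not the real client class. `RecordDeserializationSketch` and `AuditSchema` are hypothetical names. The default method falls back to the existing 5-argument method.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Minimal local stand-in for Kafka's ConsumerRecord<K, V>; not the real client class.
final class RecordSketch<K, V> {
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private final K key;
    private final V value;

    RecordSketch(String topic, int partition, long offset, long timestamp, K key, V value) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
        this.key = key;
        this.value = value;
    }

    String topic() { return topic; }
    int partition() { return partition; }
    long offset() { return offset; }
    long timestamp() { return timestamp; }
    K key() { return key; }
    V value() { return value; }
}

interface RecordDeserializationSketch<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException;

    // Record-based signature; the default delegates to the old method for existing schemas.
    default T deserialize(RecordSketch<byte[], byte[]> record) throws IOException {
        return deserialize(record.key(), record.value(), record.topic(), record.partition(), record.offset());
    }
}

// Hypothetical schema that reads a field (the timestamp) the 5-argument method never sees.
final class AuditSchema implements RecordDeserializationSketch<String> {
    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        return topic + "/" + partition + "/" + offset + "=" + new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(RecordSketch<byte[], byte[]> record) {
        return deserialize(record.key(), record.value(), record.topic(), record.partition(), record.offset())
                + "@" + record.timestamp();
    }
}
```

The trade-off is that fields added to records by newer Kafka clients can reach the schema without yet another overload. The cost is that the interface becomes tied to a Kafka client type.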
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16488779#comment-16488779 ]

yanxiaobin commented on FLINK-8500:
---

I prefer to expose Kafka's ConsumerRecord directly (in the DeserializationSchema), which keeps the interface flexible; otherwise there will always be various limitations.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png,
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios,
> this is useful!
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16481874#comment-16481874 ]

ASF GitHub Bot commented on FLINK-8500:
---

Github user FredTing commented on the issue:

https://github.com/apache/flink/pull/5958

Closing this PR because of new insights.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png,
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios,
> this is useful!
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16481875#comment-16481875 ]

ASF GitHub Bot commented on FLINK-8500:
---

Github user FredTing closed the pull request at:

https://github.com/apache/flink/pull/5958

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png,
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios,
> this is useful!
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16481515#comment-16481515 ]

Fred Teunissen commented on FLINK-8500:
---

I agree that 'separation of concerns' is a good principle, but in this situation I have a problem with it. The interface DeserializationSchema is responsible for deserialisation of the message, but in our case the messages are encrypted and every topic has its own decryption key. So we need at least the topic name during deserialisation, which is why we are currently using the interface KeyedDeserializationSchema. I don't think this is Kafka-specific, so it should be solved in the "common deserialization schema" interface.

Another concern I have is that a two-step approach for creating the records from Kafka messages causes more memory allocations and thus more garbage collection cycles.

As [~StephanEwen] said, this issue can be fixed for now by introducing a new default method on the interface KeyedDeserializationSchema, but in the long run a new `common connector framework` should be designed and implemented. This new `common connector framework` should also address some other issues, like 'temporary idleness of partitions' and 'customisable partition assignments'.

If you agree, I can make a new PR with the new default method on the interface KeyedDeserializationSchema.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png,
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios,
> this is useful!
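The encrypted-topics use case explains why the topic name is needed at deserialization time: it selects the decryption key. The sketch below illustrates this with the JDK's `javax.crypto` API. The cipher choice (AES-GCM) and the wire format (a 12-byte IV followed by the ciphertext) are assumptions made for illustration, because the comment does not say how the messages are encrypted. `PerTopicDecryptingSchema` is a hypothetical class, not the commenter's implementation.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.Arrays;
import java.util.Map;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical schema: the topic selects the decryption key.
// Assumed wire format: 12-byte IV followed by the AES-GCM ciphertext (with its 128-bit tag).
final class PerTopicDecryptingSchema {
    private static final int IV_BYTES = 12;
    private static final int TAG_BITS = 128;

    private final Map<String, SecretKeySpec> keysByTopic;

    PerTopicDecryptingSchema(Map<String, SecretKeySpec> keysByTopic) {
        this.keysByTopic = keysByTopic;
    }

    // Same parameters as the existing 5-argument KeyedDeserializationSchema#deserialize.
    String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException {
        SecretKeySpec key = keysByTopic.get(topic);
        if (key == null) {
            throw new IOException("No decryption key configured for topic " + topic);
        }
        if (message == null || message.length < IV_BYTES) {
            throw new IOException("Message too short to contain an IV");
        }
        try {
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, key,
                    new GCMParameterSpec(TAG_BITS, Arrays.copyOfRange(message, 0, IV_BYTES)));
            byte[] plain = cipher.doFinal(message, IV_BYTES, message.length - IV_BYTES);
            return new String(plain, StandardCharsets.UTF_8);
        } catch (GeneralSecurityException e) {
            // Wrong key or tampered payload: GCM tag verification fails here.
            throw new IOException("Cannot decrypt message from topic " + topic, e);
        }
    }

    // Helper producing the assumed wire format; a real producer must use a fresh random IV.
    static byte[] encrypt(SecretKeySpec key, byte[] iv, String text) throws GeneralSecurityException {
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
        byte[] ciphertext = cipher.doFinal(text.getBytes(StandardCharsets.UTF_8));
        byte[] out = new byte[iv.length + ciphertext.length];
        System.arraycopy(iv, 0, out, 0, iv.length);
        System.arraycopy(ciphertext, 0, out, iv.length, ciphertext.length);
        return out;
    }
}
```

A message from one topic cannot be decrypted with another topic's key. This is why a `deserialize(byte[])` method that receives only the payload is not enough for this case.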
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480505#comment-16480505 ]

ASF GitHub Bot commented on FLINK-8500:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5958

@FredTing we had some offline discussion on how to proceed with this.
@aljoscha, @twalthr, or @StephanEwen can probably comment more here if I missed anything.

The conflict that Stephan mentioned between a "common deserialization schema" interface and surfacing connector-specific information is rooted in the fact that both concerns (deserialization and providing connector-specific record meta information) are currently coupled in a single interface.

Take, for example, the Kafka connector's `KeyedDeserializationSchema`: there we try to deserialize the Kafka bytes, and also provide information such as topic / partition / timestamp etc., so that users can enrich their records for downstream business logic. The first part (deserialization of bytes) should be common to all connector sources, while the second part is Kafka-specific.

Therefore, we should perhaps break this up into two separate interfaces, as follows:
```
// common interface for all sources (we already have this)
interface DeserializationSchema<T> {
    T deserialize(byte[] bytes);
}

// ... and a Kafka-specific interface that is only used to provide record meta information
interface ConsumerRecordMetaInfoProvider<T> {
    T enrich(T record, ConsumerRecordMetaInfo metaInfo);
}
```

The second interface is something that each connector should have independently, and it does not handle deserialization of the record bytes.
The name, of course, is still open to discussion.

What do you think?

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png,
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios,
> this is useful!
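Here is one way a Kafka fetcher could chain the two proposed interfaces for each record. It is a self-contained sketch. `DeserializationSchemaSketch`, `MetaInfoSketch` and `TwoStepEmitter` are hypothetical placeholders. `ConsumerRecordMetaInfoProvider` reuses the name proposed in the comment, but its signature is illustrative only.

```java
// Local stand-ins for the two proposed interfaces; all names here are placeholders.
interface DeserializationSchemaSketch<T> {          // common to all sources
    T deserialize(byte[] bytes);
}

final class MetaInfoSketch {                        // hypothetical Kafka meta info
    final String topic;
    final int partition;
    final long offset;
    final long timestamp;

    MetaInfoSketch(String topic, int partition, long offset, long timestamp) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
    }
}

interface ConsumerRecordMetaInfoProvider<T> {       // Kafka-specific, no byte handling
    T enrich(T record, MetaInfoSketch metaInfo);
}

// How a Kafka fetcher might chain both steps for every record.
final class TwoStepEmitter<T> {
    private final DeserializationSchemaSketch<T> deserializer;
    private final ConsumerRecordMetaInfoProvider<T> enricher;

    TwoStepEmitter(DeserializationSchemaSketch<T> deserializer, ConsumerRecordMetaInfoProvider<T> enricher) {
        this.deserializer = deserializer;
        this.enricher = enricher;
    }

    T emit(byte[] value, MetaInfoSketch metaInfo) {
        T record = deserializer.deserialize(value);  // step 1: format-specific, connector-agnostic
        return enricher.enrich(record, metaInfo);    // step 2: connector-specific enrichment
    }
}
```

Fred Teunissen raised a related concern in this thread. When `T` is immutable, `enrich` has to allocate a second object per record, which adds garbage-collection pressure compared to a single-step schema.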
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16471756#comment-16471756 ]

ASF GitHub Bot commented on FLINK-8500:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5958

If it turns out that we need to do a bit more design work on the deserialization schema, we could incrementally fix the issue that triggered this PR by extending the KeyedDeserializationSchema with a new default method.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png,
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios,
> this is useful!
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16471754#comment-16471754 ]

ASF GitHub Bot commented on FLINK-8500:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5958

There are a few bigger design aspects that we need to agree upon:

- The `DeserializationSchema` is a shared common denominator of serialization schemata. That's why it is in `flink-core` and not in a Kafka connector project. It is used by various per-record streaming sources, like Kafka, RabbitMQ, and in the future PubSub or AMQ. It may be valuable to migrate Kinesis to it as well. This PR changes the common denominator to have very Kafka-specific fields.

- The common schema makes sense because we can offer a library of implementations, like for Avro, JSON, Thrift, and Protobuf. All connectors can use hardened implementations for these types, or integrations with schema registries.

- This surfaces, for example, in the SQL / Table API, which is currently making an effort to change its source API so that the "connector" and "format" aspects are orthogonal. You define a table as "from Kafka with Avro", or "from row-wise file with JSON", etc.

- We should think of having something similar in the future in the unified batch/streaming DataStream API as well, when we rework our source interface. At least a "row-wise source" that can then use all these format implementations.

That means we are in a bit of a conflict between a "common deserialization schema" interface and surfacing connector-specific information.

One way to approach that might be making the connector-specific deserializer classes subclasses of the common one, and letting them use specialized subclasses of ConsumerRecordMetaInfo that have the additional fields.

On a separate note, I think that `ConsumerRecordMetaInfo` is not the best name, because the type contains not only the meta info but also the actual record. So we could call it `Record`, `Datum`, `SourceRecord`, etc.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png,
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios,
> this is useful!
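Stephan's subclassing idea can be sketched like this: a common record type and a common schema, plus Kafka-specific subclasses that add the extra fields. All type names are hypothetical placeholders. `SourceRecordSketch` borrows one of the names suggested in the comment, and the overload-based design is one possible reading of the idea, not a settled API.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical common record type (the comment suggests names like Record, Datum or SourceRecord).
class SourceRecordSketch {
    private final byte[] value;

    SourceRecordSketch(byte[] value) {
        this.value = value;
    }

    byte[] value() {
        return value;
    }
}

// Kafka-specific subclass that adds the connector's extra fields.
final class KafkaSourceRecordSketch extends SourceRecordSketch {
    final String topic;
    final int partition;
    final long offset;
    final long timestamp;

    KafkaSourceRecordSketch(byte[] value, String topic, int partition, long offset, long timestamp) {
        super(value);
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
    }
}

// Common, connector-agnostic schema: a format (JSON, Avro, ...) would implement this once.
interface CommonSchemaSketch<T> {
    T deserialize(SourceRecordSketch record);
}

// Kafka-specific sub-interface; by default it ignores the extra fields.
interface KafkaSchemaSketch<T> extends CommonSchemaSketch<T> {
    default T deserialize(KafkaSourceRecordSketch record) {
        return deserialize((SourceRecordSketch) record);
    }
}

// A reusable format that works with any connector.
final class Utf8Format implements CommonSchemaSketch<String> {
    @Override
    public String deserialize(SourceRecordSketch record) {
        return new String(record.value(), StandardCharsets.UTF_8);
    }
}

// A Kafka schema that reuses the format and appends the Kafka timestamp.
final class TimestampedUtf8Schema implements KafkaSchemaSketch<String> {
    private final Utf8Format format = new Utf8Format();

    @Override
    public String deserialize(SourceRecordSketch record) {
        return format.deserialize(record);
    }

    @Override
    public String deserialize(KafkaSourceRecordSketch record) {
        return format.deserialize(record) + "@" + record.timestamp;
    }
}
```

One caveat of this shape is that the Kafka method is an overload, not an override. The connector therefore has to call it through the Kafka-specific interface type. If the same object is called through `CommonSchemaSketch`, it silently skips the timestamp.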
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16468546#comment-16468546 ]

ASF GitHub Bot commented on FLINK-8500:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186967673

--- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java ---
@@ -42,14 +42,22 @@
 @Public
 public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
 
+   /**
+    * @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} .
+    */
+   @Deprecated
+   T deserialize(byte[] message) throws IOException;
+
    /**
     * Deserializes the byte message.
     *
-    * @param message The message, as a byte array.
+    * @param consumerRecordMetaInfossage The message, as a {@link ConsumerRecordMetaInfo}.
     *
     * @return The deserialized message as an object (null if the message cannot be deserialized).
     */
-   T deserialize(byte[] message) throws IOException;
+   default T deserialize(ConsumerRecordMetaInfo consumerRecordMetaInfossage) throws IOException {
--- End diff --

I would also vote for deprecating those classes and creating specific versions, `KafkaDeserializationSchema`/`KafkaSerializationSchema`. I would also like to add a corresponding option to `SerializationSchema` to pass the targetTopic down, e.g. to be able to look up the appropriate schema in the SchemaRegistry. I think changes like those do not fit well into a common space.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png,
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios,
> this is useful!
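The idea of passing the target topic down to a serialization schema can be illustrated with a toy schema registry lookup. Everything in the sketch below is hypothetical. `TopicAwareSerializationSchema` is not an existing Flink interface in this form, and `InMemoryRegistry` is a stand-in for a real registry client. The `<topic>-value` subject naming and the 4-byte schema-id prefix are assumptions modelled on common registry conventions, not on the comment.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical Kafka-specific serialization schema that receives the target topic.
interface TopicAwareSerializationSchema<T> {
    byte[] serialize(T element, String targetTopic);
}

// Toy in-memory stand-in for a schema registry client: subject name -> schema id.
final class InMemoryRegistry {
    private final Map<String, Integer> idsBySubject = new HashMap<>();

    void register(String subject, int schemaId) {
        idsBySubject.put(subject, schemaId);
    }

    int lookup(String subject) {
        Integer id = idsBySubject.get(subject);
        if (id == null) {
            throw new IllegalStateException("Unknown subject: " + subject);
        }
        return id;
    }
}

// Looks up the schema registered for "<topic>-value" (an assumed naming convention)
// and prefixes the UTF-8 payload with that id as a 4-byte big-endian int.
final class RegistryAwareStringSchema implements TopicAwareSerializationSchema<String> {
    private final InMemoryRegistry registry;

    RegistryAwareStringSchema(InMemoryRegistry registry) {
        this.registry = registry;
    }

    @Override
    public byte[] serialize(String element, String targetTopic) {
        int schemaId = registry.lookup(targetTopic + "-value");
        byte[] payload = element.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Integer.BYTES + payload.length)
                .putInt(schemaId)
                .put(payload)
                .array();
    }
}
```

Because the lookup key depends on the topic, the same schema instance can serve a producer that writes to several topics. This is the kind of Kafka-specific detail the comment argues does not belong in the common `SerializationSchema`.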
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466795#comment-16466795 ]

ASF GitHub Bot commented on FLINK-8500:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186606104

--- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java ---
@@ -42,14 +42,22 @@
 @Public
 public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
 
+   /**
+    * @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} .
+    */
+   @Deprecated
+   T deserialize(byte[] message) throws IOException;
+
    /**
     * Deserializes the byte message.
     *
-    * @param message The message, as a byte array.
+    * @param consumerRecordMetaInfossage The message, as a {@link ConsumerRecordMetaInfo}.
     *
     * @return The deserialized message as an object (null if the message cannot be deserialized).
     */
-   T deserialize(byte[] message) throws IOException;
+   default T deserialize(ConsumerRecordMetaInfo consumerRecordMetaInfossage) throws IOException {
--- End diff --

Makes sense. Alright, let's leave this as is then.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png,
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios,
> this is useful!
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466797#comment-16466797 ]

ASF GitHub Bot commented on FLINK-8500:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5958

Thanks for the update @FredTing. I'll try to take another look at the PR within the next few days.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png,
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios,
> this is useful!
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466166#comment-16466166 ] ASF GitHub Bot commented on FLINK-8500: --- Github user FredTing commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186480680 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/ConsumerRecordMetaInfo.java --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +import org.apache.flink.annotation.Public; + +/** + * The consumer record meta info contains, besides the actual message, some meta information, such as + * key, topic, partition, offset and timestamp for Apache kafka + * + * Note:The timestamp is only valid for Kafka clients 0.10+, for older versions the value has the value `Long.MinValue` and + * the timestampType has the value `NO_TIMESTAMP_TYPE`. + */ +@Public +public interface ConsumerRecordMetaInfo { + /** +* The TimestampType is introduced in the kafka clients 0.10+. This interface is also used for the Kafka connector 0.9 +* so a local enumeration is needed. 
+*/ enum TimestampType { + NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME + } + + /** +* @return the key as a byte array (null if no key has been set). +*/ + byte[] getKey(); + + /** +* @return The message, as a byte array (null if the message was empty or deleted). +*/ + byte[] getMessage(); + + /** +* @return The topic the message has originated from (for example the Kafka topic). +*/ + String getTopic(); + + /** +* @return The partition the message has originated from (for example the Kafka partition). +*/ + int getPartition(); + + /** +* @return the offset of the message in the original source (for example the Kafka offset). +*/ + long getOffset(); + + /** +* @return the timestamp of the consumer record --- End diff -- I've added some more comments
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466157#comment-16466157 ] ASF GitHub Bot commented on FLINK-8500: --- Github user FredTing commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186479633 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java --- @@ -42,14 +42,22 @@ @Public public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> { + /** +* @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} . --- End diff -- I added some more text to the Javadoc explaining the benefits of implementing the other `deserialize` method instead.
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466141#comment-16466141 ] ASF GitHub Bot commented on FLINK-8500: --- Github user FredTing commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186479026 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java --- @@ -42,14 +42,22 @@ @Public public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> { + /** +* @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} . +*/ + @Deprecated + T deserialize(byte[] message) throws IOException; + /** * Deserializes the byte message. * -* @param message The message, as a byte array. +* @param consumerRecordMetaInfossage The message, as a {@link ConsumerRecordMetaInfo}. * * @return The deserialized message as an object (null if the message cannot be deserialized). */ - T deserialize(byte[] message) throws IOException; + default T deserialize(ConsumerRecordMetaInfo consumerRecordMetaInfossage) throws IOException { --- End diff -- I agree that it's probably better to have separate `DeserializationSchema` classes for each connector type. But for now I think this is a relatively easy fix that doesn't break the Flink API for custom deserializers. There is already some discussion about redesigning the connectors (see issue 5479) with a `common connector framework` in mind. I think that would be a good place to decide what to do with a shared `DeserializationSchema`.
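The backward-compatible change discussed in this comment hinges on a Java 8 default method: the new overload forwards the payload to the old `byte[]` method, so existing custom deserializers keep compiling, while newer ones can override the metadata-aware variant. The sketch below is a minimal, self-contained illustration under that assumption; `MetaInfo`, `BridgedSchema`, `SimpleMeta`, `LegacyStringSchema` and `TimestampedStringSchema` are hypothetical stand-ins, not Flink's actual `ConsumerRecordMetaInfo` or `DeserializationSchema`.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for ConsumerRecordMetaInfo, reduced to two accessors.
interface MetaInfo {
    byte[] getMessage();

    long getTimestamp();
}

// Hypothetical stand-in for DeserializationSchema<T> with both overloads from the PR.
interface BridgedSchema<T> {

    /**
     * @deprecated Implement {@link #deserialize(MetaInfo)} instead: it receives record
     * metadata such as the timestamp in addition to the payload.
     */
    @Deprecated
    T deserialize(byte[] message) throws IOException;

    // Default implementation: schemas written before the change only implement the
    // byte[] variant, so this overload forwards the payload to it.
    default T deserialize(MetaInfo record) throws IOException {
        return deserialize(record.getMessage());
    }
}

// A pre-existing schema that only knows about raw bytes; it compiles unchanged.
class LegacyStringSchema implements BridgedSchema<String> {
    @Override
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }
}

// A newer schema that overrides the metadata-aware overload to use the timestamp.
class TimestampedStringSchema implements BridgedSchema<String> {
    @Override
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(MetaInfo record) {
        return deserialize(record.getMessage()) + "@" + record.getTimestamp();
    }
}

// Simple immutable MetaInfo used to drive the example.
class SimpleMeta implements MetaInfo {
    private final byte[] message;
    private final long timestamp;

    SimpleMeta(byte[] message, long timestamp) {
        this.message = message;
        this.timestamp = timestamp;
    }

    @Override
    public byte[] getMessage() {
        return message;
    }

    @Override
    public long getTimestamp() {
        return timestamp;
    }
}

public class DefaultMethodBridge {
    public static void main(String[] args) throws IOException {
        MetaInfo record = new SimpleMeta("hello".getBytes(StandardCharsets.UTF_8), 42L);
        // A connector would always call the metadata-aware overload.
        BridgedSchema<String> legacy = new LegacyStringSchema();
        BridgedSchema<String> modern = new TimestampedStringSchema();
        System.out.println(legacy.deserialize(record)); // prints "hello"
        System.out.println(modern.deserialize(record)); // prints "hello@42"
    }
}
```

One trade-off of this approach: because the `byte[]` method has no default, a metadata-aware schema must still implement it.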
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466062#comment-16466062 ] ASF GitHub Bot commented on FLINK-8500: --- Github user FredTing commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186465223 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/ConsumerRecordMetaInfo.java --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +import org.apache.flink.annotation.Public; + +/** + * The consumer record meta info contains, besides the actual message, some meta information, such as + * key, topic, partition, offset and timestamp for Apache kafka + * + * Note:The timestamp is only valid for Kafka clients 0.10+, for older versions the value has the value `Long.MinValue` and + * the timestampType has the value `NO_TIMESTAMP_TYPE`. + */ +@Public +public interface ConsumerRecordMetaInfo { + /** +* The TimestampType is introduced in the kafka clients 0.10+. This interface is also used for the Kafka connector 0.9 +* so a local enumeration is needed. 
+*/ enum TimestampType { + NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME --- End diff -- I'll rename them both.
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466047#comment-16466047 ] ASF GitHub Bot commented on FLINK-8500: --- Github user FredTing commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186462871 --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java --- @@ -78,6 +79,69 @@ public Kafka010Fetcher( useMetrics); } + private class KafkaConsumerRecordWrapper10 implements ConsumerRecordMetaInfo { + private static final long serialVersionUID = 2651665280744549935L; + + private final ConsumerRecord<byte[], byte[]> consumerRecord; + + public KafkaConsumerRecordWrapper10(ConsumerRecord<byte[], byte[]> consumerRecord) { + this.consumerRecord = consumerRecord; + } + + @Override + public byte[] getKey() { + return consumerRecord.key(); + } + + @Override + public byte[] getMessage() { + return consumerRecord.value(); + } + + @Override + public String getTopic() { + return consumerRecord.topic(); + } + + @Override + public int getPartition() { + return consumerRecord.partition(); + } + + @Override + public long getOffset() { + return consumerRecord.offset(); + } + + @Override + public long getTimestamp() { + return Long.MIN_VALUE; --- End diff -- Yes, it certainly does; it should return `consumerRecord.timestamp()`. I'll fix it.
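The fix agreed here amounts to forwarding the record's own timestamp in the 0.10 wrapper instead of `Long.MIN_VALUE`, a value that only makes sense for Kafka 0.9 records, which carry no timestamp. The sketch below contrasts the two cases without a Kafka dependency; `TimestampAccess`, `Wrapper09` and `Wrapper010` are hypothetical names, and the `Wrapper010` constructor arguments stand in for `consumerRecord.timestamp()` and an already-mapped timestamp type.

```java
// The PR's local enum, as it appears in the diff.
enum TimestampType { NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME }

// Hypothetical subset of ConsumerRecordMetaInfo: only the timestamp accessors.
interface TimestampAccess {
    long getTimestamp();

    TimestampType getTimestampType();
}

// Kafka 0.9 records carry no timestamp, so a 0.9 wrapper can only report the sentinels.
final class Wrapper09 implements TimestampAccess {
    @Override
    public long getTimestamp() {
        return Long.MIN_VALUE;
    }

    @Override
    public TimestampType getTimestampType() {
        return TimestampType.NO_TIMESTAMP_TYPE;
    }
}

// Kafka 0.10+ records do carry one, so the wrapper forwards it instead of Long.MIN_VALUE.
final class Wrapper010 implements TimestampAccess {
    private final long timestamp;     // stands in for consumerRecord.timestamp()
    private final TimestampType type; // stands in for a mapped consumerRecord.timestampType()

    Wrapper010(long timestamp, TimestampType type) {
        this.timestamp = timestamp;
        this.type = type;
    }

    @Override
    public long getTimestamp() {
        return timestamp;
    }

    @Override
    public TimestampType getTimestampType() {
        return type;
    }
}

public class WrapperTimestampDemo {
    public static void main(String[] args) {
        TimestampAccess legacy = new Wrapper09();
        TimestampAccess current = new Wrapper010(1_525_620_675_000L, TimestampType.CREATE_TIME);
        System.out.println(legacy.getTimestampType() + " " + legacy.getTimestamp());
        System.out.println(current.getTimestampType() + " " + current.getTimestamp());
    }
}
```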
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465498#comment-16465498 ] ASF GitHub Bot commented on FLINK-8500: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186338049 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java --- @@ -42,14 +42,22 @@ @Public public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> { + /** +* @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} . --- End diff -- For the deprecation, I would recommend explaining why the new `deserialize` method is superior.
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465499#comment-16465499 ] ASF GitHub Bot commented on FLINK-8500: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186338721 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java --- @@ -42,14 +42,22 @@ @Public public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> { + /** +* @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} . +*/ + @Deprecated + T deserialize(byte[] message) throws IOException; + /** * Deserializes the byte message. * -* @param message The message, as a byte array. +* @param consumerRecordMetaInfossage The message, as a {@link ConsumerRecordMetaInfo}. * * @return The deserialized message as an object (null if the message cannot be deserialized). */ - T deserialize(byte[] message) throws IOException; + default T deserialize(ConsumerRecordMetaInfo consumerRecordMetaInfossage) throws IOException { --- End diff -- I'm actually not sure that we should continue using this class, for the following reasons: 1. The class is placed under a non-ideal package, `o.a.f.api.common.serialization`, whereas it should be placed under some `o.a.f.connectors.kafka` package. It is currently in this package because the `DeserializationSchema` was initially intended to be shared by all connectors. However, over time it has become clear that each connector benefits from its own version of a schema class. So it might make sense to deprecate the whole `DeserializationSchema` class now, and have a new class (maybe called `KafkaDeserializationSchema` / `KafkaSerializationSchema`) under a proper Kafka package. What do you think?
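To make the proposal concrete, a connector-owned schema could expose the whole record through a single method. The sketch below is hypothetical and self-contained: `KafkaRecordView`, `KafkaRecordSchema` and `DescribingSchema` are illustrative names, not Flink classes, and in a real connector the record type and interface would live in the Kafka connector package rather than in `o.a.f.api.common.serialization`. The `isEndOfStream` default mirrors the method of the same name on `DeserializationSchema`.

```java
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Hypothetical record view owned by the Kafka connector (not Flink's actual API).
final class KafkaRecordView {
    final byte[] key;
    final byte[] value;
    final String topic;
    final int partition;
    final long offset;
    final long timestamp;

    KafkaRecordView(byte[] key, byte[] value, String topic, int partition, long offset, long timestamp) {
        this.key = key;
        this.value = value;
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
    }
}

// Hypothetical Kafka-specific schema: one deserialize method that sees the whole record.
interface KafkaRecordSchema<T> extends Serializable {
    T deserialize(KafkaRecordView record) throws IOException;

    // Mirrors DeserializationSchema#isEndOfStream: lets a schema end consumption.
    default boolean isEndOfStream(T nextElement) {
        return false;
    }
}

// Example implementation: describes where each value came from.
class DescribingSchema implements KafkaRecordSchema<String> {
    private static final long serialVersionUID = 1L;

    @Override
    public String deserialize(KafkaRecordView r) {
        String value = r.value == null ? "null" : new String(r.value, StandardCharsets.UTF_8);
        return r.topic + "/" + r.partition + "@" + r.offset + "=" + value;
    }
}

public class KafkaSchemaSketch {
    public static void main(String[] args) throws IOException {
        KafkaRecordSchema<String> schema = new DescribingSchema();
        KafkaRecordView record = new KafkaRecordView(
                null, "v1".getBytes(StandardCharsets.UTF_8), "events", 3, 17L, 1_000L);
        System.out.println(schema.deserialize(record)); // prints "events/3@17=v1"
    }
}
```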
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465496#comment-16465496 ] ASF GitHub Bot commented on FLINK-8500: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186337301 --- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java --- @@ -78,6 +79,69 @@ public Kafka010Fetcher( useMetrics); } + private class KafkaConsumerRecordWrapper10 implements ConsumerRecordMetaInfo { + private static final long serialVersionUID = 2651665280744549935L; + + private final ConsumerRecord<byte[], byte[]> consumerRecord; + + public KafkaConsumerRecordWrapper10(ConsumerRecord<byte[], byte[]> consumerRecord) { + this.consumerRecord = consumerRecord; + } + + @Override + public byte[] getKey() { + return consumerRecord.key(); + } + + @Override + public byte[] getMessage() { + return consumerRecord.value(); + } + + @Override + public String getTopic() { + return consumerRecord.topic(); + } + + @Override + public int getPartition() { + return consumerRecord.partition(); + } + + @Override + public long getOffset() { + return consumerRecord.offset(); + } + + @Override + public long getTimestamp() { + return Long.MIN_VALUE; --- End diff -- Doesn't Kafka 0.10 support record timestamps?
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465497#comment-16465497 ] ASF GitHub Bot commented on FLINK-8500: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186337890 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/ConsumerRecordMetaInfo.java --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +import org.apache.flink.annotation.Public; + +/** + * The consumer record meta info contains, besides the actual message, some meta information, such as + * key, topic, partition, offset and timestamp for Apache kafka + * + * Note:The timestamp is only valid for Kafka clients 0.10+, for older versions the value has the value `Long.MinValue` and + * the timestampType has the value `NO_TIMESTAMP_TYPE`. + */ +@Public +public interface ConsumerRecordMetaInfo { + /** +* The TimestampType is introduced in the kafka clients 0.10+. This interface is also used for the Kafka connector 0.9 +* so a local enumeration is needed. 
+*/ enum TimestampType { + NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME --- End diff -- `NO_TIMESTAMP_TYPE` --> maybe `NO_TIMESTAMP` will do, since the enum name already tells us it is a type.
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465500#comment-16465500 ] ASF GitHub Bot commented on FLINK-8500: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186337736 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/ConsumerRecordMetaInfo.java --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +import org.apache.flink.annotation.Public; + +/** + * The consumer record meta info contains, besides the actual message, some meta information, such as + * key, topic, partition, offset and timestamp for Apache kafka + * + * Note:The timestamp is only valid for Kafka clients 0.10+, for older versions the value has the value `Long.MinValue` and + * the timestampType has the value `NO_TIMESTAMP_TYPE`. + */ +@Public +public interface ConsumerRecordMetaInfo { + /** +* The TimestampType is introduced in the kafka clients 0.10+. This interface is also used for the Kafka connector 0.9 +* so a local enumeration is needed. 
+*/ enum TimestampType { + NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME --- End diff -- I wonder if `CREATE_TIME` should be renamed to `EVENT_TIME`, to be more consistent with Flink's terminology.
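Applying both naming suggestions from this review (`NO_TIMESTAMP_TYPE` to `NO_TIMESTAMP`, `CREATE_TIME` to `EVENT_TIME`) could give an enum like the hypothetical sketch below. The `fromKafkaName` helper maps Kafka's timestamp-type constant names (`NO_TIMESTAMP_TYPE`, `CREATE_TIME`, `LOG_APPEND_TIME`) by string so the sketch needs no Kafka dependency; treating Kafka's broker-assigned `LOG_APPEND_TIME` as `INGEST_TIME` is an assumption, not something the PR states.

```java
import java.util.Locale;

// Hypothetical renamed version of the PR's local TimestampType enum.
enum RecordTimestampType {
    /** No timestamp is available (e.g. Kafka 0.9); the timestamp value is a dummy. */
    NO_TIMESTAMP,
    /** Timestamp assigned by the producer (CREATE_TIME in Kafka). */
    EVENT_TIME,
    /** Timestamp assigned by the broker on append (LOG_APPEND_TIME in Kafka); mapping is assumed. */
    INGEST_TIME;

    /**
     * Maps the constant name of a Kafka timestamp type onto this enum.
     * Null or unrecognized names fall back to NO_TIMESTAMP.
     */
    static RecordTimestampType fromKafkaName(String kafkaName) {
        if (kafkaName == null) {
            return NO_TIMESTAMP;
        }
        switch (kafkaName.toUpperCase(Locale.ROOT)) {
            case "CREATE_TIME":
                return EVENT_TIME;
            case "LOG_APPEND_TIME":
                return INGEST_TIME;
            default:
                return NO_TIMESTAMP;
        }
    }
}

public class TimestampTypeNaming {
    public static void main(String[] args) {
        for (String name : new String[] {"CREATE_TIME", "LOG_APPEND_TIME", "NO_TIMESTAMP_TYPE"}) {
            System.out.println(name + " -> " + RecordTimestampType.fromKafkaName(name));
        }
    }
}
```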
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465501#comment-16465501 ] ASF GitHub Bot commented on FLINK-8500: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186338002 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java --- @@ -42,14 +42,22 @@ @Public public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> { + /** +* @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} . --- End diff -- Unnecessary space before period at the end.
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465495#comment-16465495 ] ASF GitHub Bot commented on FLINK-8500: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5958#discussion_r186337834 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/ConsumerRecordMetaInfo.java --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +import org.apache.flink.annotation.Public; + +/** + * The consumer record meta info contains, besides the actual message, some meta information, such as + * key, topic, partition, offset and timestamp for Apache kafka + * + * Note:The timestamp is only valid for Kafka clients 0.10+, for older versions the value has the value `Long.MinValue` and + * the timestampType has the value `NO_TIMESTAMP_TYPE`. + */ +@Public +public interface ConsumerRecordMetaInfo { + /** +* The TimestampType is introduced in the kafka clients 0.10+. This interface is also used for the Kafka connector 0.9 +* so a local enumeration is needed. 
+*/ enum TimestampType { + NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME + } + + /** +* @return the key as a byte array (null if no key has been set). +*/ + byte[] getKey(); + + /** +* @return The message, as a byte array (null if the message was empty or deleted). +*/ + byte[] getMessage(); + + /** +* @return The topic the message has originated from (for example the Kafka topic). +*/ + String getTopic(); + + /** +* @return The partition the message has originated from (for example the Kafka partition). +*/ + int getPartition(); + + /** +* @return the offset of the message in the original source (for example the Kafka offset). +*/ + long getOffset(); + + /** +* @return the timestamp of the consumer record --- End diff -- The Javadoc should document the "dummy" timestamp value that is returned when the timestamp type is `NO_TIMESTAMP`.
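Documenting the dummy value matters for callers: user code should check the timestamp type before trusting the timestamp. A minimal sketch, assuming the convention stated in the PR's Javadoc (`Long.MIN_VALUE` plus `NO_TIMESTAMP_TYPE` for pre-0.10 records); `TimestampedValue` and `TimestampPolicy` are hypothetical names, and falling back to a caller-supplied time is just one possible policy.

```java
// The PR's local enum, as it appears in the diff.
enum TimestampType { NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME }

// Hypothetical output type pairing a payload with the timestamp chosen for it.
final class TimestampedValue {
    final String value;
    final long timestamp;

    TimestampedValue(String value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

final class TimestampPolicy {
    private TimestampPolicy() {}

    /**
     * Returns the record timestamp when it is real; otherwise the dummy value
     * (Long.MIN_VALUE for pre-0.10 records) is replaced by the caller's fallback.
     */
    static long resolve(TimestampType type, long recordTimestamp, long fallback) {
        if (type == TimestampType.NO_TIMESTAMP_TYPE || recordTimestamp == Long.MIN_VALUE) {
            return fallback;
        }
        return recordTimestamp;
    }

    // Attaches the resolved timestamp to a deserialized payload.
    static TimestampedValue attach(String value, TimestampType type, long recordTimestamp, long fallback) {
        return new TimestampedValue(value, resolve(type, recordTimestamp, fallback));
    }
}

public class DummyTimestampHandling {
    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        TimestampedValue fromKafka09 =
                TimestampPolicy.attach("a", TimestampType.NO_TIMESTAMP_TYPE, Long.MIN_VALUE, now);
        TimestampedValue fromKafka010 =
                TimestampPolicy.attach("b", TimestampType.CREATE_TIME, 1_000L, now);
        System.out.println(fromKafka09.timestamp == now); // prints "true"
        System.out.println(fromKafka010.timestamp);       // prints "1000"
    }
}
```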
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465456#comment-16465456 ] ASF GitHub Bot commented on FLINK-8500: --- GitHub user FredTing opened a pull request: https://github.com/apache/flink/pull/5958 [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer ## What is the purpose of the change This pull request makes the Kafka timestamp and timestampType available during message deserialisation, so they can be used in business logic processing. ## Brief change log Introduced a new interface `ConsumerRecordMetaInfo` with meta info of the Kafka message. Extended the `DeserializationSchema` with the `T deserialize(ConsumerRecordMetaInfo consumerRecord)` method. Adjusted the Kafka connectors to support the new interface. Added some documentation. ## Verifying this change This change is already covered by existing tests, such as most of the Kafka Consumer tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no) - The serializers: (**yes** / no / don't know) - The runtime per-record code paths (performance sensitive): (**yes** / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented?
(not applicable / **docs** / **JavaDocs** / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/FredTing/flink FLINK-8500 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5958.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5958 commit dca3e7bf5504fdeb929cb38bdd05c3fcec184d6c Author: Fred Teunissen Date: 2018-05-06T15:31:15Z [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463605#comment-16463605 ] ASF GitHub Bot commented on FLINK-8500: --- Github user FredTing commented on the issue: https://github.com/apache/flink/pull/5939 I'm closing this Pull Request and will make a new one with the newer approach as discussed in the jira issue > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463606#comment-16463606 ] ASF GitHub Bot commented on FLINK-8500: --- Github user FredTing closed the pull request at: https://github.com/apache/flink/pull/5939 > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463516#comment-16463516 ] Aljoscha Krettek commented on FLINK-8500: - I think most people agree on adding a {{ConsumerRecordMetaInfo}} interface. We can probably remodel {{DeserializationSchema}} for our purposes, i.e. deprecate the existing method, introduce the new method that takes the meta info and mark them as default. [~FredTing] You can have a look at {{SinkFunction}}, there I did the same trick. You can either open a new PR or modify the existing one. Creating a new one is probably clearer. > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
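The "deprecate the old method, add a new one, make both default" trick mentioned above can be sketched as follows. This is a minimal illustration, not Flink's actual code. `MetaInfo` and `EvolvingSchema` are hypothetical stand-ins for `ConsumerRecordMetaInfo` and `DeserializationSchema`, reduced to the parts that matter here. Existing implementations that only override the old method keep working. New implementations override only the meta-info variant.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the proposed ConsumerRecordMetaInfo interface.
interface MetaInfo {
    byte[] getMessage();
    long getTimestamp();
}

// Hypothetical stand-in for an evolved DeserializationSchema.
interface EvolvingSchema<T> {

    // Old method: deprecated and turned into a default, so new
    // implementations no longer have to provide it.
    @Deprecated
    default T deserialize(byte[] message) {
        throw new UnsupportedOperationException(
            "Override deserialize(MetaInfo) or deserialize(byte[])");
    }

    // New method: the connector would call only this one. The default
    // delegates to the old method, so legacy implementations still work.
    default T deserialize(MetaInfo record) {
        return deserialize(record.getMessage());
    }
}

// Legacy implementation written against the old method only.
class LegacyStringSchema implements EvolvingSchema<String> {
    @Override
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }
}

// New implementation that uses the Kafka timestamp from the meta info.
class TimestampedStringSchema implements EvolvingSchema<String> {
    @Override
    public String deserialize(MetaInfo record) {
        return record.getTimestamp() + ":"
            + new String(record.getMessage(), StandardCharsets.UTF_8);
    }
}
```

Because the connector only ever calls `deserialize(MetaInfo)`, both kinds of implementation can be passed to the same consumer without an extra wrapper class.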
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462345#comment-16462345 ] Fred Teunissen commented on FLINK-8500: --- [~aljoscha] I have some time that I can spend on this, so yes I'm interested. I can also update the current PR with the changes unless you think it's better to make a new fresh PR for this. There are now 3 possible approaches. # Kafka {{ConsumerRecord}} as parameter in the Flink API # a new {{ConsumerRecordMetaInfo}} class as parameter for the {{deserialize}} method # extend the interface {{KeyedDeserializationSchema}} with a new method, although I don't know how to give an interface default behavior in java. I'm leaning to almost the same approach I used now, creating a new separate interface {{RichDeserializationSchema}} with a {{deserialize}} method with a {{ConsumerRecordMetaInfo}} parameter. Also create a {{RichDeserializationSchemaWrapper}} for implementing the current API for backwards compatibility Do you think this is the right approach? > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462285#comment-16462285 ] Aljoscha Krettek commented on FLINK-8500: - [~FredTing] Would you be interested in contributing this slightly more complex addition? > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462283#comment-16462283 ] ASF GitHub Bot commented on FLINK-8500: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5939 @FredTing To avoid confusion we should probably close this PR. What do you think? > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462148#comment-16462148 ] ASF GitHub Bot commented on FLINK-8500: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5939 @StephanEwen we have some follow-up discussion on how to approach this on the JIRA ticket. Do you want to take a look at that? https://issues.apache.org/jira/browse/FLINK-8500 The general idea is that we should perhaps come up with a more generic schema interface that allows providing all information that Kafka gives us and provide them to the user. > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462143#comment-16462143 ] ASF GitHub Bot commented on FLINK-8500: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5939 The feature is a nice addition. Flink currently already adds the timestamp as the record's event time timestamp. You can access it via a ProcessFunction. That is a tad bit more clumsy, though... If we want to have the timestamp as part of the Deserialization Schema, I would suggest to not add yet another specialized schema, but extend the KeyedDeserializationSchema with another method that takes the timestamp. We should make that a default method that calls the existing method and make the existing method an empty default method. We could also think about renaming `KeyedDeserializationSchema` to `RichDeserializationSchema` or so, if that would describe the functionality better (I am not a native speaker, so would be nice for one to give their opinion here). > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16461960#comment-16461960 ] Tzu-Li (Gordon) Tai commented on FLINK-8500: I'm still leaning towards the second approach, where we maintain our own class. [~FredTing] yes, maybe we should discuss those as a separate issue, and just focus on Kafka for this ticket. > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16461955#comment-16461955 ] Tzu-Li (Gordon) Tai commented on FLINK-8500: Let's go with the second approach. I've quickly checked Kafka 1.x, and it seems like the {{ConsumerRecord}} class will remain stable, at least for the near future. [~FredTing] yes, for example the Kinesis deserialization schema could benefit from a more generic solution. And yes, let's discuss them elsewhere / have a separate Jira for them. > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16461953#comment-16461953 ] Fred Teunissen commented on FLINK-8500: --- Both approaches will work, but we have to choose. The first approach exposes the Kafka API as part of the Flink API. The second approach hides the Kafka API but will require a bit more resources to maintain. The second approach would have my vote. I don't want to introduce scope creep, but I think there are more input sources that could benefit from a more generic (de)serialization scheme. Should we look into that, or leave it for now? (In issue 5479 the idea of a `common connector framework` is mentioned; should it be picked up there?) > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16460879#comment-16460879 ] Aljoscha Krettek commented on FLINK-8500: - [~tzulitai] I thought about that second approach as well but didn't want to mention it because it requires work by us when there are new Kafka features. I'd be fine with either approach, though. > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16460871#comment-16460871 ] Tzu-Li (Gordon) Tai commented on FLINK-8500: +1 to the general idea; simplifying the schema methods and unifying the keyed / non-keyed versions seems very appealing. However, I suggest not exposing Kafka's {{ConsumerRecord}} directly, otherwise our API compatibility will depend on Kafka changing this class. This might be ok, since users should expect API changes across major Kafka connector versions anyway. This would also entail that we might eventually have version-specific serialization schema classes, instead of a base one. Another approach, if we want to avoid this, is to have our own `ConsumerRecordMetaInfo` class that we pass via the serialization schema. The downside would be that we have to make sure to expose everything in Kafka's `ConsumerRecord` in our own class. It's a bit more manual, but should be manageable in a non-API-breaking way (whenever we have to add more information to that class). I'm more inclined towards the second approach. > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
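The second approach keeps Kafka types out of the Flink API by copying what the connector needs into a Flink-owned value class. A minimal sketch follows. `RecordMetaInfo` and its exact field set are assumptions for illustration, not the class that was eventually merged. The three constants of `RecordTimestampType` mirror Kafka's `org.apache.kafka.common.record.TimestampType`.

```java
// Flink-side mirror of Kafka's three timestamp types, so the public API does
// not depend on org.apache.kafka.common.record.TimestampType.
enum RecordTimestampType { NO_TIMESTAMP_TYPE, CREATE_TIME, LOG_APPEND_TIME }

// Hypothetical Flink-owned view of a Kafka ConsumerRecord. More getters can be
// added later without changing any existing method signature.
final class RecordMetaInfo {
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private final RecordTimestampType timestampType;
    private final byte[] key;   // may be null for records without a key
    private final byte[] value;

    RecordMetaInfo(String topic, int partition, long offset, long timestamp,
                   RecordTimestampType timestampType, byte[] key, byte[] value) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
        this.timestampType = timestampType;
        this.key = key;
        this.value = value;
    }

    public String getTopic() { return topic; }
    public int getPartition() { return partition; }
    public long getOffset() { return offset; }
    public long getTimestamp() { return timestamp; }
    public RecordTimestampType getTimestampType() { return timestampType; }
    public byte[] getKey() { return key; }
    public byte[] getValue() { return value; }

    // CREATE_TIME is set by the producer, LOG_APPEND_TIME by the broker.
    public boolean isBrokerTimestamp() {
        return timestampType == RecordTimestampType.LOG_APPEND_TIME;
    }
}
```

The trade-off is the one described in the comment. Every new piece of `ConsumerRecord` information has to be added here by hand. In exchange, adding a getter never breaks user code.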
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16460869#comment-16460869 ] Aljoscha Krettek commented on FLINK-8500: - Thanks for opening the PR and pushing the discussion! I think we have to take a step back, though, and reconsider the design of the deserialisation schemes. I don't think we can keep adding ever more schemes when new features come up. I think a simple fix for that is to introduce a deserialisation scheme that accepts a Kafka {{ConsumerRecord}}: all the information that you could ever want to access has to be in there, since it's the only thing we get from Kafka. This would also greatly simplify the methods and would unify keyed/non-keyed schemes. [~tzulitai] What do you think about this? > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16457600#comment-16457600 ] Fred Teunissen commented on FLINK-8500: --- I've created a [pull request|https://github.com/apache/flink/pull/5939] for this issue. This is my first pull request, so I hope that I addressed all of the 'contribution code guidelines' correctly. Please let me know whether I should do something different or when I forgot something. > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16457587#comment-16457587 ] ASF GitHub Bot commented on FLINK-8500: --- GitHub user FredTing opened a pull request: https://github.com/apache/flink/pull/5939 [FLINK-8500] [Kafka Connector] Get the timestamp of the Kafka message from kafka consumer ## What is the purpose of the change This pull request makes the Kafka timestamp and timestampType available in the message deserialisation so one can use it in the business logic processing. ## Brief change log - Introduced new interface `KeyedWithTimestampDeserializationSchema` with extra parameters in the `deserialize` method. - Added the `KeyedWithTimestampDeserializationSchemaWrapper` class to keep the code backwards compatible. - Adjusted the Kafka Connectors 0.10+ to support the new interface too. - Adjusted the Kafka Connectors 0.9- to 'hide' this new interface since these versions of Kafka don't support timestamps - Added some documentation. ## Verifying this change This change is already covered by existing tests, such as most of the Kafka Consumer tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes - The serializers: no - The runtime per-record code paths (performance sensitive): yes - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? 
docs / JavaDocs You can merge this pull request into a Git repository by running: $ git pull https://github.com/FredTing/flink FLINK-8500 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5939.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5939 commit 30293ac49a1d31c2abfa2b3fb3640e9e04ef8bcf Author: Fred Teunissen Date: 2018-04-28T08:06:41Z [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16451808#comment-16451808 ] yanxiaobin commented on FLINK-8500: --- Hi [~aljoscha]: what do you think? I currently solve the problem by modifying the source code, but that is very inconvenient for future upgrades. I hope it can be resolved in a new release as soon as possible so that I do not have to maintain a separate copy of the code myself. > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447144#comment-16447144 ] Fred Teunissen commented on FLINK-8500: --- We want to use the kafka message timestamp in our business logic, so we would like to have this timestamp available during the deserialisation of the message. But we also need the TimestampType to be able to detect from whom the timestamp came (the producer or the kafka broker). If we don't want to break the interface we can make a new interface {{KeyedAndTimestampDeserializationSchema}} extending the interface {{KeyedDeserializationSchema}} and add a {{KeyedAndTimestampDeserializationSchemaWrapper}} that accepts a {{KeyedDeserializationSchema}}. The FlinkKafkaConsumers constructors can be extended to accept the new {{KeyedAndTimestampDeserializationSchema}} as parameter and wrap the calls with the {{KeyedDeserializationSchema}} interface. > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
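The non-breaking extension proposed above (a new sub-interface plus a wrapper) can be sketched as follows. The sketch uses simplified stand-ins. `KeyedSchema` keeps only the `deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)` method of Flink's `KeyedDeserializationSchema`, and leaves out `isEndOfStream`, `getProducedType` and the checked `IOException`. `KeyedAndTimestampSchema`, `KeyedAndTimestampSchemaWrapper` and `TsType` are hypothetical names. The consumer would only talk to the extended interface. An existing keyed schema is wrapped and simply ignores the timestamp arguments.

```java
// Simplified stand-in for Flink's KeyedDeserializationSchema.
interface KeyedSchema<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset);
}

// Who set the timestamp: the producer (CREATE_TIME) or the broker (LOG_APPEND_TIME).
enum TsType { NO_TIMESTAMP_TYPE, CREATE_TIME, LOG_APPEND_TIME }

// Hypothetical extended interface that also receives the Kafka timestamp.
interface KeyedAndTimestampSchema<T> extends KeyedSchema<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition,
                  long offset, long timestamp, TsType timestampType);
}

// Hypothetical wrapper: lets an existing KeyedSchema be used wherever the
// extended interface is expected, keeping old user code source-compatible.
final class KeyedAndTimestampSchemaWrapper<T> implements KeyedAndTimestampSchema<T> {
    private final KeyedSchema<T> inner;

    KeyedAndTimestampSchemaWrapper(KeyedSchema<T> inner) {
        this.inner = inner;
    }

    @Override
    public T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        return inner.deserialize(messageKey, message, topic, partition, offset);
    }

    @Override
    public T deserialize(byte[] messageKey, byte[] message, String topic, int partition,
                         long offset, long timestamp, TsType timestampType) {
        // The wrapped schema predates timestamps, so the extra arguments are dropped.
        return inner.deserialize(messageKey, message, topic, partition, offset);
    }
}
```

Compared with the default-method approach discussed later in the thread, this design needs an extra interface and an extra wrapper class per schema variant. That duplication is the concern that pushed the discussion towards a single meta-info parameter.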
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390914#comment-16390914 ] yanxiaobin commented on FLINK-8500: --- Sorry for the late reply. I see; we really should be careful here. I want to find out how to get the Kafka timestamp in methods like flatMap and map of a DataStream. > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16377145#comment-16377145 ] Aljoscha Krettek commented on FLINK-8500: - Yes, I still think that we need a solution for this, that's why I kept the issue open. However, adding a parameter to that interface will break all existing code, that's why we have to be careful here. > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Blocker > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376428#comment-16376428 ] yanxiaobin commented on FLINK-8500: --- Hi [~aljoscha]! I can solve that problem that way, but I still think the deserialize method of KeyedDeserializationSchema needs a parameter 'kafka message timestamp' (from ConsumerRecord). This timestamp may also be needed in some business logic processing. What do you think? > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Blocker > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16369081#comment-16369081 ] Aljoscha Krettek commented on FLINK-8500: - I've moved this to 1.6 because we're already very late with the feature freeze for 1.5. There is a workaround: write a custom timestamp extractor that is not derived from {{AscendingTimestampExtractor}} or {{BoundedOutOfOrdernessTimestampExtractor}}. There you can implement the method {{extractTimestamp(T, long)}}, which has access to the current timestamp; when using the Kafka source, that is the Kafka timestamp. > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Blocker > Fix For: 1.6.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
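The workaround can be sketched as below. To keep the example free of Flink dependencies, `PassThroughAssigner` only mirrors the two methods of Flink's `AssignerWithPeriodicWatermarks<T>`: `extractTimestamp(T element, long previousElementTimestamp)` and `getCurrentWatermark()`. It returns the watermark as a plain `long` instead of a `Watermark` object. The class name and the fixed out-of-orderness bound are assumptions. In a real job the class would implement the Flink interface and return `new Watermark(...)`.

```java
// Flink-free model of a custom periodic assigner. In a Flink job this class
// would implement AssignerWithPeriodicWatermarks<T> instead.
final class PassThroughAssigner<T> {
    private final long maxOutOfOrdernessMillis;
    private long maxSeenTimestamp = Long.MIN_VALUE;

    PassThroughAssigner(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    // previousElementTimestamp is the timestamp already attached to the record;
    // with the Kafka 0.10+ source that is the Kafka message timestamp.
    // Returning it unchanged keeps it, unlike the built-in extractors.
    long extractTimestamp(T element, long previousElementTimestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, previousElementTimestamp);
        return previousElementTimestamp;
    }

    // Watermark = highest timestamp seen so far minus the allowed out-of-orderness.
    long getCurrentWatermark() {
        return maxSeenTimestamp == Long.MIN_VALUE
            ? Long.MIN_VALUE
            : maxSeenTimestamp - maxOutOfOrdernessMillis;
    }
}
```

Because the extractor never computes its own timestamp, the Kafka timestamp flows through unchanged. The watermark logic is the same bounded-out-of-orderness idea the built-in extractor uses.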
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357295#comment-16357295 ] Tzu-Li (Gordon) Tai commented on FLINK-8500: Moving this to 1.4.2, since on the mailing lists the community agreed to move forward with what we have already for 1.4.1. Please reopen and let me know if you disagree. > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Blocker > Fix For: 1.5.0, 1.4.1 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347951#comment-16347951 ]
yanxiaobin commented on FLINK-8500:
---
Great! I think that's reasonable, but I think it's best to optimize it so that the user interface is as unified as possible. Thanks, [~aljoscha]!
> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Blocker
> Fix For: 1.5.0, 1.4.1
> Attachments: image-2018-01-30-14-58-58-167.png, image-2018-01-31-10-48-59-633.png
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios,
> this is useful!
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347008#comment-16347008 ]
Aljoscha Krettek commented on FLINK-8500:
---
Yes, now I feel stupid. You analysed it exactly right: we do get the timestamp from Kafka, but then all of the built-in timestamp extractors ({{AscendingTimestampExtractor}} and {{BoundedOutOfOrdernessTimestampExtractor}}) don't take it into account and overwrite it. You can get around that by writing a completely custom {{AssignerWithPeriodicWatermarks}}, but it's not a good situation.
> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Blocker
> Fix For: 1.5.0, 1.4.1
> Attachments: image-2018-01-30-14-58-58-167.png, image-2018-01-31-10-48-59-633.png
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios,
> this is useful!
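To illustrate why the built-in extractors lose the Kafka timestamp, here is a simplified plain-Java model, not Flink code; it omits the monotonicity check and all watermark logic. `AscendingStyleExtractor` is modeled on the behavior described in this thread for `AscendingTimestampExtractor`: its final `extractTimestamp` delegates to `extractAscendingTimestamp(element)` and ignores `elementPrevTimestamp`. `Event`, `PayloadTimeExtractor`, and `KeepKafkaTimestamp` are hypothetical names.

```java
// Hypothetical event type whose payload carries its own time field.
final class Event {
    final long payloadTime;
    Event(long payloadTime) { this.payloadTime = payloadTime; }
}

// Modeled on the described behavior of AscendingTimestampExtractor: the
// subclass hook only sees the element, so the Kafka timestamp handed in as
// elementPrevTimestamp is silently discarded.
abstract class AscendingStyleExtractor<T> {
    abstract long extractAscendingTimestamp(T element);

    final long extractTimestamp(T element, long elementPrevTimestamp) {
        return extractAscendingTimestamp(element); // elementPrevTimestamp is unused
    }
}

// Typical user subclass: derives event time from the payload.
final class PayloadTimeExtractor extends AscendingStyleExtractor<Event> {
    @Override
    long extractAscendingTimestamp(Event e) {
        return e.payloadTime;
    }
}

// A completely custom assigner can simply keep the incoming (Kafka) timestamp.
final class KeepKafkaTimestamp {
    long extractTimestamp(Event element, long elementPrevTimestamp) {
        return elementPrevTimestamp;
    }
}
```

With a Kafka timestamp of 999 and a payload time of 100, the ascending-style extractor yields 100 while the custom one yields 999.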
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346550#comment-16346550 ]
yanxiaobin commented on FLINK-8500:
---
Hi, [~aljoscha]. What do you think?
> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Attachments: image-2018-01-30-14-58-58-167.png, image-2018-01-31-10-48-59-633.png
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios,
> this is useful!
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346180#comment-16346180 ]
yanxiaobin commented on FLINK-8500:
---
Hi, [~aljoscha], thank you for your reply! Please look at the attached picture (image-2018-01-31-10-48-59-633.png). The final event time is obtained from {{final long newTimestamp = extractAscendingTimestamp(element);}}, and the element was deserialized by the {{KeyedDeserializationSchema}}. Also, the parameter {{elementPrevTimestamp}} is not used! Thanks!
> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Attachments: image-2018-01-30-14-58-58-167.png, image-2018-01-31-10-48-59-633.png
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios,
> this is useful!
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344858#comment-16344858 ]
Aljoscha Krettek commented on FLINK-8500:
---
Yes, the Kafka timestamp is already used as the event-time timestamp. See here: https://github.com/apache/flink/blob/28e8043ba09b47c99439fdb536a4226eccf70c07/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java#L85
> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Attachments: image-2018-01-30-14-58-58-167.png
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios,
> this is useful!
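A rough plain-Java model of the point being made here, assuming simplified types: each raw record's value is deserialized, and the record's own timestamp is attached to the resulting element alongside it rather than being passed into the schema. `RawRecord` stands in for Kafka's `ConsumerRecord`; `TimestampedElement` and `SketchFetcher` are hypothetical names, not Flink classes, and the sketch does not reproduce the fetcher's actual code.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Kafka's ConsumerRecord: payload bytes plus the record timestamp.
final class RawRecord {
    final byte[] value;
    final long timestamp;
    RawRecord(byte[] value, long timestamp) { this.value = value; this.timestamp = timestamp; }
}

// Element as it travels downstream: the deserialized value plus the timestamp
// the source attached to it (hypothetical, simplified).
final class TimestampedElement<T> {
    final T value;
    final long timestamp;
    TimestampedElement(T value, long timestamp) { this.value = value; this.timestamp = timestamp; }
}

final class SketchFetcher {
    // The "schema" here only sees the payload bytes; the record timestamp is
    // carried next to the element instead of through the deserializer.
    static List<TimestampedElement<String>> emit(List<RawRecord> records) {
        List<TimestampedElement<String>> out = new ArrayList<>();
        for (RawRecord r : records) {
            String value = new String(r.value, StandardCharsets.UTF_8);
            out.add(new TimestampedElement<>(value, r.timestamp));
        }
        return out;
    }
}
```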
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344591#comment-16344591 ]
yanxiaobin commented on FLINK-8500:
---
As shown in the attached screenshot of {{Kafka010Fetcher}} (image-2018-01-30-14-58-58-167.png).
> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Attachments: image-2018-01-30-14-58-58-167.png
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios,
> this is useful!
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16338958#comment-16338958 ]
yanxiaobin commented on FLINK-8500:
---
I mean that I want to use the Kafka message timestamp as the event time. Thanks!
> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios,
> this is useful!
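The request in this issue is for the deserialization schema itself to see the record timestamp. A hypothetical sketch of such an interface: it copies the parameters of `KeyedDeserializationSchema.deserialize(messageKey, message, topic, partition, offset)` and adds a `timestamp` argument. `TimestampAwareDeserializationSchema`, `TimedEvent`, and `TimedEventSchema` are invented for illustration and are not part of Flink.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical interface (NOT a Flink API): the KeyedDeserializationSchema
// parameters plus the Kafka record timestamp.
interface TimestampAwareDeserializationSchema<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic,
                  int partition, long offset, long timestamp);
}

// Hypothetical event that keeps the Kafka timestamp inside the payload object,
// so downstream code can use it without relying on attached element timestamps.
final class TimedEvent {
    final String body;
    final long kafkaTimestamp;
    TimedEvent(String body, long kafkaTimestamp) {
        this.body = body;
        this.kafkaTimestamp = kafkaTimestamp;
    }
}

final class TimedEventSchema implements TimestampAwareDeserializationSchema<TimedEvent> {
    @Override
    public TimedEvent deserialize(byte[] messageKey, byte[] message, String topic,
                                  int partition, long offset, long timestamp) {
        // Decode the value as UTF-8 text and store the timestamp next to it.
        return new TimedEvent(new String(message, StandardCharsets.UTF_8), timestamp);
    }
}
```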
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337598#comment-16337598 ]
Aljoscha Krettek commented on FLINK-8500:
---
The timestamp of Kafka messages is attached to the elements. You can, for example, access it using {{Context.timestamp()}} when using a {{ProcessFunction}}. Is that good enough for your purposes?
> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
>
> The method deserialize of KeyedDeserializationSchema needs a parameter
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios,
> this is useful!
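A plain-Java sketch of the {{ProcessFunction}} approach, assuming simplified stand-ins: `Context` mirrors `ProcessFunction.Context#timestamp()`, which returns a `Long` that may be `null` when no timestamp is attached (for example, under the processing-time characteristic), and a `List<String>` replaces Flink's `Collector`. `TagWithTimestamp` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in (assumption) for Flink's ProcessFunction.Context:
// timestamp() returns the element's attached timestamp, or null if none.
interface Context {
    Long timestamp();
}

// Simplified stand-in for a ProcessFunction<String, String>: tags each value
// with the timestamp the source attached (the Kafka timestamp for a Kafka 0.10+ source).
final class TagWithTimestamp {
    void processElement(String value, Context ctx, List<String> out) {
        Long ts = ctx.timestamp();
        // Guard against null: elements may carry no timestamp at all.
        out.add(value + "@" + (ts == null ? "none" : ts.toString()));
    }
}
```

For example, an element "a" carrying timestamp 42 is emitted as "a@42", and one without a timestamp as "a@none".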