[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-10-18 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656137#comment-16656137
 ] 

vinoyang commented on FLINK-8500:
-

Hi, if the modern Kafka connector also has this issue, shall we remove 
*Kafka010Fetcher* from the title of the issue?

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Sub-task
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 
> 'kafka message timestamp' parameter (from ConsumerRecord). In some business 
> scenarios, this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-09-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16602584#comment-16602584
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

alexeyt820 commented on issue #6105: [FLINK-8500] Get the timestamp of the 
Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#issuecomment-418235101
 
 
   FYI, I have added a deserialize method with a “Record” parameter to my PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: yanxiaobin
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 
> 'kafka message timestamp' parameter (from ConsumerRecord). In some business 
> scenarios, this is useful!
>  





[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-08-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16597466#comment-16597466
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

alexeyt820 commented on issue #6105: [FLINK-8500] Get the timestamp of the 
Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#issuecomment-417324038
 
 
   @aljoscha, yes, I can do it. Should we still try to preserve compatibility 
with the existing interface using a _default_ method, or just break it for good? 
What do you think?




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-08-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16597240#comment-16597240
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

aljoscha commented on issue #6105: [FLINK-8500] Get the timestamp of the Kafka 
message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#issuecomment-417254164
 
 
   @alexeyt820 Yes, we considered that also but were reluctant because it's a 
bigger change. But now I think that we probably have to do it. Do you want to 
try that out in your PR?




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-08-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596715#comment-16596715
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

alexeyt820 commented on issue #6105: [FLINK-8500] Get the timestamp of the 
Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#issuecomment-417063770
 
 
   @aljoscha, wouldn't it be even better to have a parameter object wrapping the 
Kafka ConsumerRecord instead of a flat list of arguments (messageKey, message, 
topic, partition, offset, timestamp, headers, ...) and pass it to deserialize? 
It feels like a more extensible approach: any further changes to the parameter 
object would be transparent for clients.
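
A minimal sketch of the parameter-object idea, written without any Kafka or Flink dependency. `RecordView`, `RecordDeserializationSchema` and `TimestampedStringSchema` are hypothetical names chosen for illustration and are not taken from the PR; in the proposal, the role of `RecordView` would be played by something wrapping Kafka's `ConsumerRecord`.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical parameter object standing in for a wrapper around ConsumerRecord.
final class RecordView {
    final byte[] key;
    final byte[] value;
    final String topic;
    final int partition;
    final long offset;
    final long timestamp;

    RecordView(byte[] key, byte[] value, String topic, int partition, long offset, long timestamp) {
        this.key = key;
        this.value = value;
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
    }
}

// Hypothetical schema interface: one argument instead of a flat list.
interface RecordDeserializationSchema<T> {
    T deserialize(RecordView record);
}

// Example implementation that uses the timestamp together with the payload.
final class TimestampedStringSchema implements RecordDeserializationSchema<String> {
    @Override
    public String deserialize(RecordView record) {
        return record.timestamp + ":" + new String(record.value, StandardCharsets.UTF_8);
    }
}

final class ParameterObjectDemo {
    public static void main(String[] args) {
        RecordView record = new RecordView(
                null, "hello".getBytes(StandardCharsets.UTF_8), "topic", 0, 42L, 1535500000000L);
        // prints "1535500000000:hello"
        System.out.println(new TimestampedStringSchema().deserialize(record));
    }
}
```

If a field such as headers were added to `RecordView` later, `TimestampedStringSchema` would compile unchanged, which is the extensibility argument made above.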




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16590215#comment-16590215
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

FredTing commented on a change in pull request #6105: [FLINK-8500] Get the 
timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r212309444
 
 

 ##
 File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
 ##
 @@ -45,6 +45,22 @@
      */
     T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
 
+    /**
+     * Deserializes the byte message.
+     *
+     * @param messageKey the key as a byte array (null if no key has been set).
+     * @param message The message, as a byte array (null if the message was empty or deleted).
+     * @param partition The partition the message has originated from.
+     * @param offset the offset of the message in the original source (for example the Kafka offset).
+     * @param timestamp the timestamp of the consumer record
+     * @param timestampType The timestamp type, could be NO_TIMESTAMP, CREATE_TIME or INGEST_TIME.
+     *
+     * @return The deserialized message as an object (null if the message cannot be deserialized).
+     */
+    default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException {
 
 Review comment:
   I've pushed the changes as we discussed above.
   




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16590017#comment-16590017
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

tzulitai commented on a change in pull request #6105: [FLINK-8500] Get the 
timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r212252904
 
 

 
 Review comment:
   I would be ok with proceeding with the above.




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16589904#comment-16589904
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

aljoscha commented on a change in pull request #6105: [FLINK-8500] Get the 
timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r21385
 
 

 
 Review comment:
   Yes, to unblock this I think we can go with this approach; basically the 
schema becomes this:
   ```
   @PublicEvolving
   public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

       @Deprecated
       default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
           throw new RuntimeException("blammo");
       }

       default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp) {
           // Delegate to the deprecated method so existing implementations keep working.
           return deserialize(messageKey, message, topic, partition, offset);
       }

       boolean isEndOfStream(T nextElement);
   }
   ```
   
   With this, if you have an existing implementation of 
`KeyedDeserializationSchema` it will continue to work without any changes. If 
you implement a new one you have to implement one of the methods, otherwise the 
exception is thrown. And all Flink code only calls the version that takes the 
timestamp.
   
   What do you think?
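
To illustrate the compatibility claim, here is a small self-contained sketch. It assumes a simplified stand-in interface, `KeyedSchemaSketch`, that omits `Serializable`, `ResultTypeQueryable` and `isEndOfStream` so that it compiles without Flink; `LegacyStringSchema` is a hypothetical pre-existing implementation that only knows the five-argument method.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Simplified stand-in for the proposed interface (not the real Flink type).
interface KeyedSchemaSketch<T> {

    @Deprecated
    default T deserialize(byte[] messageKey, byte[] message, String topic,
                          int partition, long offset) throws IOException {
        throw new RuntimeException("neither deserialize method was overridden");
    }

    default T deserialize(byte[] messageKey, byte[] message, String topic,
                          int partition, long offset, long timestamp) throws IOException {
        // Falls back to the old method so existing implementations keep working.
        return deserialize(messageKey, message, topic, partition, offset);
    }
}

// An "existing" implementation written against the old five-argument method only.
final class LegacyStringSchema implements KeyedSchemaSketch<String> {
    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic,
                              int partition, long offset) {
        return new String(message, StandardCharsets.UTF_8);
    }
}

final class LegacySchemaDemo {
    public static void main(String[] args) throws IOException {
        KeyedSchemaSketch<String> schema = new LegacyStringSchema();
        // The caller only ever uses the timestamped overload, as proposed for Flink code.
        String result = schema.deserialize(
                null, "payload".getBytes(StandardCharsets.UTF_8), "t", 0, 1L, 1535000000000L);
        System.out.println(result); // prints "payload"
    }
}
```

The legacy class is untouched, yet the timestamped call reaches its override through the default method; the timestamp is simply ignored in that case.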




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-08-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16589289#comment-16589289
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

FredTing commented on a change in pull request #6105: [FLINK-8500] Get the 
timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r212082669
 
 

 
 Review comment:
   @tzulitai We can remove the non-timestamped version of the `deserialize` 
method completely. It will break the interface, but if you implemented this 
method it is quite easy to migrate to the new method. I've only introduced the new 
`deserialize` method as a default to make this change backwards compatible.
   When backwards compatibility is required, we could mark the non-timestamped 
version of the `deserialize` method as `@Deprecated` and give it a default 
implementation that throws a `NotImplementedException` or a 
`FlinkRuntimeException` with the message "you must implement the deserialize 
method when implementing the KeyedDeserializationSchema interface".




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-08-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587192#comment-16587192
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

tzulitai commented on a change in pull request #6105: [FLINK-8500] Get the 
timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r211532678
 
 

 
 Review comment:
   I'm still not very convinced that this is a good approach.
   
   For example, things might be a lot cleaner and more error-proof if we just 
deprecate the non-timestamped `KeyedDeserializationSchema` in favor of a new 
interface. Technically, we're trying to achieve the same thing, but things would 
perhaps be much more understandable. 
   
   @FredTing what do you think?
   Perhaps @aljoscha would also want to chime in with some thoughts here?




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-08-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16585692#comment-16585692
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

FredTing commented on a change in pull request #6105: [FLINK-8500] Get the 
timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r211197298
 
 

 
 Review comment:
   @tzulitai Yes, this would also be an issue with our long-term solution, and 
we should look at that carefully. For instance, when messages are encrypted on 
Kafka we need them to be decrypted. How they are encrypted is determined by the 
producer (out of our control), but we should be able to decrypt these messages. 
When some Kafka meta-information is used for encryption, we need it during 
decryption too.
   
   > I'm not sure how this would work. Could you elaborate a bit more on this?
   
   When a custom implementation of the `KeyedDeserializationSchema` is created, 
then at least one of the `deserialize` methods must be implemented (why else 
create a custom implementation?). When both `deserialize` methods have a 
default implementation, the compiler no longer complains if you forget to 
implement one. If the default is an empty method, you won't get complaints at 
runtime either, but you won't receive any records. By throwing an exception 
with a clear message that you forgot to override either one of the 
`deserialize` methods, you get an error message at runtime.
   When either one of the `deserialize` methods is overridden, the default that 
throws the exception isn't called any more. 
   Since it is currently an abstract method, all current implementations already 
override this `deserialize` method.
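
The behaviour described above can be sketched as follows. This is only an illustration under stated assumptions: `TwoDefaultsSchema`, `ForgetfulSchema`, `TimestampAwareSchema` and the `describe` helper are hypothetical names, and `IllegalStateException` stands in for whichever exception type (`NotImplementedException`, `FlinkRuntimeException`, ...) would actually be chosen.

```java
import java.nio.charset.StandardCharsets;

// Stand-in interface in which both deserialize variants have a default body.
interface TwoDefaultsSchema<T> {

    @Deprecated
    default T deserialize(byte[] messageKey, byte[] message, String topic,
                          int partition, long offset) {
        // Reached only when an implementation overrides neither method.
        throw new IllegalStateException(
                "You must override one of the deserialize methods when implementing this schema");
    }

    default T deserialize(byte[] messageKey, byte[] message, String topic,
                          int partition, long offset, long timestamp) {
        return deserialize(messageKey, message, topic, partition, offset);
    }
}

// Overrides nothing: this compiles, but fails on the first record.
final class ForgetfulSchema implements TwoDefaultsSchema<String> {
}

// Overrides only the timestamped method; the throwing default is never reached.
final class TimestampAwareSchema implements TwoDefaultsSchema<String> {
    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic,
                              int partition, long offset, long timestamp) {
        return timestamp + "@" + new String(message, StandardCharsets.UTF_8);
    }
}

final class TwoDefaultsDemo {
    // Calls the timestamped overload, the only one the framework would call.
    static String describe(TwoDefaultsSchema<String> schema) {
        try {
            return schema.deserialize(null, "x".getBytes(StandardCharsets.UTF_8), "t", 0, 0L, 7L);
        } catch (IllegalStateException e) {
            return "error: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(new TimestampAwareSchema())); // prints "7@x"
        System.out.println(describe(new ForgetfulSchema()));      // prints the error message
    }
}
```

The trade-off is visible here: the compiler accepts `ForgetfulSchema`, so the mistake only surfaces at runtime, but it surfaces with an explicit message rather than silently dropping records.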




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-08-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16585659#comment-16585659
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

tzulitai commented on a change in pull request #6105: [FLINK-8500] Get the 
timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r211187832
 
 

 
 Review comment:
   @FredTing Your first concern makes sense. Though, wouldn't this also be a 
shortcoming of our long-term solution, where we want to have separate 
interfaces for "deserialization" and "enrichment"?
   
   ```
   I have no problem with the first alternative, but I think we are better off 
when we throw an exception with a message explaining that you must 
implement/override one of the deserialize methods.
   ```
   
   I'm not sure how this would work. Could you elaborate a bit more on this?




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-08-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16583865#comment-16583865
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

FredTing commented on a change in pull request #6105: [FLINK-8500] Get the 
timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r210898792
 
 

 
 Review comment:
   I have a small problem with the second alternative. When I implement this 
interface in Scala, I do not want to use mutable objects. I think the 
`setTimestamp` method forces the `T deserializedRecord` to be mutable 
(at least for the timestamp field).
   
   I have no problem with the first alternative, but I think we are better off 
throwing an exception with a message explaining that one must 
implement/override one of the `deserialize` methods.
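
The first alternative combined with the exception idea could look roughly like the sketch below. It is only an illustration: `SketchSchema`, `SketchTimestampType`, `TimestampedStringSchema` and `EmptySchema` are hypothetical names, not Flink classes, and the enum constants simply mirror the ones named in the quoted Javadoc.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Demo entry point; every type in this file is an illustrative stand-in, not Flink API.
class BothDefaultDemo {
    public static void main(String[] args) throws IOException {
        SketchSchema<String> schema = new TimestampedStringSchema();
        byte[] value = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(
            schema.deserialize(null, value, "t", 0, 1L, 42L, SketchTimestampType.CREATE_TIME));
    }
}

// Stand-in for the PR's TimestampType; constant names follow the quoted Javadoc.
enum SketchTimestampType { NO_TIMESTAMP, CREATE_TIME, INGEST_TIME }

// Hypothetical variant of KeyedDeserializationSchema in which both methods are default.
interface SketchSchema<T> {

    // Legacy method: fails loudly when the implementer overrode neither method.
    default T deserialize(byte[] messageKey, byte[] message, String topic,
                          int partition, long offset) throws IOException {
        throw new UnsupportedOperationException(
            "Implement/override one of the deserialize(...) methods in " + getClass().getName());
    }

    // Timestamped method: falls back to the legacy method unless overridden.
    default T deserialize(byte[] messageKey, byte[] message, String topic,
                          int partition, long offset, long timestamp,
                          SketchTimestampType timestampType) throws IOException {
        return deserialize(messageKey, message, topic, partition, offset);
    }
}

// Overrides only the timestamped method; the result can stay immutable.
class TimestampedStringSchema implements SketchSchema<String> {
    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic,
                              int partition, long offset, long timestamp,
                              SketchTimestampType timestampType) {
        return new String(message, StandardCharsets.UTF_8) + "@" + timestamp;
    }
}

// Overrides nothing, so any call ends in the explanatory exception.
class EmptySchema implements SketchSchema<String> { }
```

The trade-off is that a forgotten override is only detected at runtime, when the first record is deserialized, rather than at compile time.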




> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema  needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, 
> this is useful!
>  





[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-08-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16583407#comment-16583407
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

tzulitai commented on a change in pull request #6105: [FLINK-8500] Get the 
timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r210810967
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
 ##
 @@ -45,6 +45,22 @@
 */
T deserialize(byte[] messageKey, byte[] message, String topic, int 
partition, long offset) throws IOException;
 
+   /**
+* Deserializes the byte message.
+*
+* @param messageKey the key as a byte array (null if no key has been 
set).
+* @param message The message, as a byte array (null if the message was 
empty or deleted).
+* @param partition The partition the message has originated from.
+* @param offset the offset of the message in the original source (for 
example the Kafka offset).
+* @param timestamp the timestamp of the consumer record
+* @param timestampType The timestamp type, could be NO_TIMESTAMP, 
CREATE_TIME or INGEST_TIME.
+*
+* @return The deserialized message as an object (null if the message 
cannot be deserialized).
+*/
+   default T deserialize(byte[] messageKey, byte[] message, String topic, 
int partition, long offset, long timestamp, TimestampType timestampType) throws 
IOException {
 
 Review comment:
   This would mean that, if users want to access timestamps, they would have to 
implement both deserialize methods, correct?
   
   This feels a bit awkward. I can think of 2 possible approaches to address 
this:
   1. Let the original non-timestamped deserialize method also be default, but 
with an empty body. It's still not nice though, since now both deserialize 
methods are default, and moreover the second deserialize method relies on the 
first one.
   
   2. Instead of having a new deserialize method with the timestamp, we have a 
`setTimestamp` method that is only ever called in Kafka 0.10+, whose signature 
is `void setTimestamp(T deserializedRecord, long timestamp)`. This might also 
better match what we've discussed in the JIRA ticket, about having separate 
interfaces for "deserialization" and "enrichment".
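
A minimal sketch of the second approach, assuming a hypothetical `EnrichableSchema` interface (the names `EnrichableSchema`, `MutableEvent`, `MutableEventSchema` and `fetchOne` are made up for illustration and are not part of Flink). It only shows the calling order a 0.10+ fetcher might use: deserialize first, then hand the timestamp to the schema.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Demo entry point; all types here are illustrative stand-ins, not Flink API.
class SetTimestampDemo {
    public static void main(String[] args) throws IOException {
        MutableEvent event = fetchOne(
            new MutableEventSchema(), "v1".getBytes(StandardCharsets.UTF_8), 1534400000000L);
        System.out.println(event.payload + " @ " + event.timestamp);
    }

    // Mimics what a 0.10+ fetcher might do: deserialize first, then enrich.
    static <T> T fetchOne(EnrichableSchema<T> schema, byte[] value, long kafkaTimestamp)
            throws IOException {
        T record = schema.deserialize(null, value, "demo-topic", 0, 0L);
        schema.setTimestamp(record, kafkaTimestamp);
        return record;
    }
}

// Hypothetical interface: the existing deserialize method plus an enrichment hook.
interface EnrichableSchema<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic,
                  int partition, long offset) throws IOException;

    // No-op by default, so existing implementations keep compiling.
    default void setTimestamp(T deserializedRecord, long timestamp) { }
}

// The record needs a non-final field, because setTimestamp returns void.
class MutableEvent {
    final String payload;
    long timestamp = -1L; // assumed "not set" marker for this sketch
    MutableEvent(String payload) { this.payload = payload; }
}

class MutableEventSchema implements EnrichableSchema<MutableEvent> {
    @Override
    public MutableEvent deserialize(byte[] messageKey, byte[] message, String topic,
                                    int partition, long offset) {
        return new MutableEvent(new String(message, StandardCharsets.UTF_8));
    }

    @Override
    public void setTimestamp(MutableEvent deserializedRecord, long timestamp) {
        deserializedRecord.timestamp = timestamp;
    }
}
```

Because `setTimestamp` returns `void`, the record type has to expose some mutable state for the timestamp.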






[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-08-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16583405#comment-16583405
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

tzulitai commented on a change in pull request #6105: [FLINK-8500] Get the 
timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r210810317
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
 ##
 @@ -54,4 +70,12 @@
 * @return True, if the element signals end of stream, false otherwise.
 */
boolean isEndOfStream(T nextElement);
+
+   /**
+* The TimestampType is introduced in the kafka clients 0.10+. This 
interface is also used for the Kafka connector 0.9
 
 Review comment:
   Capital k for Kafka






[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-08-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16583406#comment-16583406
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

tzulitai commented on a change in pull request #6105: [FLINK-8500] Get the 
timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r210809870
 
 

 ##
 File path: docs/dev/connectors/kafka.md
 ##
 @@ -153,7 +153,10 @@ produced Java/Scala type to Flink's type system. Users 
that implement a vanilla
 to implement the `getProducedType(...)` method themselves.
 
 For accessing both the key and value of the Kafka message, the 
`KeyedDeserializationSchema` has
-the following deserialize method ` T deserialize(byte[] messageKey, byte[] 
message, String topic, int partition, long offset)`.
+the following deserialize methods ` T deserialize(byte[] messageKey, byte[] 
message, String topic, int partition, long offset)` and 
+` T deserialize(byte[] messageKey, byte[] message, String topic, int 
partition, long offset, long timestamp, TimestampType timestampType)`. 
+The first exists for backward compatibility reasons, for kafka 0.10+ consumers 
the second is preferred because it 
+also gives access to the kafka timestamp.
 
 Review comment:
   Capital 'k' for Kafka






[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-08-13 Thread Fred Teunissen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579276#comment-16579276
 ] 

Fred Teunissen commented on FLINK-8500:
---

I rebased this PR onto the latest master branch yesterday evening.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.7.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema  needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, 
> this is useful!
>  





[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-30 Thread yanxiaobin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16496136#comment-16496136
 ] 

yanxiaobin commented on FLINK-8500:
---

[~tzulitai] Thanks, great.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema  needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, 
> this is useful!
>  





[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-30 Thread Tzu-Li (Gordon) Tai (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16496068#comment-16496068
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8500:


[~Backlight]

Yes, we're going for a minimal incremental fix for now, so that we at 
least have a way around the problem.

In the long run, the ideal approach is discussed in the comments of this PR: 
[https://github.com/apache/flink/pull/5958].

Given that [~FredTing]'s latest PR is a "short-term solution", I think it is 
definitely ok to target that for a merge to the 1.5.x series.



[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-30 Thread yanxiaobin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16496006#comment-16496006
 ] 

yanxiaobin commented on FLINK-8500:
---

So far, it can solve the current problems, but in the long run there will 
still be some limitations in supporting future Kafka versions. By the way, 
can we fix this problem in the 1.5.x series?



[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16495589#comment-16495589
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

GitHub user FredTing opened a pull request:

https://github.com/apache/flink/pull/6105

[FLINK-8500] Get the timestamp of the Kafka message from kafka consumer

## What is the purpose of the change

This pull request makes the Kafka timestamp and timestampType available in 
the message deserialisation so one can use them in the business logic processing.

## Brief change log
- added a new default method with Timestamp/TimestampType parameters to the 
interface `KeyedDeserializationSchema`

## Verifying this change

This change is already covered by existing tests, such as Kafka Consumer 
tests and JSONKeyValueDeserializationSchemaTest.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no)
  - The serializers: (yes / no / **don't know**)
  - The runtime per-record code paths (performance sensitive): (**yes** / 
no / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (**yes** / no)
  - If yes, how is the feature documented? (not applicable / **docs** / 
**JavaDocs** / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/FredTing/flink FLINK-8500

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6105.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6105


commit a214481f6722e75a298d87333e4981cb87b9c2b9
Author: Fred Teunissen 
Date:   2018-05-20T20:45:29Z

[FLINK-8500] Get the timestamp of the Kafka message from kafka consumer






[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-30 Thread Tzu-Li (Gordon) Tai (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16495198#comment-16495198
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8500:


+1 to proceed with at least an incremental fix to the issue, for now. The 
discussion of breaking up concerns of deserialization / meta info enrichment 
into 2 interfaces can go into a separate thread.

For a minimal incremental fix for now, I would prefer this approach:
{code}
default T deserialize(byte[] messageKey, byte[] message, String topic, int 
partition, long offset, long timestamp, TimestampType timestampType) throws 
IOException
{code}

I think in the long run, we'll still very likely break this up into two 
interfaces,
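
To illustrate why a default method counts as a minimal, backward-compatible fix, here is a hedged sketch. `CompatSchema`, `KafkaTimestampKind`, `LegacySchema` and `TimestampAwareSchema` are hypothetical stand-ins rather than the actual Flink or Kafka types, and the `-1L` "no timestamp" value is an assumption made for this example.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Demo entry point; all types here are illustrative stand-ins, not Flink or Kafka API.
class DefaultMethodDemo {
    public static void main(String[] args) throws IOException {
        byte[] value = "payload".getBytes(StandardCharsets.UTF_8);
        CompatSchema<String> legacy = new LegacySchema();
        CompatSchema<String> aware = new TimestampAwareSchema();
        // A 0.10+ fetcher would always call the seven-argument method.
        System.out.println(
            legacy.deserialize(null, value, "t", 0, 5L, 99L, KafkaTimestampKind.CREATE_TIME));
        System.out.println(
            aware.deserialize(null, value, "t", 0, 5L, 99L, KafkaTimestampKind.CREATE_TIME));
    }
}

// Stand-in for the timestamp type; constant names follow the quoted Javadoc.
enum KafkaTimestampKind { NO_TIMESTAMP, CREATE_TIME, INGEST_TIME }

interface CompatSchema<T> {
    // Existing abstract method: every implementation written so far provides it.
    T deserialize(byte[] messageKey, byte[] message, String topic,
                  int partition, long offset) throws IOException;

    // New default method: ignores the timestamp unless an implementation overrides it.
    default T deserialize(byte[] messageKey, byte[] message, String topic,
                          int partition, long offset, long timestamp,
                          KafkaTimestampKind timestampType) throws IOException {
        return deserialize(messageKey, message, topic, partition, offset);
    }
}

// Written before the change: compiles and behaves exactly as before.
class LegacySchema implements CompatSchema<String> {
    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic,
                              int partition, long offset) {
        return new String(message, StandardCharsets.UTF_8);
    }
}

// Written after the change: still has to supply the old abstract method as well.
class TimestampAwareSchema implements CompatSchema<String> {
    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic,
                              int partition, long offset) {
        // -1L is an assumed "no timestamp" marker for this sketch.
        return deserialize(messageKey, message, topic, partition, offset,
                           -1L, KafkaTimestampKind.NO_TIMESTAMP);
    }

    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic,
                              int partition, long offset, long timestamp,
                              KafkaTimestampKind timestampType) {
        return new String(message, StandardCharsets.UTF_8) + "@" + timestamp;
    }
}
```

Existing implementations need no change, while a timestamp-aware implementation ends up writing both methods.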



[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-28 Thread Fred Teunissen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493109#comment-16493109
 ] 

Fred Teunissen commented on FLINK-8500:
---

@[~aljoscha], [~tzulitai], If you like, I can make the PR with a new default 
{{deserialize}} method on the interface {{KeyedDeserializationSchema}} with 
either the signature
 {{default T deserialize(byte[] messageKey, byte[] message, String topic, int 
partition, long offset, long timestamp, TimestampType timestampType) throws 
IOException ...}}
 or the signature
 {{default T deserialize(ConsumerRecord record) throws 
IOException ...}}

I have no preference, both have their pros and cons.



[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-24 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16488779#comment-16488779
 ] 

yanxiaobin commented on FLINK-8500:
---

I prefer to expose Kafka's ConsumerRecord directly (DeserializationSchema), 
thus ensuring the flexibility of the interface; otherwise there will always be 
various limitations.

 



[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16481874#comment-16481874
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user FredTing commented on the issue:

https://github.com/apache/flink/pull/5958
  
Closing this PR because of new insights.




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16481875#comment-16481875
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user FredTing closed the pull request at:

https://github.com/apache/flink/pull/5958




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-19 Thread Fred Teunissen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16481515#comment-16481515
 ] 

Fred Teunissen commented on FLINK-8500:
---

I agree that 'separation of concerns' is a good principle, but in this 
situation I have a problem with it. The interface DeserializationSchema is 
responsible for deserialisation of the message, but in our case the messages 
are encrypted and every topic has its own decryption key. We therefore need at 
least the topic name during deserialisation, which is why we currently use the 
interface KeyedDeserializationSchema. I don't think this is Kafka-specific, so 
it should be solved in the "common deserialization schema" interface.

Another concern I have is that a two-step approach for creating the 
records from Kafka messages leads to more memory allocations and thus more 
garbage collection cycles.

Like [~StephanEwen] said, this issue can be fixed for now by introducing a new 
default method on the interface KeyedDeserializationSchema, but for the long 
run a new `common connector framework` should be designed and implemented. This 
new `common connector framework` should also address some other issues like 
'temporary idleness of partitions' and 'customisable partition assignments'.

If you agree, I can make a new PR with the new default method on the 
interface KeyedDeserializationSchema.
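
The per-topic decryption use case described above can be pictured with the following sketch. It is only an illustration under loud assumptions: `PerTopicDecryptingSchema` is a hypothetical class, and the single-byte XOR "cipher" is a toy placeholder standing in for whatever real decryption is used. The point is solely that the topic name has to be available inside `deserialize`.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Demo entry point; all types here are illustrative stand-ins, not Flink API.
class PerTopicKeyDemo {
    public static void main(String[] args) {
        Map<String, Byte> keys = new HashMap<>();
        keys.put("orders", (byte) 0x5A);
        keys.put("payments", (byte) 0x21);
        PerTopicDecryptingSchema schema = new PerTopicDecryptingSchema(keys);

        byte[] wire = PerTopicDecryptingSchema.toyEncrypt("secret", (byte) 0x5A);
        System.out.println(schema.deserialize(null, wire, "orders", 0, 0L));
    }
}

// Hypothetical schema that needs the topic name to pick a decryption key.
class PerTopicDecryptingSchema {
    private final Map<String, Byte> keysByTopic;

    PerTopicDecryptingSchema(Map<String, Byte> keysByTopic) {
        this.keysByTopic = new HashMap<>(keysByTopic);
    }

    // Same parameter list as the existing five-argument deserialize method.
    String deserialize(byte[] messageKey, byte[] message, String topic,
                       int partition, long offset) {
        Byte key = keysByTopic.get(topic);
        if (key == null) {
            throw new IllegalArgumentException("No key configured for topic " + topic);
        }
        return new String(xor(message, key), StandardCharsets.UTF_8);
    }

    // Toy placeholder cipher (XOR with one byte); NOT real encryption.
    static byte[] toyEncrypt(String text, byte key) {
        return xor(text.getBytes(StandardCharsets.UTF_8), key);
    }

    private static byte[] xor(byte[] input, byte key) {
        byte[] out = new byte[input.length];
        for (int i = 0; i < input.length; i++) {
            out[i] = (byte) (input[i] ^ key);
        }
        return out;
    }
}
```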



[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480505#comment-16480505
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5958
  
@FredTing we had some offline discussion on how to proceed with this.
@aljoscha, @twalthr, or @StephanEwen can probably comment more here if I 
missed anything.

The conflict that Stephan mentioned between a "common deserialization 
schema" interface and surfacing connector-specific information is 
rooted in the fact that both concerns (deserialization and providing 
connector-specific record meta information) are currently coupled in a single 
interface.

Take for example the Kafka connector's `KeyedDeserializationSchema` - there 
we try to deserialize the Kafka bytes, as well as provide information such as 
topic / partition / timestamp etc. to allow the user to enrich their user 
records for downstream business logic. The first part (deserialization of 
bytes) should be something common for all connector sources, while the second 
part is Kafka-specific.

Therefore, we should perhaps break this up into two separate interfaces, as 
follows:
```
// common interface for all sources (we already have this)
interface DeserializationSchema<T> {
    T deserialize(byte[] bytes);
}

// ... and a Kafka-specific interface that is only used to provide record
// meta information
interface ConsumerRecordMetaInfoProvider<T> {
    T enrich(T record, ConsumerRecordMetaInfo metaInfo);
}
```

The second interface is something that each connector should have 
independently, and does not handle deserialization of the record bytes. The 
name, of course, is still open to discussion.

What do you think?
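
For illustration only, the two proposed interfaces might be wired together roughly as follows. The interface names come from the proposal above, but they are redeclared locally here; the fields of `ConsumerRecordMetaInfo`, the `Reading` record type and the `readOne` helper are assumptions made up for this sketch.

```java
import java.nio.charset.StandardCharsets;

// Demo entry point; everything here is a local sketch, not the actual Flink API.
class TwoInterfaceDemo {
    public static void main(String[] args) {
        DeserializationSchema<Reading> format =
            bytes -> new Reading(new String(bytes, StandardCharsets.UTF_8), null, -1L);
        ConsumerRecordMetaInfoProvider<Reading> provider =
            (record, meta) -> new Reading(record.value, meta.topic, meta.timestamp);

        Reading r = readOne(format, provider,
            "21.5".getBytes(StandardCharsets.UTF_8),
            new ConsumerRecordMetaInfo("sensors", 3, 42L, 1526600000000L));
        System.out.println(r.value + " from " + r.topic + " @ " + r.timestamp);
    }

    // What a connector might do per record: format-level decoding, then enrichment.
    static <T> T readOne(DeserializationSchema<T> format,
                         ConsumerRecordMetaInfoProvider<T> provider,
                         byte[] bytes, ConsumerRecordMetaInfo metaInfo) {
        return provider.enrich(format.deserialize(bytes), metaInfo);
    }
}

// Common, connector-independent part.
interface DeserializationSchema<T> {
    T deserialize(byte[] bytes);
}

// Kafka-specific part: only adds record meta information.
interface ConsumerRecordMetaInfoProvider<T> {
    T enrich(T record, ConsumerRecordMetaInfo metaInfo);
}

// Assumed shape of the meta information; the field set is a guess for this sketch.
final class ConsumerRecordMetaInfo {
    final String topic;
    final int partition;
    final long offset;
    final long timestamp;

    ConsumerRecordMetaInfo(String topic, int partition, long offset, long timestamp) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
    }
}

// Immutable user record; enrichment returns a new instance instead of mutating.
final class Reading {
    final String value;
    final String topic;
    final long timestamp;

    Reading(String value, String topic, long timestamp) {
        this.value = value;
        this.topic = topic;
        this.timestamp = timestamp;
    }
}
```

Since `enrich` returns `T`, the user record can stay immutable, at the cost of one extra allocation per record.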




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16471756#comment-16471756
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5958
  
If it turns out that we need to do a bit more design work on the 
deserialization schema, we could incrementally fix the issue that triggered 
this PR by actually extending the KeyedDeserializationSchema with a new default 
method.




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16471754#comment-16471754
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5958
  
There are a few bigger design aspects that we need to agree upon:

  - The `DeserializationSchema` is a shared common denominator of 
serialization schemata. That's why it is in `flink-core` and not in a Kafka 
connector project. It is used by various per-record streaming sources, like 
Kafka, RabbitMQ, in the future PubSub, or AMQ. It may be valuable to migrate 
Kinesis also to that. This PR changes the common denominator to have very 
Kafka-specific fields.

  - The common schema makes sense because we can offer a library of 
implementations, like for Avro, Json, Thrift, Protobuf. All connectors can use 
hardened implementations for these types, or integrations with schema 
registries.

  - This surfaces for example in the SQL / Table API, which is currently 
making an effort to change their source API to have "connector" and "format" 
aspects orthogonal. You define a table as "from kafka with Avro", or "from 
row-wise file with JSON", etc.

  - We should think of having something similar in the future in the 
unified batch/streaming DataStream API as well, when we rework our source 
interface. At least a "row-wise source" that can then use all these format 
implementations.

That means we are in a bit of a conflict between "common deserialization 
schema" interface and surfacing connector specific information.
One way to approach that might be making the connector-specific 
deserializer classes subclasses of the common one, and let them use specialized 
subclasses of ConsumerRecordMetaInfo that have the additional fields.

On a separate note, I think that `ConsumerRecordMetaInfo` is not the best 
name, because the type has not only the meta info, but the actual record. So we 
could call it `Record` or `Datum` or `SourceRecord`, etc.




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16468546#comment-16468546
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186967673
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
 ---
@@ -42,14 +42,22 @@
 @Public
 public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
 
+   /**
+* @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} .
+*/
+   @Deprecated
+   T deserialize(byte[] message) throws IOException;
+
/**
 * Deserializes the byte message.
 *
-* @param message The message, as a byte array.
+* @param consumerRecordMetaInfossage The message, as a {@link ConsumerRecordMetaInfo}.
 *
 * @return The deserialized message as an object (null if the message cannot be deserialized).
 */
-   T deserialize(byte[] message) throws IOException;
+   default T deserialize(ConsumerRecordMetaInfo consumerRecordMetaInfossage) throws IOException {
--- End diff --

I would also vote for deprecating those classes and creating a specific
version `KafkaDeserializationSchema`/`KafkaSerializationSchema`. I would also
like to add a corresponding option to `SerializationSchema` to pass the
targetTopic down, e.g. to be able to look up the appropriate schema in
SchemaRegistry.

I think changes like those do not fit well into a common space.
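
As an illustration of passing the target topic down to the serializer, here is a minimal sketch. `TopicAwareSerializationSchema` and `RegistryBackedStringSchema` are hypothetical names, and the "schema registry" is only a plain map from topic to schema id standing in for a real registry client; none of this is the API proposed in the PR.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical Kafka-specific serialization interface that receives the target topic.
interface TopicAwareSerializationSchema<T> {
    byte[] serialize(T element, String targetTopic);
}

// Toy implementation: looks up a schema id per topic in a map (a stand-in for a
// schema registry) and prefixes the UTF-8 payload with that 4-byte id.
class RegistryBackedStringSchema implements TopicAwareSerializationSchema<String> {
    private final Map<String, Integer> schemaIdsByTopic;

    RegistryBackedStringSchema(Map<String, Integer> schemaIdsByTopic) {
        this.schemaIdsByTopic = schemaIdsByTopic;
    }

    @Override
    public byte[] serialize(String element, String targetTopic) {
        Integer schemaId = schemaIdsByTopic.get(targetTopic);
        if (schemaId == null) {
            throw new IllegalArgumentException("No schema registered for topic " + targetTopic);
        }
        byte[] payload = element.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length).putInt(schemaId).put(payload).array();
    }
}
```

Without the topic argument, the serializer would have no way to pick a per-topic schema, which is the motivation given in the comment above.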




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466795#comment-16466795
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186606104
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
 ---
@@ -42,14 +42,22 @@
 @Public
 public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
 
+   /**
+* @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} .
+*/
+   @Deprecated
+   T deserialize(byte[] message) throws IOException;
+
/**
 * Deserializes the byte message.
 *
-* @param message The message, as a byte array.
+* @param consumerRecordMetaInfossage The message, as a {@link ConsumerRecordMetaInfo}.
 *
 * @return The deserialized message as an object (null if the message cannot be deserialized).
 */
-   T deserialize(byte[] message) throws IOException;
+   default T deserialize(ConsumerRecordMetaInfo consumerRecordMetaInfossage) throws IOException {
--- End diff --

Makes sense. Alright, let's leave this as is then.




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466797#comment-16466797
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5958
  
Thanks for the update @FredTing.
I'll try to take another look at the PR within the next few days.




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466166#comment-16466166
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user FredTing commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186480680
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/ConsumerRecordMetaInfo.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * The consumer record meta info contains, besides the actual message, some meta information, such as
+ * key, topic, partition, offset and timestamp for Apache kafka
+ *
+ * Note:The timestamp is only valid for Kafka clients 0.10+, for older versions the value has the value `Long.MinValue` and
+ * the timestampType has the value `NO_TIMESTAMP_TYPE`.
+ */
+@Public
+public interface ConsumerRecordMetaInfo {
+   /**
+* The TimestampType is introduced in the kafka clients 0.10+. This interface is also used for the Kafka connector 0.9
+* so a local enumeration is needed.
+*/
+   enum TimestampType {
+   NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME
+   }
+
+   /**
+* @return the key as a byte array (null if no key has been set).
+*/
+   byte[] getKey();
+
+   /**
+* @return The message, as a byte array (null if the message was empty or deleted).
+*/
+   byte[] getMessage();
+
+   /**
+* @return The topic the message has originated from (for example the Kafka topic).
+*/
+   String getTopic();
+
+   /**
+* @return The partition the message has originated from (for example the Kafka partition).
+*/
+   int getPartition();
+
+   /**
+* @return the offset of the message in the original source (for example the Kafka offset).
+*/
+   long getOffset();
+
+   /**
+* @return the timestamp of the consumer record
--- End diff --

I've added some more comments




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466157#comment-16466157
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user FredTing commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186479633
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
 ---
@@ -42,14 +42,22 @@
 @Public
 public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
 
+   /**
+* @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} .
--- End diff --

I added some more text to the javadoc explaining that implementing the 
other `deserialize` method has more benefits.




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466141#comment-16466141
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user FredTing commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186479026
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
 ---
@@ -42,14 +42,22 @@
 @Public
 public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
 
+   /**
+* @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} .
+*/
+   @Deprecated
+   T deserialize(byte[] message) throws IOException;
+
/**
 * Deserializes the byte message.
 *
-* @param message The message, as a byte array.
+* @param consumerRecordMetaInfossage The message, as a {@link ConsumerRecordMetaInfo}.
 *
 * @return The deserialized message as an object (null if the message cannot be deserialized).
 */
-   T deserialize(byte[] message) throws IOException;
+   default T deserialize(ConsumerRecordMetaInfo consumerRecordMetaInfossage) throws IOException {
--- End diff --

I agree that it's probably better to make a separate `DeserializationSchema`
class for each connector type. But for now I think this is a relatively easy
fix that does not break the Flink API for custom deserializers. There is
already some discussion about redesigning the connectors (see issue 5479) with
a `common connector framework` in mind. I think that would be a good place to
decide what to do with a shared `DeserializationSchema`.
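
The reason a default method keeps existing custom deserializers working can be sketched as follows. The interfaces below are reduced, hypothetical stand-ins (`SchemaSketch` for `DeserializationSchema<T>`, `RecordWithMetaInfo` for `ConsumerRecordMetaInfo`), and the body of the default method is an assumption, since the quoted diff does not show it.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Reduced, hypothetical stand-in for the PR's ConsumerRecordMetaInfo.
interface RecordWithMetaInfo {
    byte[] getMessage();
    long getTimestamp();
}

// Reduced stand-in for DeserializationSchema<T>, showing only the two deserialize methods.
interface SchemaSketch<T> {
    // Old entry point; existing implementations only override this one.
    T deserialize(byte[] message) throws IOException;

    // New entry point; assumed here to ignore the metadata and delegate to the old method.
    default T deserialize(RecordWithMetaInfo record) throws IOException {
        return deserialize(record.getMessage());
    }
}

// A pre-existing custom deserializer: compiles unchanged against the extended interface.
class LegacyStringSchema implements SchemaSketch<String> {
    @Override
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }
}

// A new deserializer that opts in to the metadata by overriding the default method.
class TimestampPrefixSchema extends LegacyStringSchema {
    @Override
    public String deserialize(RecordWithMetaInfo record) {
        return record.getTimestamp() + ":" + deserialize(record.getMessage());
    }
}
```

The connector would always call the record-based overload; legacy schemas fall through to their byte-array method, while new ones can use the timestamp.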




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466062#comment-16466062
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user FredTing commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186465223
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/ConsumerRecordMetaInfo.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * The consumer record meta info contains, besides the actual message, some meta information, such as
+ * key, topic, partition, offset and timestamp for Apache kafka
+ *
+ * Note:The timestamp is only valid for Kafka clients 0.10+, for older versions the value has the value `Long.MinValue` and
+ * the timestampType has the value `NO_TIMESTAMP_TYPE`.
+ */
+@Public
+public interface ConsumerRecordMetaInfo {
+   /**
+* The TimestampType is introduced in the kafka clients 0.10+. This interface is also used for the Kafka connector 0.9
+* so a local enumeration is needed.
+*/
+   enum TimestampType {
+   NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME
--- End diff --

I'll rename them both. 




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466047#comment-16466047
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user FredTing commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186462871
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
 ---
@@ -78,6 +79,69 @@ public Kafka010Fetcher(
useMetrics);
}
 
+   private class KafkaConsumerRecordWrapper10 implements ConsumerRecordMetaInfo {
+   private static final long serialVersionUID = 2651665280744549935L;
+
+   private final ConsumerRecord<byte[], byte[]> consumerRecord;
+
+   public KafkaConsumerRecordWrapper10(ConsumerRecord<byte[], byte[]> consumerRecord) {
+   this.consumerRecord = consumerRecord;
+   }
+
+   @Override
+   public byte[] getKey() {
+   return consumerRecord.key();
+   }
+
+   @Override
+   public byte[] getMessage() {
+   return consumerRecord.value();
+   }
+
+   @Override
+   public String getTopic() {
+   return consumerRecord.topic();
+   }
+
+   @Override
+   public int getPartition() {
+   return consumerRecord.partition();
+   }
+
+   @Override
+   public long getOffset() {
+   return consumerRecord.offset();
+   }
+
+   @Override
+   public long getTimestamp() {
+   return Long.MIN_VALUE;
--- End diff --

Yes, it certainly does; it should return `consumerRecord.timestamp()`.
I'll fix it.
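
A minimal sketch of the intended behaviour, under stated assumptions: `StubConsumerRecord` is a hypothetical stand-in for Kafka's `ConsumerRecord` (the real class lives in kafka-clients and is not imported here), and `TimestampedRecord` is a reduced stand-in for `ConsumerRecordMetaInfo`. The 0.10 wrapper forwards the record's timestamp, while a wrapper for older clients reports the `Long.MIN_VALUE` sentinel.

```java
// Hypothetical stand-in exposing the two ConsumerRecord accessors used below.
class StubConsumerRecord {
    private final byte[] value;
    private final long timestamp;

    StubConsumerRecord(byte[] value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    byte[] value() { return value; }
    long timestamp() { return timestamp; }
}

// Reduced stand-in for ConsumerRecordMetaInfo.
interface TimestampedRecord {
    byte[] getMessage();
    long getTimestamp();
}

// Wrapper for clients that carry timestamps (0.10+): forward the record's own timestamp.
class TimestampForwardingWrapper implements TimestampedRecord {
    private final StubConsumerRecord record;

    TimestampForwardingWrapper(StubConsumerRecord record) {
        this.record = record;
    }

    @Override
    public byte[] getMessage() { return record.value(); }

    @Override
    public long getTimestamp() { return record.timestamp(); }
}

// Wrapper for older clients without timestamps: report the sentinel instead.
class NoTimestampWrapper implements TimestampedRecord {
    private final StubConsumerRecord record;

    NoTimestampWrapper(StubConsumerRecord record) {
        this.record = record;
    }

    @Override
    public byte[] getMessage() { return record.value(); }

    @Override
    public long getTimestamp() { return Long.MIN_VALUE; }
}
```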




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465498#comment-16465498
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186338049
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
 ---
@@ -42,14 +42,22 @@
 @Public
 public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
 
+   /**
+* @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} .
--- End diff --

For the deprecation, I would recommend explaining why the new deserialize
method is superior.




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465499#comment-16465499
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186338721
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
 ---
@@ -42,14 +42,22 @@
 @Public
 public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
 
+   /**
+* @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} .
+*/
+   @Deprecated
+   T deserialize(byte[] message) throws IOException;
+
/**
 * Deserializes the byte message.
 *
-* @param message The message, as a byte array.
+* @param consumerRecordMetaInfossage The message, as a {@link ConsumerRecordMetaInfo}.
 *
 * @return The deserialized message as an object (null if the message cannot be deserialized).
 */
-   T deserialize(byte[] message) throws IOException;
+   default T deserialize(ConsumerRecordMetaInfo consumerRecordMetaInfossage) throws IOException {
--- End diff --

I'm actually not sure that we should continue using this class, for the
following reasons:

1. The class is actually placed under a non-ideal package:
`o.a.f.api.common.serialization`, whereas it should be placed under some
`o.a.f.connectors.kafka` package.
It is currently placed under this package because the
`DeserializationSchema` was initially intended to be used by all
connectors. However, over time it has turned out that each connector
benefits from its own version of a schema class.

So, it actually might make sense to deprecate the whole
`DeserializationSchema` class now, and have a new class (maybe called
`KafkaDeserializationSchema` / `KafkaSerializationSchema`) under a correct
Kafka package.

What do you think?




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465496#comment-16465496
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186337301
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
 ---
@@ -78,6 +79,69 @@ public Kafka010Fetcher(
useMetrics);
}
 
+   private class KafkaConsumerRecordWrapper10 implements ConsumerRecordMetaInfo {
+   private static final long serialVersionUID = 2651665280744549935L;
+
+   private final ConsumerRecord<byte[], byte[]> consumerRecord;
+
+   public KafkaConsumerRecordWrapper10(ConsumerRecord<byte[], byte[]> consumerRecord) {
+   this.consumerRecord = consumerRecord;
+   }
+
+   @Override
+   public byte[] getKey() {
+   return consumerRecord.key();
+   }
+
+   @Override
+   public byte[] getMessage() {
+   return consumerRecord.value();
+   }
+
+   @Override
+   public String getTopic() {
+   return consumerRecord.topic();
+   }
+
+   @Override
+   public int getPartition() {
+   return consumerRecord.partition();
+   }
+
+   @Override
+   public long getOffset() {
+   return consumerRecord.offset();
+   }
+
+   @Override
+   public long getTimestamp() {
+   return Long.MIN_VALUE;
--- End diff --

Doesn't Kafka 0.10 support record timestamps?




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465497#comment-16465497
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186337890
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/ConsumerRecordMetaInfo.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * The consumer record meta info contains, besides the actual message, some meta information, such as
+ * key, topic, partition, offset and timestamp for Apache kafka
+ *
+ * Note:The timestamp is only valid for Kafka clients 0.10+, for older versions the value has the value `Long.MinValue` and
+ * the timestampType has the value `NO_TIMESTAMP_TYPE`.
+ */
+@Public
+public interface ConsumerRecordMetaInfo {
+   /**
+* The TimestampType is introduced in the kafka clients 0.10+. This interface is also used for the Kafka connector 0.9
+* so a local enumeration is needed.
+*/
+   enum TimestampType {
+   NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME
--- End diff --

`NO_TIMESTAMP_TYPE` --> maybe `NO_TIMESTAMP` will do, since from the enum 
name we already know it is a type.




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465500#comment-16465500
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186337736
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/ConsumerRecordMetaInfo.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * The consumer record meta info contains, besides the actual message, some meta information, such as
+ * key, topic, partition, offset and timestamp for Apache kafka
+ *
+ * Note:The timestamp is only valid for Kafka clients 0.10+, for older versions the value has the value `Long.MinValue` and
+ * the timestampType has the value `NO_TIMESTAMP_TYPE`.
+ */
+@Public
+public interface ConsumerRecordMetaInfo {
+   /**
+* The TimestampType is introduced in the kafka clients 0.10+. This interface is also used for the Kafka connector 0.9
+* so a local enumeration is needed.
+*/
+   enum TimestampType {
+   NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME
--- End diff --

I wonder if `CREATE_TIME` should be renamed to `EVENT_TIME`, to be more
consistent with Flink's terminology.




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465501#comment-16465501
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186338002
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
 ---
@@ -42,14 +42,22 @@
 @Public
 public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
 
+   /**
+* @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} .
--- End diff --

Unnecessary space before period at the end.




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465495#comment-16465495
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186337834
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/ConsumerRecordMetaInfo.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * The consumer record meta info contains, besides the actual message, 
some meta information, such as
+ * key, topic, partition, offset and timestamp for Apache kafka
+ *
+ * Note:The timestamp is only valid for Kafka clients 0.10+, for 
older versions the value has the value `Long.MinValue` and
+ * the timestampType has the value `NO_TIMESTAMP_TYPE`.
+ */
+@Public
+public interface ConsumerRecordMetaInfo {
+   /**
+* The TimestampType is introduced in the kafka clients 0.10+. This 
interface is also used for the Kafka connector 0.9
+* so a local enumeration is needed.
+*/
+   enum TimestampType {
+   NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME
+   }
+
+   /**
+* @return the key as a byte array (null if no key has been set).
+*/
+   byte[] getKey();
+
+   /**
+* @return The message, as a byte array (null if the message was empty 
or deleted).
+*/
+   byte[] getMessage();
+
+   /**
+* @return The topic the message has originated from (for example the 
Kafka topic).
+*/
+   String getTopic();
+
+   /**
+* @return The partition the message has originated from (for example 
the Kafka partition).
+*/
+   int getPartition();
+
+   /**
+* @return the offset of the message in the original source (for 
example the Kafka offset).
+*/
+   long getOffset();
+
+   /**
+* @return the timestamp of the consumer record
--- End diff --

The Javadoc should document the "dummy" timestamp value that is returned when 
the timestamp type is `NO_TIMESTAMP_TYPE`.
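
To illustrate why the dummy value matters to callers, here is a minimal sketch of a guard that user code might apply before trusting the timestamp. It is not part of the pull request: the interface is a reduced stand-in for the one in the diff, `Timestamps` and `validTimestamp` are hypothetical names, and `Long.MIN_VALUE` as the dummy value is taken from the class-level Javadoc quoted above.

```java
import java.util.OptionalLong;

// Reduced, hypothetical stand-in for the ConsumerRecordMetaInfo interface in the diff.
interface ConsumerRecordMetaInfo {
    enum TimestampType { NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME }

    long getTimestamp();

    TimestampType getTimestampType();
}

// Hypothetical helper, not part of the pull request.
final class Timestamps {
    private Timestamps() {}

    /** Returns the timestamp only if the record carries a real one. */
    static OptionalLong validTimestamp(ConsumerRecordMetaInfo record) {
        if (record.getTimestampType() == ConsumerRecordMetaInfo.TimestampType.NO_TIMESTAMP_TYPE) {
            // The dummy value should not leak into business logic.
            return OptionalLong.empty();
        }
        return OptionalLong.of(record.getTimestamp());
    }

    /** Builds a fixed record for the demo below. */
    static ConsumerRecordMetaInfo record(long timestamp, ConsumerRecordMetaInfo.TimestampType type) {
        return new ConsumerRecordMetaInfo() {
            @Override public long getTimestamp() { return timestamp; }
            @Override public TimestampType getTimestampType() { return type; }
        };
    }

    public static void main(String[] args) {
        ConsumerRecordMetaInfo old =
            record(Long.MIN_VALUE, ConsumerRecordMetaInfo.TimestampType.NO_TIMESTAMP_TYPE);
        ConsumerRecordMetaInfo fresh =
            record(1525600000000L, ConsumerRecordMetaInfo.TimestampType.CREATE_TIME);
        System.out.println(validTimestamp(old).isPresent());
        System.out.println(validTimestamp(fresh).getAsLong());
    }
}
```

Without a documented dummy value, callers would have to guess whether a very small timestamp is real, which is why the type check comes first.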




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465456#comment-16465456
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

GitHub user FredTing opened a pull request:

https://github.com/apache/flink/pull/5958

[FLINK-8500] Get the timestamp of the Kafka message from kafka consumer

## What is the purpose of the change

This pull request makes the Kafka timestamp and timestampType available in 
the message deserialisation so one can use it in the business logic processing.

## Brief change log

- Introduced new interface `ConsumerRecordMetaInfo` with meta info of the 
Kafka message.
- Extended the `DeserializationSchema` with the `T 
deserialize(ConsumerRecordMetaInfo consumerRecord)` method.
- Adjusted the Kafka Connectors to support the new interface.
- Added some documentation.

## Verifying this change

This change is already covered by existing tests, such as most of the Kafka 
Consumer tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no)
  - The serializers: (**yes** / no / don't know)
  - The runtime per-record code paths (performance sensitive): (**yes** / 
no / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (**yes** / no)
  - If yes, how is the feature documented? (not applicable / **docs** / 
**JavaDocs** / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/FredTing/flink FLINK-8500

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5958.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5958


commit dca3e7bf5504fdeb929cb38bdd05c3fcec184d6c
Author: Fred Teunissen 
Date:   2018-05-06T15:31:15Z

[FLINK-8500] Get the timestamp of the Kafka message from kafka consumer






[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463605#comment-16463605
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user FredTing commented on the issue:

https://github.com/apache/flink/pull/5939
  
I'm closing this pull request and will open a new one with the newer 
approach, as discussed in the Jira issue.




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463606#comment-16463606
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user FredTing closed the pull request at:

https://github.com/apache/flink/pull/5939




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-04 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463516#comment-16463516
 ] 

Aljoscha Krettek commented on FLINK-8500:
-

I think most people agree on adding a {{ConsumerRecordMetaInfo}} interface. We 
can probably remodel {{DeserializationSchema}} for our purposes, i.e. deprecate 
the existing method, introduce a new method that takes the meta info, and make 
both of them default methods.

[~FredTing] You can have a look at {{SinkFunction}}, there I did the same trick.

You can either open a new PR or modify the existing one. Creating a new one is 
probably clearer.
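
The default-method trick described above can be sketched roughly as follows. This is only an illustration under stated assumptions: `MetaInfoDeserializationSchema`, the two-accessor `ConsumerRecordMetaInfo`, and the example schemas are hypothetical stand-ins, not the real Flink interfaces or the code in any pull request.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the proposed meta info interface, reduced to two accessors.
interface ConsumerRecordMetaInfo {
    byte[] getMessage();
    long getTimestamp();
}

// Hypothetical stand-in for a remodelled DeserializationSchema; not the real Flink interface.
interface MetaInfoDeserializationSchema<T> {

    /** Old entry point; a default so that new implementations need not provide it. */
    @Deprecated
    default T deserialize(byte[] message) {
        throw new UnsupportedOperationException("deserialize(byte[]) is not implemented");
    }

    /** New entry point; by default it delegates to the old method. */
    default T deserialize(ConsumerRecordMetaInfo record) {
        return deserialize(record.getMessage());
    }
}

// An existing implementation that only knows the old method keeps working unchanged.
class LegacyStringSchema implements MetaInfoDeserializationSchema<String> {
    @Override
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }
}

// A new implementation overrides only the meta info method and can use the timestamp.
class TimestampedStringSchema implements MetaInfoDeserializationSchema<String> {
    @Override
    public String deserialize(ConsumerRecordMetaInfo record) {
        return record.getTimestamp() + ":" + new String(record.getMessage(), StandardCharsets.UTF_8);
    }
}

class DefaultMethodDemo {
    /** Builds a fixed record for the demo. */
    static ConsumerRecordMetaInfo record(String payload, long timestamp) {
        return new ConsumerRecordMetaInfo() {
            @Override public byte[] getMessage() { return payload.getBytes(StandardCharsets.UTF_8); }
            @Override public long getTimestamp() { return timestamp; }
        };
    }

    public static void main(String[] args) {
        ConsumerRecordMetaInfo r = record("hello", 42L);
        System.out.println(new LegacyStringSchema().deserialize(r));
        System.out.println(new TimestampedStringSchema().deserialize(r));
    }
}
```

The caller always invokes the meta info method. Old implementations are reached through the delegating default, and an implementation that overrides neither method fails at the first record rather than at compile time, which is the main trade-off of this design.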



[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-03 Thread Fred Teunissen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462345#comment-16462345
 ] 

Fred Teunissen commented on FLINK-8500:
---

[~aljoscha] I have some time that I can spend on this, so yes, I'm interested. I 
can also update the current PR with the changes, unless you think it's better to 
open a fresh PR for this.

There are now 3 possible approaches.
 # Kafka {{ConsumerRecord}} as parameter in the Flink API
 # a new {{ConsumerRecordMetaInfo}} class as parameter for the {{deserialize}} 
method
 # extend the interface {{KeyedDeserializationSchema}} with a new method, 
although I don't know how to give an interface default behavior in Java.

I'm leaning towards almost the same approach I use now: creating a new separate 
interface {{RichDeserializationSchema}} with a {{deserialize}} method that takes 
a {{ConsumerRecordMetaInfo}} parameter, plus a 
{{RichDeserializationSchemaWrapper}} that implements the current API for 
backwards compatibility.

Do you think this is the right approach?
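
As a rough illustration of the wrapper idea, the sketch below adapts an existing keyed schema to a meta-info based interface. All types are local stand-ins so that the example compiles on its own: the `KeyedDeserializationSchema` here only mirrors the parameter list of the existing method, and `RichDeserializationSchema`, `RichDeserializationSchemaWrapper`, and `SimpleRecord` are assumptions about how the proposal could look, not code from a pull request.

```java
// Stand-in that mirrors the parameter list of the existing KeyedDeserializationSchema#deserialize.
interface KeyedDeserializationSchema<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset);
}

// Hypothetical meta info type carrying everything the connector knows about a record.
interface ConsumerRecordMetaInfo {
    byte[] getKey();
    byte[] getMessage();
    String getTopic();
    int getPartition();
    long getOffset();
    long getTimestamp();
}

// Hypothetical new schema interface with a single meta-info based method.
interface RichDeserializationSchema<T> {
    T deserialize(ConsumerRecordMetaInfo record);
}

// Adapts an existing keyed schema to the new interface; the timestamp is simply ignored.
final class RichDeserializationSchemaWrapper<T> implements RichDeserializationSchema<T> {
    private final KeyedDeserializationSchema<T> wrapped;

    RichDeserializationSchemaWrapper(KeyedDeserializationSchema<T> wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public T deserialize(ConsumerRecordMetaInfo record) {
        return wrapped.deserialize(
            record.getKey(), record.getMessage(),
            record.getTopic(), record.getPartition(), record.getOffset());
    }
}

// Plain value holder used only to drive the demo.
final class SimpleRecord implements ConsumerRecordMetaInfo {
    private final String topic;
    private final int partition;
    private final long offset;

    SimpleRecord(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    @Override public byte[] getKey() { return null; }
    @Override public byte[] getMessage() { return new byte[0]; }
    @Override public String getTopic() { return topic; }
    @Override public int getPartition() { return partition; }
    @Override public long getOffset() { return offset; }
    @Override public long getTimestamp() { return Long.MIN_VALUE; }
}

final class WrapperDemo {
    public static void main(String[] args) {
        KeyedDeserializationSchema<String> existing =
            (key, message, topic, partition, offset) -> topic + "-" + partition + "@" + offset;
        RichDeserializationSchema<String> adapted = new RichDeserializationSchemaWrapper<>(existing);
        System.out.println(adapted.deserialize(new SimpleRecord("events", 3, 17L)));
    }
}
```

With such a wrapper the consumer only has to deal with one schema type internally, while constructors that accept the old interface can wrap it on the way in.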



[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-03 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462285#comment-16462285
 ] 

Aljoscha Krettek commented on FLINK-8500:
-

[~FredTing] Would you be interested in contributing this slightly more complex 
addition?



[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462283#comment-16462283
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5939
  
@FredTing To avoid confusion we should probably close this PR. What do you 
think?




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462148#comment-16462148
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5939
  
@StephanEwen we have some follow-up discussion on how to approach this on 
the JIRA ticket. Do you want to take a look at that? 
https://issues.apache.org/jira/browse/FLINK-8500

The general idea is that we should perhaps come up with a more generic 
schema interface that allows providing all information that Kafka gives us and 
provide them to the user.




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462143#comment-16462143
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5939
  
The feature is a nice addition.

Flink currently already adds the timestamp as the record's event time 
timestamp. You can access it via a ProcessFunction. That is a tad bit more 
clumsy, though...

If we want to have the timestamp as part of the Deserialization Schema, I 
would suggest to not add yet another specialized schema, but extend the 
KeyedDeserializationSchema with another method that takes the timestamp. We 
should make that a default method that calls the existing method and make the 
existing method an empty default method.

We could also think about renaming `KeyedDeserializationSchema` to 
`RichDeserializationSchema` or so, if that would describe the functionality 
better (I am not a native speaker, so would be nice for one to give their 
opinion here).




[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-02 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16461960#comment-16461960
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8500:


I'm still leaning towards the second approach, where we maintain our own class.
[~FredTing] yes, maybe we should discuss those as a separate issue, and just 
focus on Kafka for this ticket.



[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-02 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16461955#comment-16461955
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8500:


Let's go with the second approach. I've quickly checked Kafka 1.x, and it seems 
like the {{ConsumerRecord}} class will remain stable, at least for the near 
future.

[~FredTing] yes, for example the Kinesis deserialization schema could benefit 
from a more generic solution. And yes, let's discuss them elsewhere / have a 
separate Jira for them.



[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-02 Thread Fred Teunissen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16461953#comment-16461953
 ] 

Fred Teunissen commented on FLINK-8500:
---

Both approaches will work, but we have to choose. The first approach exposes 
the Kafka API as part of the Flink API. The second approach hides the Kafka API 
but will require a bit more resources to maintain. The second approach would 
have my vote.

I don’t want to introduce scope creep, but I think there are more input sources 
that could benefit from a more generic (de)serialization scheme. Should we look 
into that, or leave it for now (in issue 5479 the idea of a `common connector 
framework` is mentioned; should it be picked up there)?

 



[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-02 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16460879#comment-16460879
 ] 

Aljoscha Krettek commented on FLINK-8500:
-

[~tzulitai] I thought about that second approach as well but didn't want to 
mention it because it requires work by us when there are new Kafka features. 
I'd be fine with either approach, though.



[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-02 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16460871#comment-16460871
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8500:


+1 to the general idea; simplifying the schema methods and unifying the keyed / 
non-keyed versions seems very appealing.

However, I suggest not to expose Kafka's {{ConsumerRecord}} directly, otherwise 
our API compatibility will be dependent on Kafka changing this class.
This might be ok, since users should expect API changes across major Kafka 
connector versions anyway.
This would also entail that we might eventually have version-specific 
serialization schema classes, instead of a base one.

Another approach, if we want to avoid this, is to have our own 
`ConsumerRecordMetaInfo` class that we pass via the serialization schema.
Downside would be that we have to make sure to expose everything in Kafka's 
`ConsumerRecord` to our own class.
It's a bit more manual, but should be manageable in a non-API breaking way 
(whenever we have to add more information to that class).

I'm leaning more towards the second approach.
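
A minimal sketch of that second approach, under stated assumptions: `StubKafkaRecord` stands in for Kafka's `ConsumerRecord` so the example compiles without the Kafka client (only the accessor names follow the real class), and `KafkaRecordMetaInfo` and `PreTimestampMetaInfo` are hypothetical names. The use of `Long.MIN_VALUE` as the fallback is likewise an assumption made for illustration.

```java
// Stand-in for Kafka's ConsumerRecord so the sketch compiles without the Kafka client.
final class StubKafkaRecord {
    private final String topic;
    private final byte[] value;
    private final long timestamp;

    StubKafkaRecord(String topic, byte[] value, long timestamp) {
        this.topic = topic;
        this.value = value;
        this.timestamp = timestamp;
    }

    String topic() { return topic; }
    byte[] value() { return value; }
    long timestamp() { return timestamp; }
}

// Hypothetical Flink-owned view of a record; user code depends only on this type.
interface ConsumerRecordMetaInfo {
    byte[] getMessage();
    String getTopic();

    // Accessor added later: the default keeps earlier implementations source compatible.
    default long getTimestamp() { return Long.MIN_VALUE; }
}

// Connector-side adapter: the only place that touches the Kafka type.
final class KafkaRecordMetaInfo implements ConsumerRecordMetaInfo {
    private final StubKafkaRecord record;

    KafkaRecordMetaInfo(StubKafkaRecord record) {
        this.record = record;
    }

    @Override public byte[] getMessage() { return record.value(); }
    @Override public String getTopic() { return record.topic(); }
    @Override public long getTimestamp() { return record.timestamp(); }
}

// An implementation written before getTimestamp() existed still compiles.
final class PreTimestampMetaInfo implements ConsumerRecordMetaInfo {
    @Override public byte[] getMessage() { return new byte[0]; }
    @Override public String getTopic() { return "legacy"; }
}

final class AdapterDemo {
    public static void main(String[] args) {
        ConsumerRecordMetaInfo adapted =
            new KafkaRecordMetaInfo(new StubKafkaRecord("events", new byte[] {1}, 99L));
        System.out.println(adapted.getTopic() + " " + adapted.getTimestamp());
        System.out.println(new PreTimestampMetaInfo().getTimestamp() == Long.MIN_VALUE);
    }
}
```

The manual work mentioned above is visible in the adapter: every new Kafka field needs one more accessor there, but because the accessor arrives as a default method, existing user schemas are unaffected.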



[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-02 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16460869#comment-16460869
 ] 

Aljoscha Krettek commented on FLINK-8500:
-

Thanks for opening the PR and pushing the discussion! I think we have to take a 
step back, though, and reconsider the design of the deserialisation schemes. I 
don't think we can keep adding ever more schemes when new features come up. I 
think a simple fix for that is to introduce a deserialisation scheme that 
accepts a Kafka {{ConsumerRecord}}. All the information that you could ever 
want to access has to be in there, since it's the only thing we get from Kafka. 
This would also greatly simplify the methods and would unify keyed/non-keyed 
schemes.

[~tzulitai] What do you think about this? 



[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-04-28 Thread Fred Teunissen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16457600#comment-16457600
 ] 

Fred Teunissen commented on FLINK-8500:
---

I've created a [pull request|https://github.com/apache/flink/pull/5939] for 
this issue. This is my first pull request, so I hope that I followed all of 
the 'contribution code guidelines' correctly. Please let me know if I 
should do something differently or if I forgot something. 



[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-04-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16457587#comment-16457587
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

GitHub user FredTing opened a pull request:

https://github.com/apache/flink/pull/5939

[FLINK-8500] [Kafka Connector] Get the timestamp of the Kafka message from 
kafka consumer

## What is the purpose of the change

This pull request makes the Kafka timestamp and timestampType available in 
the message deserialisation so one can use it in the business logic processing.

## Brief change log
- Introduced new interface `KeyedWithTimestampDeserializationSchema` with 
extra parameters in the `deserialize` method.
- Added the `KeyedWithTimestampDeserializationSchemaWrapper` class to keep 
the code backwards compatible.
- Adjusted the Kafka Connectors 0.10+ to support the new interface too.
- Adjusted the Kafka Connectors 0.9- to 'hide' this new interface since 
these versions of Kafka don't support timestamps.
- Added some documentation.  

## Verifying this change

This change is already covered by existing tests, such as most of the 
Kafka Consumer tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): yes
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented?  docs / JavaDocs 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/FredTing/flink FLINK-8500

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5939.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5939


commit 30293ac49a1d31c2abfa2b3fb3640e9e04ef8bcf
Author: Fred Teunissen 
Date:   2018-04-28T08:06:41Z

[FLINK-8500] Get the timestamp of the Kafka message from kafka 
consumer(Kafka010Fetcher)






[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-04-25 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16451808#comment-16451808
 ] 

yanxiaobin commented on FLINK-8500:
---

Hi [~aljoscha], what is your idea? For now I have solved the problem by 
patching the source code, but that is very inconvenient for future upgrades. I 
hope it can be resolved in a new release as soon as possible so that I do not 
have to maintain another set of code myself. 

 



[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-04-22 Thread Fred Teunissen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447144#comment-16447144
 ] 

Fred Teunissen commented on FLINK-8500:
---

We want to use the Kafka message timestamp in our business logic, so we would 
like to have this timestamp available during the deserialisation of the 
message. We also need the TimestampType to be able to tell who set the 
timestamp (the producer or the Kafka broker).

If we don't want to break the interface, we can make a new interface 
{{KeyedAndTimestampDeserializationSchema}} extending the interface 
{{KeyedDeserializationSchema}} and add a 
{{KeyedAndTimestampDeserializationSchemaWrapper}} that accepts a 
{{KeyedDeserializationSchema}}. The FlinkKafkaConsumer constructors can be 
extended to accept the new {{KeyedAndTimestampDeserializationSchema}} as a 
parameter and wrap the calls that use the {{KeyedDeserializationSchema}} interface.
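
The proposal above could look roughly like the following. This is only a sketch under assumptions: the two existing types are replaced by simplified local stand-ins so that it compiles without Flink or Kafka on the classpath, and the signature of the new overload (including the local TimestampType enum) is hypothetical, not what was eventually merged.

```java
import java.nio.charset.StandardCharsets;

// Local stand-in mirroring Kafka's timestamp types, so the sketch compiles alone.
enum TimestampType { NO_TIMESTAMP_TYPE, CREATE_TIME, LOG_APPEND_TIME }

// Simplified stand-in for the existing interface (the real one has more methods).
interface KeyedDeserializationSchema<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset);
}

// Hypothetical extension: same contract plus an overload that sees the timestamp.
interface KeyedAndTimestampDeserializationSchema<T> extends KeyedDeserializationSchema<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset,
                  long timestamp, TimestampType timestampType);
}

// Hypothetical wrapper: lets an existing schema be used where the new one is expected.
class KeyedAndTimestampDeserializationSchemaWrapper<T>
        implements KeyedAndTimestampDeserializationSchema<T> {
    private final KeyedDeserializationSchema<T> inner;

    KeyedAndTimestampDeserializationSchemaWrapper(KeyedDeserializationSchema<T> inner) {
        this.inner = inner;
    }

    @Override
    public T deserialize(byte[] messageKey, byte[] message, String topic, int partition,
                         long offset) {
        return inner.deserialize(messageKey, message, topic, partition, offset);
    }

    @Override
    public T deserialize(byte[] messageKey, byte[] message, String topic, int partition,
                         long offset, long timestamp, TimestampType timestampType) {
        // An existing schema has no use for the timestamp, so it is dropped here.
        return inner.deserialize(messageKey, message, topic, partition, offset);
    }
}

class WrapperDemo {
    // Wraps an old-style schema and calls it through the new overload.
    static String viaWrapper(String payload, long timestamp) {
        KeyedDeserializationSchema<String> old =
                (key, msg, topic, partition, offset) -> new String(msg, StandardCharsets.UTF_8);
        KeyedAndTimestampDeserializationSchema<String> schema =
                new KeyedAndTimestampDeserializationSchemaWrapper<>(old);
        return schema.deserialize(null, payload.getBytes(StandardCharsets.UTF_8),
                "topic", 0, 0L, timestamp, TimestampType.CREATE_TIME);
    }

    public static void main(String[] args) {
        System.out.println(viaWrapper("hello", 1516860000000L));
    }
}
```

With this shape, the consumer would only ever call the seven-argument overload internally, and existing user schemas would keep working unchanged through the wrapper.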


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-03-08 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390914#comment-16390914
 ] 

yanxiaobin commented on FLINK-8500:
---

Sorry for the late reply. I see; we really should be careful here. I would like 
to know how to get the Kafka timestamp inside DataStream methods such as 
flatMap and map.


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-02-26 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16377145#comment-16377145
 ] 

Aljoscha Krettek commented on FLINK-8500:
-

Yes, I still think that we need a solution for this; that's why I kept the 
issue open. However, adding a parameter to that interface will break all 
existing code that implements it, which is why we have to be careful here.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Blocker
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema  needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, 
> this is useful!
>  





[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-02-25 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376428#comment-16376428
 ] 

yanxiaobin commented on FLINK-8500:
---

Hi [~aljoscha]! I can solve the problem that way, but I still think the 
method deserialize of KeyedDeserializationSchema needs a parameter 'kafka 
message timestamp' (from ConsumerRecord). This timestamp may also be needed 
in some business logic. What do you think?


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-02-19 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16369081#comment-16369081
 ] 

Aljoscha Krettek commented on FLINK-8500:
-

I've moved this to 1.6 because we're already very late with the feature freeze 
for 1.5. As a workaround, you can write a custom timestamp extractor that is 
not derived from {{AscendingTimestampExtractor}} or 
{{BoundedOutOfOrdernessTimestampExtractor}}. There you can implement the method 
{{extractTimestamp(T, long)}}, which has access to the current timestamp; when 
using the Kafka source, that is the Kafka timestamp.
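
A minimal sketch of such a custom extractor, assuming a simplified local stand-in for the assigner interface (the real Flink interface returns a Watermark object rather than a long). The class name and the out-of-orderness bound are hypothetical.

```java
// Simplified stand-in for Flink's AssignerWithPeriodicWatermarks; the real
// interface returns a Watermark object rather than a long.
interface PeriodicAssigner<T> {
    long extractTimestamp(T element, long previousElementTimestamp);
    long getCurrentWatermark();
}

// Hypothetical assigner that keeps the timestamp already attached by the source.
class PreviousTimestampAssigner<T> implements PeriodicAssigner<T> {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen = Long.MIN_VALUE;

    PreviousTimestampAssigner(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    @Override
    public long extractTimestamp(T element, long previousElementTimestamp) {
        // With the Kafka source, previousElementTimestamp is the Kafka record timestamp.
        maxTimestampSeen = Math.max(maxTimestampSeen, previousElementTimestamp);
        return previousElementTimestamp;
    }

    @Override
    public long getCurrentWatermark() {
        // No element seen yet: report the lowest possible watermark.
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return maxTimestampSeen - maxOutOfOrdernessMs;
    }
}
```

The element itself is never inspected here; the assigner only passes the source timestamp through and derives the watermark from the largest timestamp seen so far.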


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357295#comment-16357295
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8500:


Moving this to 1.4.2, since on the mailing lists the community agreed to move 
forward with what we have already for 1.4.1.
Please reopen and let me know if you disagree.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema  needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, 
> this is useful!
>  





[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-01-31 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347951#comment-16347951
 ] 

yanxiaobin commented on FLINK-8500:
---

Great! I think that's reasonable, but it would be best to improve this so that 
the user-facing interface is as unified as possible. Thanks, [~aljoscha].


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-01-31 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347008#comment-16347008
 ] 

Aljoscha Krettek commented on FLINK-8500:
-

Yes, now I feel stupid. You analysed it exactly right: we do get the timestamp 
from Kafka, but the built-in timestamp extractors 
({{AscendingTimestampExtractor}} and 
{{BoundedOutOfOrdernessTimestampExtractor}}) don't take it into account and 
overwrite it.

You can get around that by writing a completely custom 
{{AssignerWithPeriodicWatermarks}} but it's not a good situation.
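
To illustrate the overwrite, here is a simplified model of an "ascending" extractor. It is not the actual Flink source; the class names are hypothetical, and the only point is that the second parameter, which carries the Kafka timestamp, is never read.

```java
import java.util.function.ToLongFunction;

// Simplified model of an "ascending" extractor; not the actual Flink source.
class AscendingExtractorModel<T> {
    private final ToLongFunction<T> extractAscendingTimestamp;

    AscendingExtractorModel(ToLongFunction<T> extractAscendingTimestamp) {
        this.extractAscendingTimestamp = extractAscendingTimestamp;
    }

    long extractTimestamp(T element, long elementPrevTimestamp) {
        // elementPrevTimestamp (the Kafka timestamp) is ignored; the result is
        // derived from the deserialized element alone.
        return extractAscendingTimestamp.applyAsLong(element);
    }
}

class OverwriteDemo {
    // Uses the string length as a stand-in for a timestamp field of the element.
    static long run(long kafkaTimestamp) {
        AscendingExtractorModel<String> model =
                new AscendingExtractorModel<>(s -> s.length());
        return model.extractTimestamp("abc", kafkaTimestamp);
    }

    public static void main(String[] args) {
        System.out.println(run(1517300000000L));
    }
}
```

Whatever value is passed as the Kafka timestamp, the result depends only on the element, so the timestamp attached by the source is lost unless the element already carries it.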


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-01-31 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346550#comment-16346550
 ] 

yanxiaobin commented on FLINK-8500:
---

Hi [~aljoscha], what do you think?

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema  needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, 
> this is useful!
>  





[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-01-30 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346180#comment-16346180
 ] 

yanxiaobin commented on FLINK-8500:
---

Hi [~aljoscha], thank you for your reply! Please look at the following picture:

!image-2018-01-31-10-48-59-633.png!

The final event time is obtained from "final long newTimestamp = 
extractAscendingTimestamp(element);", and the element was deserialized by the 
"KeyedDeserializationSchema". Also, the parameter "elementPrevTimestamp" is 
not used. Thanks!


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-01-30 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344858#comment-16344858
 ] 

Aljoscha Krettek commented on FLINK-8500:
-

Yes, the Kafka timestamp is already used as the event-time timestamp.  See 
here: 
https://github.com/apache/flink/blob/28e8043ba09b47c99439fdb536a4226eccf70c07/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java#L85

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Attachments: image-2018-01-30-14-58-58-167.png
>
>
> The method deserialize of KeyedDeserializationSchema  needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, 
> this is useful!
>  





[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-01-29 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344591#comment-16344591
 ] 

yanxiaobin commented on FLINK-8500:
---

As shown below (Kafka010Fetcher): !image-2018-01-30-14-58-58-167.png!


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-01-25 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16338958#comment-16338958
 ] 

yanxiaobin commented on FLINK-8500:
---

I mean that I want to use the Kafka message timestamp as event time. Thanks!

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
>
> The method deserialize of KeyedDeserializationSchema  needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, 
> this is useful!
>  





[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-01-24 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337598#comment-16337598
 ] 

Aljoscha Krettek commented on FLINK-8500:
-

The timestamp of Kafka messages is attached to elements. You can, for example, 
access that using {{Context.timestamp()}} when using a {{ProcessFunction}}.

Is that good enough for your purposes?
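
A rough sketch of that access pattern. So that it compiles on its own, the Flink types are replaced by small local stand-ins shaped like the ProcessFunction API; the TimestampTagger class and its output format are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-ins shaped like Flink's ProcessFunction API so the sketch compiles alone.
interface Collector<O> {
    void collect(O record);
}

interface Context {
    Long timestamp(); // null when the element carries no timestamp
}

abstract class ProcessFunction<I, O> {
    abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
}

// Hypothetical function that prefixes each element with its attached timestamp.
class TimestampTagger extends ProcessFunction<String, String> {
    @Override
    void processElement(String value, Context ctx, Collector<String> out) {
        Long ts = ctx.timestamp();
        out.collect((ts == null ? "no-timestamp" : ts.toString()) + ":" + value);
    }
}

class TaggerDemo {
    // Runs the function once with a fixed timestamp and returns what it emitted.
    static List<String> run(String value, Long timestamp) {
        List<String> results = new ArrayList<>();
        new TimestampTagger().processElement(value, () -> timestamp, results::add);
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run("payload", 1516780000000L));
    }
}
```

This makes the timestamp available downstream of the source, but not inside the deserialization schema itself, which is what the issue asks for.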