[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-10-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198316#comment-16198316
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4779
  
Merged alongside #4239 


> Add Apache Kafka 0.11 connector
> ---
>
> Key: FLINK-6988
> URL: https://issues.apache.org/jira/browse/FLINK-6988
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.3.1
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>
> Kafka 0.11 (to be released very soon) adds support for transactions. 
> Thanks to that, Flink might be able to implement a Kafka sink supporting the 
> "exactly-once" semantic. The API changes and the overall transaction support are 
> described in 
> [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging].
> The goal is to mimic the implementation of the existing BucketingSink. The new 
> FlinkKafkaProducer011 would (see the sketch after this description):
> * upon creation, begin a transaction, store the transaction identifiers in 
> state, and write all incoming data to an output Kafka topic using that 
> transaction
> * on the `snapshotState` call, flush the data and record in state that the 
> current transaction is pending commit
> * on `notifyCheckpointComplete`, commit this pending transaction
> * in case of a crash between `snapshotState` and `notifyCheckpointComplete`, 
> either abort this pending transaction (if not every participant successfully 
> saved the snapshot) or restore and commit it. 
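
A minimal sketch of that lifecycle against the public Kafka 0.11 transactional
producer API follows. The class name, method names, topic, and transactional id
are illustrative assumptions; the actual connector is built on Flink's
TwoPhaseCommitSinkFunction rather than this standalone shape.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

// Illustrative sketch of the transaction lifecycle described above.
public class TransactionalKafkaSinkSketch {

    private final KafkaProducer<byte[], byte[]> producer;

    public TransactionalKafkaSinkSketch(Properties props) {
        // props must carry bootstrap.servers; a stable transactional.id is what
        // lets a restarted job find and resolve transactions left pending by a crash.
        props.put("transactional.id", "sink-txn-id"); // hypothetical id
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());
        this.producer = new KafkaProducer<>(props);
        producer.initTransactions(); // fences older producers with the same id
        producer.beginTransaction(); // "upon creation begin transaction"
    }

    // Called for every incoming record: written inside the open transaction, so
    // it stays invisible to read_committed consumers until the commit.
    public void invoke(byte[] key, byte[] value) {
        producer.send(new ProducerRecord<>("output-topic", key, value));
    }

    // On snapshotState: flush so every send reaches Kafka before the checkpoint
    // completes, and record in operator state that the transaction is pending
    // commit. (The real connector keeps a pool of producers so writing can
    // continue while the previous transaction awaits its commit.)
    public void snapshotState() {
        producer.flush();
        // ...store the pending transaction's identifiers in Flink state here...
    }

    // On notifyCheckpointComplete: commit the pending transaction and open a new
    // one for the next checkpoint period.
    public void notifyCheckpointComplete() {
        producer.commitTransaction();
        producer.beginTransaction();
    }

    // On recovery after a crash between snapshotState and
    // notifyCheckpointComplete, the pending transaction must be aborted (if the
    // checkpoint did not complete globally) or committed (if it did). Note that
    // the plain client cannot resume a pre-crash transaction: initTransactions()
    // simply aborts it. Committing it instead is exactly why the PR implements
    // its own producer with transaction-recovery support.
}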



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-10-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198317#comment-16198317
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski closed the pull request at:

https://github.com/apache/flink/pull/4779




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197419#comment-16197419
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski closed the pull request at:

https://github.com/apache/flink/pull/4239




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197420#comment-16197420
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4239
  
Thanks :)




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16197306#comment-16197306
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4239
  
Merged!  

Could you please close this PR?




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16194344#comment-16194344
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4779

[FLINK-6988] Add additional tests coverage for Kafka 0.11 connector

This PR only adds additional test coverage for the Kafka 0.11 connector. 
Please check the individual commit messages for the change log.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink kafka011-pool

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4779.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4779


commit f3e6ef67be4e54471c0143d849e7b905f6e036ae
Author: Aljoscha Krettek 
Date:   2017-09-25T14:16:34Z

[FLINK-6988] Make TwoPhaseCommitSinkFunction work with Context

commit 7bc6154ef8bdfe1181e404e1f7801f9fbd93543d
Author: Piotr Nowojski 
Date:   2017-09-06T14:42:59Z

[FLINK-6988] Add Kafka 0.11 connector maven module

commit 3ee004710bf3e875dacb4dd5ebbf0820f9453739
Author: Piotr Nowojski 
Date:   2017-07-12T13:14:13Z

[FLINK-6988][kafka] Implement our own KafkaProducer class with transactions recovery

commit 262eb5bfa998083eb28b81cfcb2ae1cc1188c11c
Author: Piotr Nowojski 
Date:   2017-06-23T07:14:28Z

[FLINK-6988][kafka] Add flink-connector-kafka-0.11 with exactly-once semantic

commit 489ad56b9750acb1a76b3315cf7723cf0da012ef
Author: Aljoscha Krettek 
Date:   2017-09-28T12:53:24Z

[hotfix] Don't use deprecated writeWithTimestamps in Kafka 0.10 tests

commit 62a5848ba5ff4f1cbfaa6d2671835cd16e726896
Author: Piotr Nowojski 
Date:   2017-08-24T11:16:14Z

[hotfix][streaming] Fix typo in parameter and unify naming with parent class

commit 32bfd6300e6fda5f05b495439a581b79abbdae07
Author: Piotr Nowojski 
Date:   2017-08-24T12:16:55Z

[FLINK-6988][kafka] Add test for failure before checkpoint and scaling down

commit 3f8afba1fa49a3c69e3f9b36d7bea3c6316205dd
Author: Piotr Nowojski 
Date:   2017-08-25T07:47:12Z

[FLINK-6988][kafka] Add Kafka 0.11 tests for scaling down and up again






[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-10-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191080#comment-16191080
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4239
  
@aljoscha rebased on latest master and integrated your changes




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-10-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16190973#comment-16190973
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user ariskk commented on the issue:

https://github.com/apache/flink/pull/4239
  
We are really looking forward to this  




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1615#comment-1615
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4239
  
Bugs in the tests (the ones you can see in the fixup commits)




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16155541#comment-16155541
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4239
  
What were the bugs that you fixed?




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-09-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16152633#comment-16152633
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4239
  
@aljoscha I have addressed your "high level" comments and fixed some bugs. 
Please see the latest 5 commits (one of them adds a dependency on another PR: 
https://github.com/apache/flink/pull/4631 )




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16146771#comment-16146771
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4239
  
I did a first high-level review of the code. I think it's good so far!

Before we can merge this, however, we need a few more things around it:
 - A section in the Kafka doc about the new exactly-once mode, how it can be 
configured, etc.
 - A big disclaimer (possibly in an "alert" box) about the interplay with the 
transaction timeout and what the caveats there are
 - A section in the Javadocs about the aforementioned caveats
 - A check that ensures that the transaction timeout is set to a reasonably 
high setting (say 1 hour) when exactly-once semantics are enabled, with an 
override setting that allows the user to set a lower transaction timeout if 
they want to (a rough sketch of such a check follows below)

Also, this has interplay with #4616, but we can resolve that by merging them 
in any order and fixing up the later changes when merging.
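
A rough sketch of such a startup check, assuming a one-hour lower bound and an
explicit override flag (class and parameter names here are made up for
illustration, not the eventual connector code):

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

// Hypothetical validation run when the sink is configured for exactly-once.
final class TransactionTimeoutCheck {

    // Assumed lower bound: 1 hour, as suggested above.
    private static final long MIN_SAFE_TIMEOUT_MS = 60 * 60 * 1000L;

    static void validate(Properties producerProps, boolean allowLowerTimeout) {
        if (allowLowerTimeout) {
            return; // the user explicitly opted in to a shorter transaction timeout
        }
        String configured =
            producerProps.getProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
        long timeoutMs = configured == null ? -1L : Long.parseLong(configured);
        if (timeoutMs < MIN_SAFE_TIMEOUT_MS) {
            // A transaction that outlives transaction.timeout.ms is aborted by
            // the broker, and the data in it is lost; hence the lower bound.
            throw new IllegalArgumentException(
                ProducerConfig.TRANSACTION_TIMEOUT_CONFIG + " is " + timeoutMs
                    + " ms, but exactly-once mode requires at least "
                    + MIN_SAFE_TIMEOUT_MS + " ms (or an explicit override).");
        }
    }
}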




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138874#comment-16138874
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user rangadi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134838729
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---
@@ -0,0 +1,1000 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is 

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138244#comment-16138244
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134726007
  

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138243#comment-16138243
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134725699
  

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138194#comment-16138194
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134711336
  

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138014#comment-16138014
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134676224
  

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16137995#comment-16137995
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134673133
  

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136526#comment-16136526
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134421636
  

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136439#comment-16136439
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134396258
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136431#comment-16136431
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134394707
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136440#comment-16136440
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134395636
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136433#comment-16136433
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134398289
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136426#comment-16136426
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134395283
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136428#comment-16136428
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134398783
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136424#comment-16136424
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134394721
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136436#comment-16136436
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134394869
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136435#comment-16136435
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134398934
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136437#comment-16136437
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134394729
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136427#comment-16136427
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134399048
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java ---
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Wrapper around KafkaProducer that allows to resume transactions in case of node failure, which allows to implement
+ * two phase commit algorithm for exactly-once semantic FlinkKafkaProducer.
+ *
+ * For happy path usage is exactly the same as {@link org.apache.kafka.clients.producer.KafkaProducer}. User is
+ * expected to call:
+ *
+ * 
+ * {@link FlinkKafkaProducer#initTransactions()}
+ * {@link FlinkKafkaProducer#beginTransaction()}
+ * {@link FlinkKafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord)}
+ * {@link FlinkKafkaProducer#flush()}
+ * {@link FlinkKafkaProducer#commitTransaction()}
+ * 
+ *
+ * To actually implement two phase commit, it must be possible to always commit a transaction after pre-committing
+ * it (here, pre-commit is just a {@link FlinkKafkaProducer#flush()}). In case of some failure between
+ * {@link FlinkKafkaProducer#flush()} and {@link FlinkKafkaProducer#commitTransaction()} this class allows to resume
+ * interrupted transaction and commit it after a restart:
+ *
+ * 
+ * {@link FlinkKafkaProducer#initTransactions()}
+ * {@link FlinkKafkaProducer#beginTransaction()}
+ * {@link FlinkKafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord)}
+ * {@link FlinkKafkaProducer#flush()}
+ * {@link FlinkKafkaProducer#getProducerId()}
+ * {@link FlinkKafkaProducer#getEpoch()}
+ * node failure... restore producerId and epoch from state
+ * {@link FlinkKafkaProducer#resumeTransaction(long, short)}
+ * {@link FlinkKafkaProducer#commitTransaction()}
+ * 
+ *
+ * {@link FlinkKafkaProducer#resumeTransaction(long, short)} replaces {@link FlinkKafkaProducer#initTransactions()}
+ * as a way to obtain the producerId and epoch counters. It has to be done, because otherwise
+ * {@link FlinkKafkaProducer#initTransactions()} would automatically abort all ongoing transactions.
+ *
+ * Second way this implementation 
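
To make the two call sequences above concrete, here is a minimal sketch of driving the wrapper through a pre-commit, a simulated failure, and a resumed commit. Only the method names come from the Javadoc above; the generic type parameters, the Properties-based constructor, the broker address, the transactional id, and the topic name are illustrative assumptions, not the connector's confirmed API surface.

import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ResumeTransactionSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("transactional.id", "sketch-txn-id");   // transactions require a transactional.id

        // Happy path up to the pre-commit: write records and flush, but do not commit yet.
        FlinkKafkaProducer<byte[], byte[]> producer = new FlinkKafkaProducer<>(props); // constructor shape assumed
        producer.initTransactions();
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("sketch-topic", "value".getBytes()));
        producer.flush(); // the pre-commit of the two phase commit

        // Persist these two counters (e.g. in checkpointed state) before a potential crash.
        long producerId = producer.getProducerId();
        short epoch = producer.getEpoch();

        // ... node failure between flush() and commitTransaction() ...

        // Recovery: a fresh producer resumes the interrupted transaction instead of
        // calling initTransactions(), which would abort all ongoing transactions.
        FlinkKafkaProducer<byte[], byte[]> recovered = new FlinkKafkaProducer<>(props);
        recovered.resumeTransaction(producerId, epoch);
        recovered.commitTransaction();
    }
}

The key point the sketch illustrates is that resumeTransaction(producerId, epoch) re-attaches to the broker-side transaction state without aborting it, which a second initTransactions() call would do.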

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136434#comment-16136434
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134396137
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -0,0 +1,1000 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import 
org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is 
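For context, a hedged usage sketch of the new sink (the three-argument
constructor is assumed by analogy with the existing FlinkKafkaProducer010,
and the broker address and topic are placeholders, not taken from this PR):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class Kafka011SinkSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> stream = env.fromElements("a", "b", "c");
            // Assumed constructor: (brokerList, topicId, serializationSchema).
            stream.addSink(new FlinkKafkaProducer011<>(
                    "localhost:9092", "output-topic", new SimpleStringSchema()));
            env.execute("kafka-011-sink-sketch");
        }
    }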

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136438#comment-16136438
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134396090
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -0,0 +1,1000 @@

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136430#comment-16136430
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134395975
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -0,0 +1,1000 @@

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136441#comment-16136441
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134396704
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -0,0 +1,1000 @@

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136429#comment-16136429
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134399156
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
 ---
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Wrapper around KafkaProducer that allows transactions to be resumed after a
+ * node failure, which makes it possible to implement a two-phase commit
+ * algorithm for an exactly-once FlinkKafkaProducer.
+ *
+ * On the happy path, usage is exactly the same as with
+ * {@link org.apache.kafka.clients.producer.KafkaProducer}. The user is
+ * expected to call:
+ *
+ * 
+ * {@link FlinkKafkaProducer#initTransactions()}
+ * {@link FlinkKafkaProducer#beginTransaction()}
+ * {@link FlinkKafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord)}
+ * {@link FlinkKafkaProducer#flush()}
+ * {@link FlinkKafkaProducer#commitTransaction()}
+ * 
+ *
+ * To actually implement two-phase commit, it must always be possible to
+ * commit a transaction after pre-committing it (here, pre-commit is just a
+ * {@link FlinkKafkaProducer#flush()}). In case of a failure between
+ * {@link FlinkKafkaProducer#flush()} and
+ * {@link FlinkKafkaProducer#commitTransaction()}, this class allows the
+ * interrupted transaction to be resumed and committed after a restart:
+ *
+ * 
+ * {@link FlinkKafkaProducer#initTransactions()}
+ * {@link FlinkKafkaProducer#beginTransaction()}
+ * {@link FlinkKafkaProducer#send(org.apache.kafka.clients.producer.ProducerRecord)}
+ * {@link FlinkKafkaProducer#flush()}
+ * {@link FlinkKafkaProducer#getProducerId()}
+ * {@link FlinkKafkaProducer#getEpoch()}
+ * node failure... restore producerId and epoch from state
+ * {@link FlinkKafkaProducer#resumeTransaction(long, short)}
+ * {@link FlinkKafkaProducer#commitTransaction()}
+ * 
+ *
+ * {@link FlinkKafkaProducer#resumeTransaction(long, short)} replaces
+ * {@link FlinkKafkaProducer#initTransactions()} as the way to obtain the
+ * producerId and epoch counters. This has to be done because otherwise
+ * {@link FlinkKafkaProducer#initTransactions()} would automatically abort all
+ * ongoing transactions.
+ *
+ * Second way this implementation 
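To illustrate the two call sequences described above, a minimal hypothetical
sketch (not code from this PR); the producer instance and the recovered
producerId/epoch values are assumed to come from Flink's checkpointed state:

    import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    final class ResumeAndCommitSketch {

        // Happy path: identical to a plain KafkaProducer.
        static void happyPath(
                FlinkKafkaProducer<byte[], byte[]> producer,
                ProducerRecord<byte[], byte[]> record) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(record);
            producer.flush(); // the "pre-commit"
            producer.commitTransaction();
        }

        // After a restart: resume the pre-committed transaction instead of
        // calling initTransactions(), which would abort it.
        static void commitAfterRestart(
                FlinkKafkaProducer<byte[], byte[]> producer,
                long recoveredProducerId,
                short recoveredEpoch) {
            producer.resumeTransaction(recoveredProducerId, recoveredEpoch);
            producer.commitTransaction();
        }
    }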

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136425#comment-16136425
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134396368
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -0,0 +1,1000 @@

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136432#comment-16136432
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134399119
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
 ---
@@ -0,0 +1,294 @@

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133557#comment-16133557
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user rangadi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134044031
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -0,0 +1,1000 @@

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131913#comment-16131913
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4239
  
Implemented a fixed-size pool of producers, please check the last commit.

If we run out of producers in the pool, an exception is thrown, aborting the 
ongoing snapshot.
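For illustration, a minimal sketch of what such a fixed-size pool can look
like (all names are hypothetical, not the actual commit); it hands out
`transactional.id`s from a bounded pool and fails fast when the pool is empty:

    import java.util.concurrent.BlockingDeque;
    import java.util.concurrent.LinkedBlockingDeque;

    final class TransactionalIdPool {
        private final BlockingDeque<String> availableIds = new LinkedBlockingDeque<>();

        TransactionalIdPool(String prefix, int poolSize) {
            for (int i = 0; i < poolSize; i++) {
                availableIds.add(prefix + "-" + i);
            }
        }

        // Called when a new transaction (checkpoint) starts.
        String acquire() {
            String id = availableIds.poll();
            if (id == null) {
                // Running out of producers fails the ongoing snapshot
                // instead of blocking, as described above.
                throw new IllegalStateException(
                        "transactional.id pool exhausted: too many pending snapshots");
            }
            return id;
        }

        // Called after the corresponding transaction was committed or aborted.
        void release(String id) {
            availableIds.add(id);
        }
    }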




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16122077#comment-16122077
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user rangadi commented on the issue:

https://github.com/apache/flink/pull/4239
  
Yep, that makes sense.




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16122007#comment-16122007
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4239
  
This solution (basically a pool with a fixed size of 2) would only work if 
there were at most one pending commit transaction, which is not always true 
in Flink - there can be multiple triggered checkpoints pending completion.




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16121769#comment-16121769
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user rangadi commented on the issue:

https://github.com/apache/flink/pull/4239
  
I guess you could store the transactional.id for the _next_ transaction in 
committed state. That way the new task starts the new transaction with the 
name stored in state, which automatically aborts the open transaction.




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16121233#comment-16121233
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4239
  
Indeed, it seems like you are right. `read_committed` doesn't play along 
with a long `max.transaction.timeout.ms`. I'm not sure about Beam, but in 
Flink we cannot use one single `transactional.id`, because our checkpoints 
are asynchronous - `notifyCheckpointComplete` (which triggers 
`KafkaProducer#commit`) can come long after `preCommit`. In that time we 
cannot use the same `transactional.id` for new transactions.

We can work around this issue by implementing a pool of `transactional.id`s, 
which we can save in the state. On restoring state, this allows us not only 
to `recoverAndCommit` all pending transactions, but also to abort all other 
unknown "lingering" transactions.
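A rough sketch of the abort step in that scheme (a hypothetical helper, not
PR code): for every pooled `transactional.id` that has no pending transaction
to `recoverAndCommit`, initializing a fresh producer with that id aborts
whatever "lingering" transaction might still be open, since
`initTransactions()` fences and aborts previous transactions of that id:

    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    final class LingeringTransactionAborter {

        static void abortLingeringTransactions(Properties baseConfig, List<String> pooledIds) {
            for (String transactionalId : pooledIds) {
                Properties config = new Properties();
                config.putAll(baseConfig);
                config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
                // A short-lived producer per id; initTransactions() aborts any
                // transaction left open under this transactional.id.
                try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(
                        config, new ByteArraySerializer(), new ByteArraySerializer())) {
                    producer.initTransactions();
                }
            }
        }
    }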




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120818#comment-16120818
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user rangadi commented on the issue:

https://github.com/apache/flink/pull/4239
  
> Hmmm, are you sure about this? That would mean that Kafka doesn't support 
transactional parallel writes from two different processes, which would be 
very strange. Could you point to a source for this information?

It does not prohibit parallel transactions. It just restricts what an EOS 
consumer, which reads only the committed messages, can see.

See the 'Reading Transactional Messages' section in the JavaDoc for 
KafkaConsumer: 
https://github.com/apache/kafka/blob/0.11.0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L421

> In read_committed mode, the consumer will read only those transactional 
messages which have been successfully committed. It will continue to read 
non-transactional messages as before. There is no client-side buffering in 
read_committed mode. Instead, the end offset of a partition for a 
read_committed consumer would be the offset of the first message in the 
partition belonging to an open transaction. This offset is known as the 'Last 
Stable Offset'(LSO).

If there is an open transaction, the EOS consumers don't read past it.
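For reference, a minimal sketch of a `read_committed` consumer (broker
address, group id, and topic are placeholders); such a consumer will not read
past the LSO of an open transaction:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class ReadCommittedConsumerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "eos-reader");
            // Only committed transactional messages become visible.
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                    props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
                consumer.subscribe(Collections.singletonList("output-topic"));
                ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
                System.out.println("committed records: " + records.count());
            }
        }
    }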





[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120713#comment-16120713
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4239
  
Writing records in state would be very costly. It is only a "last resort" 
solution.

> That would imply exactly-once consumers can not read past that 
transaction as long as it is open

Hmmm, are you sure about this? That would mean that Kafka doesn't support 
transactional parallel writes from two different processes, which would be 
very strange. Could you point to a source for this information? 

Resuming transactions is not a part of the `KafkaProducer` API, however 
Kafka's REST API allows one to do that. However, I'm aware that it wasn't the 
intention of the authors to allow this. Kafka Streams does not need to do 
that, because it achieves exactly-once semantics by using persistent 
communication channels (Kafka topics), so it can easily restart each operator 
on its own by replaying/rewinding every input channel (Kafka topic). This 
comes with a cost, because it makes communication between operators extremely 
expensive, since every message must go to HDDs at some point. 




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120249#comment-16120249
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user rangadi commented on the issue:

https://github.com/apache/flink/pull/4239
  
Maybe an extra shuffle to make small batches could help. Another option is 
to buffer all the records in state and write them all inside commit(). But 
I'm not sure how costly it would be to save all the records in checkpointed 
state. 

Another issue I see with using a random txn id: if a worker looks 
unresponsive and its work is moved to another worker, it is possible that the 
old worker still lingers around with an open transaction. That would imply 
that exactly-once consumers cannot read past that transaction as long as it 
is open.

I didn't know it was possible to resume a transaction, since it is not part 
of the producer API. This PR uses an undocumented way to do it... do you know 
if Kafka Streams also does something like that? Maybe the producer will 
support `resumeTransaction()` properly in the future.





[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119556#comment-16119556
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4239
  
I think there is no way we can handle it other than to increase the timeout 
to some very large value. Or is there?
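A sketch of that workaround (the one-hour value is an arbitrary assumption,
not a recommendation); the broker must be configured to permit a producer
transaction timeout this large:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.ProducerConfig;

    final class TransactionTimeoutSketch {
        static Properties withLongTransactionTimeout(Properties baseConfig) {
            Properties config = new Properties();
            config.putAll(baseConfig);
            // transaction.timeout.ms: how long the broker waits before
            // proactively aborting an ongoing transaction.
            config.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
                    String.valueOf(60 * 60 * 1000)); // 1 hour
            return config;
        }
    }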




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119268#comment-16119268
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user rangadi commented on the issue:

https://github.com/apache/flink/pull/4239
  
How does the exactly-once sink handle a large gap between `preCommit()` and 
`recoverAndCommit()` in case of a recovery? The server seems to abort a 
transaction after the `max.transaction.timeout.ms` timeout. 




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116171#comment-16116171
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4239
  
Now that the prerequisite PRs are merged, we can rebase this :)




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16105015#comment-16105015
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4239
  
Please add an entry to the `MODULES_CONNECTORS` variable in the 
`tools/travis_mvn_watchdog.sh` script.




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16094208#comment-16094208
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r128428437
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param <IN> Input type for {@link SinkFunction}
+ * @param <TXN> Transaction to store all of the information required to handle a transaction (must be Serializable)
+ */
+@PublicEvolving
+public abstract class TwoPhaseCommitSinkFunction<IN, TXN extends Serializable>
+   extends RichSinkFunction<IN>
+   implements CheckpointedFunction, CheckpointListener {
+
+   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+
+   @Nullable
+   protected TXN currentTransaction;
+
+   protected final List<TXN> pendingCommitTransactions = new ArrayList<>();
+
+   protected ListState<TXN> pendingTransactionsState;
+
+   protected ListState<TXN> pendingCommitTransactionsState;
+
+   // -- methods that should be implemented in child class to support two phase commit algorithm --
+
+   /**
+    * Write value within a transaction.
+    */
+   protected abstract void invoke(TXN transaction, IN value) throws Exception;
+
+   /**
+    * Method that starts a new transaction.
+    *
+    * @return newly created transaction.
+    */
+   protected abstract TXN beginTransaction() throws Exception;
+
+   /**
+    * Pre commit previously created transaction. Pre commit must make all of the necessary steps to prepare the
+    * transaction for a commit that might happen in the future. After this point the transaction might still be
+    * aborted, but underlying implementation must ensure that commit calls on already pre committed transactions
+    * will always succeed.
+    *
+    * Usually implementation involves flushing the data.
+    */
+   protected abstract void preCommit(TXN transaction) throws Exception;
+
+   /**
+    * Commit a pre-committed transaction. If this method fail, Flink application will be
+    * restarted and {@link TwoPhaseCommitSinkFunction#recoverAndCommit(Serializable)} will be called again for the
+    * same transaction.
+    */
+   protected abstract void commit(TXN transaction);
+
+   /**
+    * Invoked on recovered transactions after a failure. Must 
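
To make the abstract contract above concrete, here is a minimal hypothetical subclass (not part of the PR) that buffers records in a temporary file per transaction and publishes it with an atomic rename on commit. It assumes the class also declares an abort(TXN) hook for discarding transactions, which the truncated diff above does not show:

import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Serializable;
import java.util.UUID;

public class FileTwoPhaseSink extends TwoPhaseCommitSinkFunction<String, FileTwoPhaseSink.FileTxn> {

    /** Transaction handle: just the path of the transaction's temporary file. */
    public static class FileTxn implements Serializable {
        final String tmpPath = System.getProperty("java.io.tmpdir") + "/txn-" + UUID.randomUUID();
        transient FileWriter writer;
    }

    @Override
    protected FileTxn beginTransaction() throws IOException {
        FileTxn txn = new FileTxn();
        txn.writer = new FileWriter(txn.tmpPath);
        return txn;
    }

    @Override
    protected void invoke(FileTxn txn, String value) throws IOException {
        txn.writer.write(value + "\n"); // write within the open transaction
    }

    @Override
    protected void preCommit(FileTxn txn) throws IOException {
        // flush and close so that a later commit() cannot fail for lack of data
        txn.writer.flush();
        txn.writer.close();
    }

    @Override
    protected void commit(FileTxn txn) {
        // atomic publish; harmless to repeat if recovery replays the commit
        new File(txn.tmpPath).renameTo(new File(txn.tmpPath + ".committed"));
    }

    @Override
    protected void abort(FileTxn txn) {
        new File(txn.tmpPath).delete(); // discard never-committed data
    }
}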

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16094207#comment-16094207
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r128428361
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * The Flink Kafka Consumer is a streaming data source that pulls a 
parallel data stream from
+ * Apache Kafka 0.11.x. The consumer can run in multiple parallel 
instances, each of which will pull
+ * data from one or more Kafka partitions.
+ *
+ * The Flink Kafka Consumer participates in checkpointing and 
guarantees that no data is lost
+ * during a failure, and that the computation processes elements "exactly 
once".
+ * (Note: These guarantees naturally assume that Kafka itself does not 
loose any data.)
+ *
+ * Please note that Flink snapshots the offsets internally as part of 
its distributed checkpoints. The offsets
+ * committed to Kafka / ZooKeeper are only to bring the outside view of 
progress in sync with Flink's view
+ * of the progress. That way, monitoring and other jobs can get a view of 
how far the Flink Kafka consumer
+ * has consumed a topic.
+ *
+ * Please refer to Kafka's documentation for the available 
configuration properties:
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs
+ *
+ * NOTE: The implementation currently accesses partition 
metadata when the consumer
+ * is constructed. That means that the client that submits the program 
needs to be able to
+ * reach the Kafka brokers or ZooKeeper.
--- End diff --

Yes. I have a separate PR which cleans that up for all Kafka versions.




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16093225#comment-16093225
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r128271792
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -0,0 +1,818 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import 
org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is 
compatible with Kafka 0.11.x. By default producer
+ * will use {@link Semantic.EXACTLY_ONCE} semantic.
+ *
+ * Implementation note: This producer is a hybrid between a regular 
regular sink function (a)
--- End diff --

Yes, according to all of the tests it does.

The (b) version works by passing an instance of `FlinkKafkaProducer011` as 
a `SinkFunction` to the `KafkaStreamSink extends StreamSink` class. 
Under the hood this `StreamSink` checks whether the `SinkFunction` actually 
implements the various versions of the checkpointing interfaces, and in that way it 
calls the appropriate methods on `FlinkKafkaProducer011`.
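
A sketch of the two usage styles being discussed, based on the method names visible in this PR (topic, schema, and properties are illustrative, and the exact overloads may differ from the final API):

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ProducerUsageSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.fromElements("a", "b", "c");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // illustrative

        // (a) plain SinkFunction path via DataStream.addSink():
        stream.addSink(new FlinkKafkaProducer011<>("my-topic", new SimpleStringSchema(), props));

        // (b) custom sink operator path, which can access record timestamps:
        FlinkKafkaProducer011.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), props);

        env.execute("producer usage sketch");
    }
}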



[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16093191#comment-16093191
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r128180978
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * The Flink Kafka Consumer is a streaming data source that pulls a 
parallel data stream from
+ * Apache Kafka 0.11.x. The consumer can run in multiple parallel 
instances, each of which will pull
+ * data from one or more Kafka partitions.
+ *
+ * The Flink Kafka Consumer participates in checkpointing and 
guarantees that no data is lost
+ * during a failure, and that the computation processes elements "exactly 
once".
+ * (Note: These guarantees naturally assume that Kafka itself does not 
loose any data.)
+ *
+ * Please note that Flink snapshots the offsets internally as part of 
its distributed checkpoints. The offsets
+ * committed to Kafka / ZooKeeper are only to bring the outside view of 
progress in sync with Flink's view
+ * of the progress. That way, monitoring and other jobs can get a view of 
how far the Flink Kafka consumer
+ * has consumed a topic.
+ *
+ * Please refer to Kafka's documentation for the available 
configuration properties:
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs
+ *
+ * NOTE: The implementation currently accesses partition 
metadata when the consumer
+ * is constructed. That means that the client that submits the program 
needs to be able to
+ * reach the Kafka brokers or ZooKeeper.
--- End diff --

Is it also true for 0.10?




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16093190#comment-16093190
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r128201074
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param <IN> Input type for {@link SinkFunction}
+ * @param <TXN> Transaction to store all of the information required to handle a transaction (must be Serializable)
+ */
+@PublicEvolving
+public abstract class TwoPhaseCommitSinkFunction<IN, TXN extends Serializable>
+   extends RichSinkFunction<IN>
+   implements CheckpointedFunction, CheckpointListener {
+
+   private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+
+   @Nullable
+   protected TXN currentTransaction;
+
+   protected final List<TXN> pendingCommitTransactions = new ArrayList<>();
+
+   protected ListState<TXN> pendingTransactionsState;
+
+   protected ListState<TXN> pendingCommitTransactionsState;
+
+   // -- methods that should be implemented in child class to support two phase commit algorithm --
+
+   /**
+    * Write value within a transaction.
+    */
+   protected abstract void invoke(TXN transaction, IN value) throws Exception;
+
+   /**
+    * Method that starts a new transaction.
+    *
+    * @return newly created transaction.
+    */
+   protected abstract TXN beginTransaction() throws Exception;
+
+   /**
+    * Pre commit previously created transaction. Pre commit must make all of the necessary steps to prepare the
+    * transaction for a commit that might happen in the future. After this point the transaction might still be
+    * aborted, but underlying implementation must ensure that commit calls on already pre committed transactions
+    * will always succeed.
+    *
+    * Usually implementation involves flushing the data.
+    */
+   protected abstract void preCommit(TXN transaction) throws Exception;
+
+   /**
+    * Commit a pre-committed transaction. If this method fail, Flink application will be
+    * restarted and {@link TwoPhaseCommitSinkFunction#recoverAndCommit(Serializable)} will be called again for the
+    * same transaction.
+    */
+   protected abstract void commit(TXN transaction);
+
+   /**
+    * Invoked on recovered transactions after a failure. Must 

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092731#comment-16092731
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4239
  
Ok :) I can agree that we keep 321a142 as a separate commit.
For df6d5e0 to 5ff8106, I actually found it easier to ignore all that 
(because a lot of it is irrelevant in the end) and went straight to 41ad973.




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092728#comment-16092728
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4239
  
`df6d5e0` to `5ff8106` should definitely be squashed; I left them only to 
make it easier for reviewers to follow the changes made in the 0.11 vs 0.10 
connectors (those changes would be invisible in one blob commit).

For `321a142` to `2cf5f3b` I'm not sure about the first one; 
`FlinkKafkaProducer` is so hacky that it could deserve a separate commit. That 
would make it stand out more if anyone looks at the commit history/changes in 
the future (it could hide in a larger change).




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092707#comment-16092707
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4239
  
One other comment regarding the commits:
I would argue that df6d5e0 to 5ff8106 should not appear in the commit log 
history, since in the end we actually have a completely new producer for 011 
anyway.
Also, 321a142 to 2cf5f3b should be squashed into a single commit for the 
addition of an "exactly-once producer for 011" (the new `FlinkKafkaProducer` 
implementation and the exactly-once tests shouldn't stand alone as independent 
commits, IMO; `FlinkKafkaProducer` isn't used by any other producer version, and 
the exactly-once producer addition wouldn't be valid without the tests).




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092703#comment-16092703
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4239
  
Regarding how I would proceed with this big contribution:
Let's first try to clean up the commits that are bundled all together here.

1. I would first try to merge #4321 (the first 4 commits) and #4310 (af7ed19) 
and get those out of the way.
2. For a06cb94 (`TwoPhaseCommitSinkFunction`), could you open a separate PR 
with unit tests covered?
3. After the above is all sorted out, we rebase this again.







[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092701#comment-16092701
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4239
  
Thanks a lot for opening a pull request for this very important feature 
@pnowojski.
I did a rough first pass and had some comments I would like to clear out 
first (this is a big chunk of code; we will probably need to go through it 
quite a few times before it can be mergeable).

Most notably, some comments so far:
1. I think we need UTs for the `TwoPhaseCommitSinkFunction`. It alone is a 
very important addition (I would even prefer a separate PR for it and try to 
merge that first.)
2. Serialization of the transaction state in `TwoPhaseCommitSinkFunction` 
needs to be changed.
3. Is the `FlinkKafkaProducer011` actually supporting hybrid usage (normal sink 
function and `writeToKafkaWithTimestamps` as a custom sink operator)? From the 
looks of it, it doesn't seem like it.




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092691#comment-16092691
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r128163374
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * The Flink Kafka Consumer is a streaming data source that pulls a 
parallel data stream from
+ * Apache Kafka 0.11.x. The consumer can run in multiple parallel 
instances, each of which will pull
+ * data from one or more Kafka partitions.
+ *
+ * The Flink Kafka Consumer participates in checkpointing and 
guarantees that no data is lost
+ * during a failure, and that the computation processes elements "exactly 
once".
+ * (Note: These guarantees naturally assume that Kafka itself does not 
loose any data.)
+ *
+ * Please note that Flink snapshots the offsets internally as part of 
its distributed checkpoints. The offsets
+ * committed to Kafka / ZooKeeper are only to bring the outside view of 
progress in sync with Flink's view
+ * of the progress. That way, monitoring and other jobs can get a view of 
how far the Flink Kafka consumer
+ * has consumed a topic.
+ *
+ * Please refer to Kafka's documentation for the available 
configuration properties:
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs
+ *
+ * NOTE: The implementation currently accesses partition 
metadata when the consumer
+ * is constructed. That means that the client that submits the program 
needs to be able to
+ * reach the Kafka brokers or ZooKeeper.
--- End diff --

This NOTE is no longer valid and can be removed.




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092694#comment-16092694
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r128163984
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -0,0 +1,818 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import 
org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is 
compatible with Kafka 0.11.x. By default producer
+ * will use {@link Semantic.EXACTLY_ONCE} semantic.
+ *
+ * Implementation note: This producer is a hybrid between a regular 
regular sink function (a)
+ * and a custom operator (b).
+ *
+ * For (a), the class implements the SinkFunction and RichFunction 
interfaces.
+ * For (b), it extends the StreamTask class.
+ *
+ * Details about approach (a):
+ *  Pre Kafka 0.11 producers only follow approach (a), allowing users to 
use the producer using the
+ *  DataStream.addSink() method.
--- End diff --

"Pre Kafka 0.11 producers only follow approach (a)" is incorrect.

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092690#comment-16092690
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r128164127
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -0,0 +1,818 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import 
org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is 
compatible with Kafka 0.11.x. By default producer
+ * will use {@link Semantic.EXACTLY_ONCE} semantic.
+ *
+ * Implementation note: This producer is a hybrid between a regular 
regular sink function (a)
+ * and a custom operator (b).
+ *
+ * For (a), the class implements the SinkFunction and RichFunction 
interfaces.
+ * For (b), it extends the StreamTask class.
+ *
+ * Details about approach (a):
+ *  Pre Kafka 0.11 producers only follow approach (a), allowing users to 
use the producer using the
+ *  DataStream.addSink() method.
+ *  Since the APIs exposed in that variant do not allow accessing the the 
timestamp attached to 

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092693#comment-16092693
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r128164632
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This is a recommended base class for all of the {@link SinkFunction} that intend to implement exactly-once semantic.
+ * It does that by implementing two phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. User should provide custom TXN (transaction handle) and implement abstract methods
+ * handling this transaction handle.
+ *
+ * @param <IN> Input type for {@link SinkFunction}
+ * @param <TXN> Transaction to store all of the information required to handle a transaction (must be Serializable)
+ */
+@PublicEvolving
+public abstract class TwoPhaseCommitSinkFunction<IN, TXN extends Serializable>
--- End diff --

I really like the idea of introducing this abstraction :)




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092697#comment-16092697
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r128166188
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This is a recommended base class for all {@link SinkFunction}s that intend to implement exactly-once semantics.
+ * It does so by implementing a two-phase commit algorithm on top of the {@link CheckpointedFunction} and
+ * {@link CheckpointListener}. Users should provide a custom TXN (transaction handle) and implement the abstract
+ * methods handling this transaction handle.
+ *
+ * @param <IN> Input type for {@link SinkFunction}
+ * @param <TXN> Transaction type that stores all of the information required to handle a transaction (must be Serializable)
+ */
+@PublicEvolving
+public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
+   extends RichSinkFunction<IN>
+   implements CheckpointedFunction, CheckpointListener {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
+
+   @Nullable
+   protected TXN currentTransaction;
+
+   protected final List<TXN> pendingCommitTransactions = new ArrayList<>();
+
+   protected ListState<TXN> pendingTransactionsState;
+
+   protected ListState<TXN> pendingCommitTransactionsState;
+
+   // -- methods that should be implemented in a child class to support the two-phase commit algorithm --
+
+   /**
+* Write value within a transaction.
+*/
+   protected abstract void invoke(TXN transaction, IN value) throws 
Exception;
+
+   /**
+* Method that starts a new transaction.
+*
+* @return newly created transaction.
+*/
+   protected abstract TXN beginTransaction() throws Exception;
+
+   /**
+    * Pre-commit a previously created transaction. The pre-commit must take all of the necessary steps to prepare
+    * the transaction for a commit that might happen in the future. After this point the transaction might still be
+    * aborted, but the underlying implementation must ensure that commit calls on already pre-committed transactions
+    * will always succeed.
+    *
+    * Usually the implementation involves flushing the data.
+    */
+   protected abstract void preCommit(TXN transaction) throws Exception;
+
+   /**
+    * Commit a pre-committed transaction. If this method fails, the Flink application will be restarted and
+    * {@link TwoPhaseCommitSinkFunction#recoverAndCommit(Serializable)} will be called again for the same
+    * transaction.
+    */
+   protected abstract void commit(TXN transaction);
+
+   /**
+* Invoked on recovered transactions after a failure. Must 
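
To illustrate the contract of these abstract methods, here is a minimal, hypothetical subclass (not part of the PR): a sink that stages each transaction's records in a temporary file and publishes the file with an atomic rename on commit. The class name, the FileTransaction handle, and the assumption of an abort(TXN) hook are all illustrative.

import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

public class FileBasedTwoPhaseSink
        extends TwoPhaseCommitSinkFunction<String, FileBasedTwoPhaseSink.FileTransaction> {

    /** Transaction handle; must be Serializable per the class contract. */
    public static class FileTransaction implements Serializable {
        String tmpPath;   // records are staged here
        String finalPath; // published here on commit
    }

    @Override
    protected FileTransaction beginTransaction() throws Exception {
        // Each transaction stages its writes in a fresh temporary file.
        FileTransaction txn = new FileTransaction();
        Path tmp = Files.createTempFile("two-phase-sink-", ".pending");
        txn.tmpPath = tmp.toString();
        txn.finalPath = txn.tmpPath.replace(".pending", ".committed");
        return txn;
    }

    @Override
    protected void invoke(FileTransaction txn, String value) throws Exception {
        // Writes within a transaction only ever touch the staging file.
        Files.write(Paths.get(txn.tmpPath), (value + "\n").getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.APPEND);
    }

    @Override
    protected void preCommit(FileTransaction txn) throws Exception {
        // Nothing is buffered in memory in this toy example; a real sink would
        // flush (and fsync) here so that the later commit cannot fail.
    }

    @Override
    protected void commit(FileTransaction txn) {
        try {
            // An atomic rename publishes the staged data; retrying after a
            // crash is safe because a missing source file means "already done".
            Files.move(Paths.get(txn.tmpPath), Paths.get(txn.finalPath),
                    StandardCopyOption.ATOMIC_MOVE);
        } catch (NoSuchFileException e) {
            // Already committed before a failure; commit must be idempotent.
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    protected void abort(FileTransaction txn) {
        try {
            Files.deleteIfExists(Paths.get(txn.tmpPath));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}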

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092699#comment-16092699
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r128166822
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---
+@PublicEvolving
+public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
--- End diff --

Overall, though, I would like to see unit tests specifically for this 
`TwoPhaseCommitSinkFunction` class.
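
For what it's worth, such a test could drive the sink through Flink's operator test harness and assert that records become visible only after notifyCheckpointComplete. The sketch below is hypothetical: the TestSink subclass is invented, the no-arg base constructor is assumed, and the exact OneInputStreamOperatorTestHarness signatures may differ between Flink versions.

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

import org.junit.Test;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TwoPhaseCommitSinkFunctionSketchTest {

    /** Hypothetical test sink: a "transaction" is just an in-memory buffer. */
    private static class TestSink extends TwoPhaseCommitSinkFunction<String, TestSink.Txn> {

        static class Txn implements Serializable {
            final List<String> buffer = new ArrayList<>();
        }

        // Static so assertions can observe commits from the harness-driven instance.
        static final List<String> committed = new ArrayList<>();

        @Override
        protected Txn beginTransaction() {
            return new Txn();
        }

        @Override
        protected void invoke(Txn txn, String value) {
            txn.buffer.add(value);
        }

        @Override
        protected void preCommit(Txn txn) {
            // Nothing to flush for an in-memory buffer.
        }

        @Override
        protected void commit(Txn txn) {
            committed.addAll(txn.buffer);
        }

        @Override
        protected void abort(Txn txn) {
            txn.buffer.clear();
        }
    }

    @Test
    public void dataVisibleOnlyAfterNotifyCheckpointComplete() throws Exception {
        OneInputStreamOperatorTestHarness<String, Object> harness =
                new OneInputStreamOperatorTestHarness<>(new StreamSink<>(new TestSink()));
        harness.setup();
        harness.open();

        harness.processElement("42", 0L);
        harness.snapshot(0L, 1L); // pre-commits the open transaction
        assertTrue(TestSink.committed.isEmpty());

        harness.notifyOfCompletedCheckpoint(0L); // commits the pending transaction
        assertEquals(Collections.singletonList("42"), TestSink.committed);
    }
}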




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092696#comment-16092696
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r128167617
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -0,0 +1,818 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import 
org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.11.x. By default the
+ * producer will use the {@link Semantic#EXACTLY_ONCE} semantic.
+ *
+ * Implementation note: This producer is a hybrid between a regular sink function (a)
--- End diff --

Does this implementation really support the hybrid modes?
As far as I can understand it, `FlinkKafkaProducer011` only extends 
`TwoPhaseCommitSinkFunction`, which doesn't support the hybrid modes.



[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092695#comment-16092695
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r128166034
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092692#comment-16092692
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r128168357
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 ---

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092698#comment-16092698
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r128164162
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.11.x. By default the
+ * producer will use the {@link Semantic#EXACTLY_ONCE} semantic.
+ *
+ * Implementation note: This producer is a hybrid between a regular sink function (a)
+ * and a custom operator (b).
+ *
+ * For (a), the class implements the SinkFunction and RichFunction interfaces.
+ * For (b), it extends the StreamTask class.
+ *
+ * Details about approach (a):
+ *  Pre-Kafka-0.11 producers only follow approach (a), allowing users to add the producer via the
+ *  DataStream.addSink() method.
+ *  Since the APIs exposed in that variant do not allow accessing the timestamp attached to 
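
For orientation, approach (a) is the standard DataStream.addSink() route. A minimal usage sketch follows; the topic name, broker address, and checkpoint interval are placeholders, and the exact constructor set may differ between revisions of this PR.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

public class Kafka011SinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Transactions are tied to checkpoints, so checkpointing must be enabled.
        env.enableCheckpointing(5000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder

        env.fromElements("a", "b", "c")
            .addSink(new FlinkKafkaProducer011<>("my-topic", new SimpleStringSchema(), props));

        env.execute("kafka-0.11-sink-sketch");
    }
}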

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-07-04 Thread Luke (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16074221#comment-16074221
 ] 

Luke commented on FLINK-6988:
-

Is there a plan for the Flink producer connector to support multiple topics in one 
sink?
Now that Kafka supports transactional behavior, it could be very useful in use 
cases where records sent to different topics are committed or rolled back in the 
same transaction.
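
For context, the raw Kafka 0.11 producer API does let a single transaction span multiple topics, so the building block exists; whether and how the Flink connector exposes it is a separate question. A minimal sketch against the plain Kafka client follows (topic names and the transactional.id are placeholders; a production client must also handle fatal errors like ProducerFencedException differently).

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class MultiTopicTransactionExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("transactional.id", "example-txn-id"); // placeholder
        props.setProperty("key.serializer", StringSerializer.class.getName());
        props.setProperty("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            try {
                // Both records commit or abort together, across topics.
                producer.send(new ProducerRecord<>("topic-a", "key", "value-1"));
                producer.send(new ProducerRecord<>("topic-b", "key", "value-2"));
                producer.commitTransaction();
            } catch (Exception e) {
                producer.abortTransaction();
                throw e;
            }
        }
    }
}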



[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16070319#comment-16070319
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4239

[FLINK-6988] Initial flink-connector-kafka-0.11 with at-least-once semantic

The first couple of commits are from other PRs: #4206, #4209, #4213.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink kafka011

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4239.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4239


commit 5191d5b4b78620cfc5ecfc9088afba0d611eaacb
Author: Piotr Nowojski 
Date:   2017-06-26T09:28:51Z

[FLINK-6996] Refactor and automatically inherit KafkaProducer integration tests

commit 1c7d349ce425ec0213059e062f10c90773cc780d
Author: Piotr Nowojski 
Date:   2017-06-26T10:20:36Z

[FLINK-6996] Fix formatting in KafkaConsumerTestBase and 
KafkaProducerTestBase

commit 5b849f98191439e69ca2357a4767f47957ee0250
Author: Piotr Nowojski 
Date:   2017-06-23T11:41:55Z

[FLINK-7030] Build with scala-2.11 by default

commit 3f62aecb57cea9d43611ecfa24e2233a63197341
Author: Piotr Nowojski 
Date:   2017-06-26T10:36:40Z

[FLINK-6996] Fix at-least-once semantic for FlinkKafkaProducer010

Add tests coverage for Kafka 0.10 and 0.9

commit 4b78626df474a8d49a406714a7142ad44d8a8faf
Author: Piotr Nowojski 
Date:   2017-06-28T18:30:08Z

[FLINK-7032] Overwrite inherited values of compiler version from parent pom

Default values were 1.6 and were causing IntelliJ to constantly switch the language
level to 1.6, which in turn was causing compilation errors. It worked fine
for compiling from the console using Maven, because those values are separately set
in the maven-compiler-plugin configuration.

commit 2c2556e72dd73c5e470e5afd6dab4a11cb41772d
Author: Piotr Nowojski 
Date:   2017-06-23T07:14:28Z

[FLINK-6988] Initial flink-connector-kafka-0.11 with at-least-once semantic

Code of 0.11 connector is based on 0.10 version






[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-06-22 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16059774#comment-16059774
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-6988:


We did have an offline discussion with [~rmetzger] about dropping 0.8 support. 
The need to maintain common features across all Kafka versions is certainly 
starting to feel a bit awkward for Kafka 0.8 compared to 0.9+, due to missing 
features in the old consumer / producer APIs.
I think we perhaps need to run a poll on the mailing list to get a feel for how 
many of our users are still using Kafka 0.8.



[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-06-22 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16059735#comment-16059735
 ] 

Chesnay Schepler commented on FLINK-6988:
-

This would increase the number of supported Kafka versions to four. What's the plan 
for removing Kafka 0.8?



[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-06-22 Thread Piotr Nowojski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16059699#comment-16059699
 ] 

Piotr Nowojski commented on FLINK-6988:
---

Unfortunately, KafkaProducer's API is very limited. In particular, it doesn't allow 
implementing a two-phase commit protocol the way it is done in BucketingSink, 
because it allows neither resuming nor committing transactions from different 
workers after a crash (last bullet point above). This is because every time a user 
calls `Producer::initTransactions()`, all pending (not committed) transactions are 
automatically aborted by the Kafka server. Calling `Producer::initTransactions()` 
is necessary to obtain the `producerId` and `epoch` values from the Kafka server, 
which are crucial for manipulating transactions.

Fortunately, there is a way around this issue. It seems that Kafka's REST API is 
more flexible, and it should be possible to resume transactions through it. Every 
time we begin a transaction, we can store the `producerId` and `epoch` in the 
state. In case we need to commit a pending transaction on another worker (after a 
crash), instead of calling `KafkaProducer::initTransactions()` we can restore the 
`producerId` and `epoch` from the state and commit this pending transaction using 
those restored values.

The "hacky" part is that the `producerId` and `epoch` values are hidden behind 
private fields in package-private classes. That means we cannot extend 
`KafkaProducer` to obtain or set them. That leaves us with two options: we either 
reimplement KafkaProducer using Kafka's REST API (we could copy/paste most of 
their code), or we use JVM reflection to manually manipulate the official 
KafkaProducer class.
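
A rough sketch of the reflection option follows. Everything here is an assumption about Kafka 0.11 client internals: the field names ("transactionManager", "producerIdAndEpoch", "producerId", "epoch") are private and version-specific and may well differ, and a complete implementation would also have to put the transaction manager's state machine into the right state before committing.

import org.apache.kafka.clients.producer.KafkaProducer;

import java.lang.reflect.Field;

public final class TransactionResumer {

    /**
     * Hypothetical: inject a previously checkpointed producerId/epoch into a
     * fresh producer instead of calling initTransactions(), which would abort
     * the pending transaction we want to commit.
     */
    public static void resumeTransaction(KafkaProducer<?, ?> producer, long producerId, short epoch)
            throws Exception {
        Object transactionManager = getField(producer, "transactionManager");
        Object producerIdAndEpoch = getField(transactionManager, "producerIdAndEpoch");
        setField(producerIdAndEpoch, "producerId", producerId);
        setField(producerIdAndEpoch, "epoch", epoch);
        // After this, commitTransaction() should target the pending transaction.
    }

    private static Object getField(Object object, String name) throws Exception {
        Field field = object.getClass().getDeclaredField(name);
        field.setAccessible(true);
        return field.get(object);
    }

    private static void setField(Object object, String name, Object value) throws Exception {
        Field field = object.getClass().getDeclaredField(name);
        field.setAccessible(true);
        field.set(object, value);
    }
}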
