[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...

2017-11-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4915


---


2017-11-01 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4915#discussion_r148441559
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Test.java ---
@@ -61,7 +61,7 @@
  * IT cases for the {@link FlinkKafkaProducer011}.
  */
 @SuppressWarnings("serial")
-public class FlinkKafkaProducer011Tests extends KafkaTestBase {
+public class FlinkKafkaProducer011Test extends KafkaTestBase {
--- End diff --

I'm leaning towards `*ITCase` here, since it is indeed an integration test 
with Kafka.


---


2017-11-01 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4915#discussion_r148227364
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java ---
@@ -226,13 +228,19 @@ public int getTransactionCoordinatorId() {
 
private void flushNewPartitions() {
LOG.info("Flushing new partitions");
+   enqueueNewPartitions().await();
+   }
+
+   private TransactionalRequestResult enqueueNewPartitions() {
Object transactionManager = getValue(kafkaProducer, "transactionManager");
-   Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
-   invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
-   TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
-   Object sender = getValue(kafkaProducer, "sender");
-   invoke(sender, "wakeup");
-   result.await();
+   synchronized (transactionManager) {
+   Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
--- End diff --

Did you add the `!newPartitionsInTransaction.isEmpty()` check in the end? I 
couldn't find it at first glance. 

Regarding tests, if you add the check, would your current test fail? If 
not, I think the behaviour isn't properly tested.


---


2017-11-01 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4915#discussion_r148226920
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Test.java ---
@@ -61,7 +61,7 @@
  * IT cases for the {@link FlinkKafkaProducer011}.
  */
 @SuppressWarnings("serial")
-public class FlinkKafkaProducer011Tests extends KafkaTestBase {
+public class FlinkKafkaProducer011Test extends KafkaTestBase {
--- End diff --

I haven't looked into too many `ITCase`s, but the coding guidelines require unit 
tests to execute in under a second:
>Please use unit tests to test isolated functionality, such as methods. 
Unit tests should execute in subseconds and should be preferred whenever 
possible. The name of unit test classes have to end in *Test. Use integration tests 
to implement long-running tests.

http://flink.apache.org/contribute-code.html#coding-guidelines

Then again, I don't know how consistently the existing tests follow this.
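The build-side consequence of this naming question (and of the `[hotfix][kafka-tests]` commit in the PR description, "Fix test names so that they are not ignored by mvn build") can be sketched in Java. This is a minimal approximation, assuming Surefire include patterns of the form `**/*Test.*` for unit tests and `**/*ITCase.*` for integration tests; `TestNamePatterns` is a hypothetical helper, not Maven's own matcher. It shows why a plural name such as `FlinkKafkaProducer011Tests` matches neither pattern and is silently skipped.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper: approximates how a Surefire <include> pattern such as
// "**/*Test.*" or "**/*ITCase.*" selects test source files by path.
// The concrete patterns are assumptions, not copied from Flink's pom.xml.
final class TestNamePatterns {
    private TestNamePatterns() {}

    // Translates a simple Ant-style pattern into a regex:
    // a leading "**/" matches any directory prefix, "*" matches within one path segment.
    static Pattern toRegex(String antPattern) {
        boolean anyDir = antPattern.startsWith("**/");
        String rest = anyDir ? antPattern.substring(3) : antPattern;
        StringBuilder regex = new StringBuilder(anyDir ? "(.*/)?" : "");
        for (char c : rest.toCharArray()) {
            if (c == '*') {
                regex.append("[^/]*");
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString());
    }

    // True if the path matches at least one include pattern.
    static boolean isIncluded(String path, List<String> includes) {
        for (String include : includes) {
            if (toRegex(include).matcher(path).matches()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> unitTests = Arrays.asList("**/*Test.*");
        List<String> integrationTests = Arrays.asList("**/*ITCase.*");
        String dir = "org/apache/flink/streaming/connectors/kafka/";
        for (String name : Arrays.asList("FlinkKafkaProducer011Tests.java",
                "FlinkKafkaProducer011Test.java", "FlinkKafkaProducer011ITCase.java")) {
            String path = dir + name;
            System.out.println(name + " -> unit: " + isIncluded(path, unitTests)
                    + ", integration: " + isIncluded(path, integrationTests));
        }
    }
}
```

Under these assumed patterns, renaming to either `*Test` or `*ITCase` gets the class picked up; the choice between the two only decides which phase runs it.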


---


2017-10-31 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4915#discussion_r147942486
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Test.java ---
@@ -61,7 +61,7 @@
  * IT cases for the {@link FlinkKafkaProducer011}.
  */
 @SuppressWarnings("serial")
-public class FlinkKafkaProducer011Tests extends KafkaTestBase {
+public class FlinkKafkaProducer011Test extends KafkaTestBase {
--- End diff --

Don't all of the other `ITCase`s start a Flink job? 

Regardless, I originally named it `Test` because it is slightly more 
unit-test-like than `Kafka011ProducerExactlyOnceITCase`.


---


2017-10-31 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4915#discussion_r147941508
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java ---
@@ -226,13 +228,19 @@ public int getTransactionCoordinatorId() {
 
private void flushNewPartitions() {
LOG.info("Flushing new partitions");
+   enqueueNewPartitions().await();
+   }
+
+   private TransactionalRequestResult enqueueNewPartitions() {
Object transactionManager = getValue(kafkaProducer, "transactionManager");
-   Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
-   invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
-   TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
-   Object sender = getValue(kafkaProducer, "sender");
-   invoke(sender, "wakeup");
-   result.await();
+   synchronized (transactionManager) {
+   Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
--- End diff --

It is always called from `FlinkKafkaProducer::flush()`, so that's 
pretty easy :)


---


2017-10-30 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4915#discussion_r147754468
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Test.java ---
@@ -61,7 +61,7 @@
  * IT cases for the {@link FlinkKafkaProducer011}.
  */
 @SuppressWarnings("serial")
-public class FlinkKafkaProducer011Tests extends KafkaTestBase {
+public class FlinkKafkaProducer011Test extends KafkaTestBase {
--- End diff --

Shouldn't this be named `*ITCase` according to the coding conventions?


---


2017-10-30 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4915#discussion_r147754127
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java ---
@@ -226,13 +228,19 @@ public int getTransactionCoordinatorId() {
 
private void flushNewPartitions() {
LOG.info("Flushing new partitions");
+   enqueueNewPartitions().await();
+   }
+
+   private TransactionalRequestResult enqueueNewPartitions() {
Object transactionManager = getValue(kafkaProducer, "transactionManager");
-   Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
-   invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
-   TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
-   Object sender = getValue(kafkaProducer, "sender");
-   invoke(sender, "wakeup");
-   result.await();
+   synchronized (transactionManager) {
+   Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
--- End diff --

How are you now testing that `enqueueRequest` is called? 


---


2017-10-30 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4915#discussion_r147689963
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---
@@ -483,11 +478,6 @@ public void setLogFailuresOnly(boolean logFailuresOnly) {
 */
@Override
public void open(Configuration configuration) throws Exception {
-   if (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
--- End diff --

ok


---


2017-10-30 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4915#discussion_r147675169
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java ---
@@ -226,13 +228,19 @@ public int getTransactionCoordinatorId() {
 
private void flushNewPartitions() {
LOG.info("Flushing new partitions");
+   enqueueNewPartitions().await();
+   }
+
+   private TransactionalRequestResult enqueueNewPartitions() {
Object transactionManager = getValue(kafkaProducer, "transactionManager");
-   Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
-   invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
-   TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
-   Object sender = getValue(kafkaProducer, "sender");
-   invoke(sender, "wakeup");
-   result.await();
+   synchronized (transactionManager) {
+   Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
--- End diff --

I implemented it; however, in that case it would be much harder to reliably 
test `enqueueRequest(addPartitionsToTransactionHandler());`, since in the vast 
majority of cases it would be dead code :/


---


2017-10-30 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4915#discussion_r147674418
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java ---
@@ -226,13 +228,19 @@ public int getTransactionCoordinatorId() {
 
private void flushNewPartitions() {
LOG.info("Flushing new partitions");
+   enqueueNewPartitions().await();
+   }
+
+   private TransactionalRequestResult enqueueNewPartitions() {
Object transactionManager = getValue(kafkaProducer, "transactionManager");
-   Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
-   invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
-   TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
-   Object sender = getValue(kafkaProducer, "sender");
-   invoke(sender, "wakeup");
-   result.await();
+   synchronized (transactionManager) {
+   Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
+   invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
+   TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
+   Object sender = getValue(kafkaProducer, "sender");
+   invoke(sender, "wakeup");
--- End diff --

Good catch. It shouldn't make a difference, but I will change it to better 
match the original code.
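To make the agreed change concrete, here is a single-threaded Java sketch of an `enqueueNewPartitions()` that enqueues while holding the `transactionManager` monitor and calls `wakeup()` only after leaving the `synchronized` block, as in the original Kafka code. `TxnManager`, `Sender`, `RequestResult` and `PartitionFlusher` are hypothetical stand-ins for the Kafka internals that the real code reaches via reflection; in this model `wakeup()` drains the queue on the calling thread instead of signalling a network thread.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for Kafka's TransactionalRequestResult.
final class RequestResult {
    private final CountDownLatch done = new CountDownLatch(1);

    void complete() {
        done.countDown();
    }

    // Returns true if the request completed within the timeout.
    boolean await(long timeout, TimeUnit unit) {
        try {
            return done.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

// Hypothetical stand-in for Kafka's TransactionManager; its queue is guarded by this object's monitor.
final class TxnManager {
    private final Queue<RequestResult> pending = new ArrayDeque<>();

    // Not synchronized itself: callers must already hold this object's monitor.
    void enqueueRequest(RequestResult request) {
        pending.add(request);
    }

    synchronized RequestResult nextRequest() {
        return pending.poll();
    }
}

// Hypothetical stand-in for Kafka's Sender (simplified: completes queued requests inline).
final class Sender {
    private final TxnManager manager;

    Sender(TxnManager manager) {
        this.manager = manager;
    }

    void wakeup() {
        RequestResult request;
        while ((request = manager.nextRequest()) != null) {
            request.complete();
        }
    }
}

final class PartitionFlusher {
    private final TxnManager transactionManager;
    private final Sender sender;

    PartitionFlusher(TxnManager transactionManager, Sender sender) {
        this.transactionManager = transactionManager;
        this.sender = sender;
    }

    RequestResult enqueueNewPartitions() {
        RequestResult result = new RequestResult();
        synchronized (transactionManager) {
            // Mutate the manager's queue only while holding its monitor.
            transactionManager.enqueueRequest(result);
        }
        // Wake the sender after releasing the lock.
        sender.wakeup();
        return result;
    }

    boolean flushNewPartitions(long timeoutMillis) {
        return enqueueNewPartitions().await(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```

Because Java monitors are reentrant, calling `wakeup()` inside the block would not deadlock in this model either, which is consistent with "it shouldn't make a difference"; keeping it outside simply holds the lock for less time.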


---


2017-10-30 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4915#discussion_r147667457
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java ---
@@ -226,13 +228,19 @@ public int getTransactionCoordinatorId() {
 
private void flushNewPartitions() {
LOG.info("Flushing new partitions");
+   enqueueNewPartitions().await();
+   }
+
+   private TransactionalRequestResult enqueueNewPartitions() {
Object transactionManager = getValue(kafkaProducer, "transactionManager");
-   Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
-   invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
-   TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
-   Object sender = getValue(kafkaProducer, "sender");
-   invoke(sender, "wakeup");
-   result.await();
+   synchronized (transactionManager) {
+   Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
+   invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
+   TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
+   Object sender = getValue(kafkaProducer, "sender");
+   invoke(sender, "wakeup");
--- End diff --

`sender.wakeup` is outside of the lock in the original code. Do you think 
it makes a difference?


---


2017-10-30 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4915#discussion_r147656743
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java ---
@@ -226,13 +228,19 @@ public int getTransactionCoordinatorId() {
 
private void flushNewPartitions() {
LOG.info("Flushing new partitions");
+   enqueueNewPartitions().await();
+   }
+
+   private TransactionalRequestResult enqueueNewPartitions() {
Object transactionManager = getValue(kafkaProducer, "transactionManager");
-   Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
-   invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
-   TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
-   Object sender = getValue(kafkaProducer, "sender");
-   invoke(sender, "wakeup");
-   result.await();
+   synchronized (transactionManager) {
+   Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
--- End diff --

This is not related to your changes and can be done in a separate PR, but I think we 
should add this check from the original sources to *"mitigate"* KAFKA-6119:
```java
if (!newPartitionsInTransaction.isEmpty())
    enqueueRequest(addPartitionsToTransactionHandler());
```
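A minimal Java model of that guard, assuming a set of pending partitions that is moved into a single request when flushed. `GuardedTxnManager` and its methods are hypothetical and are not Kafka's `TransactionManager`. It also illustrates the testing concern raised in this thread: with the guard in place, a test only exercises the enqueue path if it first registers a partition; otherwise the flush is a no-op.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the isEmpty() guard quoted above.
final class GuardedTxnManager {
    private final Set<String> newPartitionsInTransaction = new HashSet<>();
    private final List<Set<String>> enqueuedRequests = new ArrayList<>();

    synchronized void maybeAddPartition(String topicPartition) {
        newPartitionsInTransaction.add(topicPartition);
    }

    // Enqueues an "add partitions" request only if there is something to add.
    // Returns true if a request was enqueued.
    synchronized boolean flushNewPartitions() {
        if (newPartitionsInTransaction.isEmpty()) {
            return false;
        }
        enqueuedRequests.add(new HashSet<>(newPartitionsInTransaction));
        // In this sketch the partitions are considered handed off once enqueued.
        newPartitionsInTransaction.clear();
        return true;
    }

    synchronized int enqueuedRequests() {
        return enqueuedRequests.size();
    }
}
```

A test for the guarded path therefore has to call something like `maybeAddPartition` first, which is why the unguarded version is easier to cover.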


---


2017-10-30 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4915#discussion_r147652950
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---
@@ -563,7 +553,7 @@ public void close() throws Exception {
asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
}
try {
-   producersPool.close();
+   producersPool.ifPresent(pool -> pool.close());
--- End diff --

I know about this controversy/discussion. However, I don't know in what 
universe nullable fields are better than `Optional` fields :|


---


2017-10-30 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4915#discussion_r147652455
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---
@@ -483,11 +478,6 @@ public void setLogFailuresOnly(boolean logFailuresOnly) {
 */
@Override
public void open(Configuration configuration) throws Exception {
-   if (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
--- End diff --

`initializeState()` performs some clean-up actions that depend on the 
semantic (cleaning up/closing lingering transactions), so this check should 
happen earlier. Otherwise, the `EXACTLY_ONCE` clean-up code would be executed 
unnecessarily.
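The ordering argument can be sketched in Java, assuming, as the commit message for this change says, that `initializeState()` runs before `open()`. `SketchKafkaSink` and `SinkSemantic` are hypothetical; falling back to `NONE` with a warning is an assumption about what follows the check, since the quoted diff only shows the condition itself.

```java
// Hypothetical enum mirroring the three semantics named in the thread.
enum SinkSemantic { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

// Hypothetical, heavily simplified sink; not Flink's FlinkKafkaProducer011.
final class SketchKafkaSink {
    private final boolean checkpointingEnabled;
    private SinkSemantic semantic;
    private int lingeringTransactionsAborted;

    SketchKafkaSink(SinkSemantic semantic, boolean checkpointingEnabled) {
        this.semantic = semantic;
        this.checkpointingEnabled = checkpointingEnabled;
    }

    // Called first by the (assumed) lifecycle.
    void initializeState() {
        // Check before any semantic-dependent work, so the clean-up sees the effective semantic.
        if (semantic != SinkSemantic.NONE && !checkpointingEnabled) {
            // Assumption: downgrade instead of failing.
            System.err.println("Using " + semantic
                    + " semantic, but checkpointing is not enabled. Switching to NONE.");
            semantic = SinkSemantic.NONE;
        }
        if (semantic == SinkSemantic.EXACTLY_ONCE) {
            abortLingeringTransactions();
        }
    }

    // Called after initializeState(); would create producers for the effective semantic.
    void open() {
    }

    private void abortLingeringTransactions() {
        lingeringTransactionsAborted++;
    }

    SinkSemantic semantic() {
        return semantic;
    }

    int lingeringTransactionsAborted() {
        return lingeringTransactionsAborted;
    }
}
```

If the same check lived in `open()` instead, `initializeState()` would already have run the `EXACTLY_ONCE` clean-up for a job that ends up with `NONE`.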


---


2017-10-27 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4915#discussion_r147442314
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---
@@ -563,7 +553,7 @@ public void close() throws Exception {
asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
}
try {
-   producersPool.close();
+   producersPool.ifPresent(pool -> pool.close());
--- End diff --

I am not adamant about it, but using `Optional` in private fields is not 
without controversy: https://stackoverflow.com/a/26328555

Also, `ifPresent(pool -> pool.close())` only works because `close` does not 
declare any checked exceptions. If it did, the code would not compile.
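The checked-exception point can be demonstrated with a small Java sketch. `UncheckedPool`, `CheckedPool` and `OptionalCloseDemo` are hypothetical types; the only library facts relied on are that `Optional.ifPresent` takes a `java.util.function.Consumer`, whose `accept` declares no checked exceptions, and that `Closeable.close()` declares `IOException`.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Optional;

// Hypothetical pool whose close() declares no checked exception.
final class UncheckedPool {
    boolean closed;

    void close() {
        closed = true;
    }
}

// Hypothetical pool whose close() declares IOException (via Closeable).
final class CheckedPool implements Closeable {
    boolean closed;

    @Override
    public void close() throws IOException {
        closed = true;
    }
}

final class OptionalCloseDemo {
    // Compiles: UncheckedPool::close fits Consumer<UncheckedPool>.
    static void closeUnchecked(Optional<UncheckedPool> pool) {
        pool.ifPresent(UncheckedPool::close);
    }

    // pool.ifPresent(CheckedPool::close) would NOT compile here, because
    // Consumer.accept cannot throw IOException. Unwrap explicitly instead.
    static void closeChecked(Optional<CheckedPool> pool) throws IOException {
        if (pool.isPresent()) {
            pool.get().close();
        }
    }

    // The nullable-field alternative discussed in the thread.
    static void closeNullable(CheckedPool pool) throws IOException {
        if (pool != null) {
            pool.close();
        }
    }
}
```

Either style works once the exception has to propagate; the `Optional` version just loses the one-liner lambda.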


---


2017-10-27 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4915#discussion_r147436687
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---
@@ -483,11 +478,6 @@ public void setLogFailuresOnly(boolean logFailuresOnly) {
 */
@Override
public void open(Configuration configuration) throws Exception {
-   if (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
--- End diff --

What is the benefit of moving this into `initializeState()`?


---


2017-10-27 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4915

[FLINK-7838] Bunch of hotfixes and fix missing synchronization in 
FlinkKafkaProducer011

## What is the purpose of the change

The most important change is the commit adding the missing synchronization, which 
might have been the cause of some deadlocks on Travis. The others are just 
non-critical hotfixes.

## Brief change log

Please check individual commit messages.

## Verifying this change

This change is already covered by existing Kafka 0.11 connector tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (**yes** / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink f7838

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4915.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4915


commit 4e0492595f497a49c63b8ffddcc66e720e4e4433
Author: Piotr Nowojski 
Date:   2017-10-24T15:35:56Z

[hotfix][kafka] Bump Kafka 0.11 dependency

This might include some bugfixes

commit e38b3461bc97a175bf67f1072b2e8a2a891c1f1a
Author: Piotr Nowojski 
Date:   2017-10-24T15:57:05Z

[FLINK-7838][kafka] Add missing synchronization in FlinkKafkaProducer

commit 04127b9c44807f5379e07d801847d993c39e94b1
Author: Piotr Nowojski 
Date:   2017-10-26T08:02:15Z

[hotfix][kafka] Fix FlinkKafkaProducer011 logger

commit 8b47ac214c4022563be8128e84bc02d5de98819c
Author: Piotr Nowojski 
Date:   2017-10-27T13:11:24Z

[hotfix][kafka-tests] Fix test names so that they are not ignored by mvn 
build

commit a6c4c8bbdbfc5c238557e151fa8598e71a562411
Author: Piotr Nowojski 
Date:   2017-10-25T16:08:46Z

[hotfix][kafka] Move checkpointing enable checking to initializeState

initializeState is called before open, and since both of those functions
rely on the chosen semantic, the check whether checkpointing is enabled should
happen in initializeState.

commit 055e5d125df895fd010e1171d1d39f37177518a2
Author: Piotr Nowojski 
Date:   2017-10-27T13:14:58Z

[hotfix][kafka] Remove unused field in FlinkKafkaProducer011

commit 6cf55ed8977135af01099452962962199e253348
Author: Piotr Nowojski 
Date:   2017-10-27T13:47:26Z

[hotfix][kafka] Do not return producers to a pool in abort for non 
EXACTLY_ONCE modes

Previously, on abort(...) producers were returned to the pool. This was a 
minor bug, probably without any negative side effects; however, this patch 
fixes it and adds additional sanity checks to guard against similar bugs
in the future.
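A rough Java sketch of the described fix, assuming that only `EXACTLY_ONCE` uses pooled transactional producers and that `abort` does nothing for the other modes. `AbortSketch`, `PooledProducer` and `DeliverySemantic` are hypothetical names, and the `IllegalStateException` stands in for whatever sanity checks the commit actually adds.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical enum mirroring the semantics named in the PR.
enum DeliverySemantic { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

// Hypothetical producer handle standing in for a Kafka producer.
final class PooledProducer {
    final boolean transactional;
    int aborts;

    PooledProducer(boolean transactional) {
        this.transactional = transactional;
    }

    void abortTransaction() {
        aborts++;
    }
}

final class AbortSketch {
    private final DeliverySemantic semantic;
    private final Deque<PooledProducer> pool = new ArrayDeque<>();

    AbortSketch(DeliverySemantic semantic) {
        this.semantic = semantic;
    }

    void abort(PooledProducer producer) {
        switch (semantic) {
            case EXACTLY_ONCE:
                // Sanity check: only transactional producers may be recycled into the pool.
                if (!producer.transactional) {
                    throw new IllegalStateException("EXACTLY_ONCE requires a transactional producer");
                }
                producer.abortTransaction();
                pool.add(producer);
                break;
            case AT_LEAST_ONCE:
            case NONE:
                // Non-transactional modes never hand producers back to the pool.
                break;
            default:
                throw new UnsupportedOperationException("Unknown semantic " + semantic);
        }
    }

    int pooledProducers() {
        return pool.size();
    }
}
```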




---