[GitHub] flink pull request #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitte...

2018-01-11 Thread tzulitai
Github user tzulitai closed the pull request at:

https://github.com/apache/flink/pull/5200


---



2018-01-10 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5200#discussion_r160692827
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
@@ -51,7 +50,7 @@
  *the Flink data streams.
  * @param  The type of topic/partition identifier used by Kafka in the specific version.
  */
-public abstract class AbstractFetcher {
+public abstract class AbstractFetcher implements KafkaOffsetCommitter {
--- End diff --

You are probably right. I haven't thought this through, but if you have and 
you feel it's a good opportunity for a larger refactoring, I'm fine with 
that :)

Anyway, as we discussed offline, such a refactoring could be done in a 
separate PR, and here you could just implement 
`AbstractFetcher#doCommitInternalOffsetsToKafka` for testing purposes. 

I don't mind which of the two approaches you choose :)
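The second option could look roughly like the following template-method sketch: a `final` public commit method that tests cannot override, delegating to a protected hook such as the `doCommitInternalOffsetsToKafka` mentioned above, which a test fetcher implements. The types are simplified assumptions (partitions as `String`, no commit callback), and the unmodifiable copy is only a placeholder for whatever shared logic the real method contains.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class TemplateMethodSketch {

    // Hypothetical, simplified fetcher: partitions are plain strings
    // and there is no commit callback.
    abstract static class SketchFetcher {

        // final: subclasses cannot override this, and Mockito's default
        // mock maker cannot stub it.
        public final void commitInternalOffsetsToKafka(Map<String, Long> offsets) {
            // Placeholder for shared logic: hand the hook a read-only copy.
            doCommitInternalOffsetsToKafka(Collections.unmodifiableMap(new HashMap<>(offsets)));
        }

        // Single extension point for version-specific fetchers (and tests).
        protected abstract void doCommitInternalOffsetsToKafka(Map<String, Long> offsets);
    }

    // Test fetcher that records the offsets reaching the hook instead of calling Kafka.
    static class RecordingFetcher extends SketchFetcher {
        final Map<String, Long> lastCommitted = new HashMap<>();
        int commitCount = 0;

        @Override
        protected void doCommitInternalOffsetsToKafka(Map<String, Long> offsets) {
            lastCommitted.clear();
            lastCommitted.putAll(offsets);
            commitCount++;
        }
    }

    public static void main(String[] args) {
        RecordingFetcher fetcher = new RecordingFetcher();
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic-0", 42L);
        fetcher.commitInternalOffsetsToKafka(offsets);
        System.out.println(fetcher.lastCommitted); // prints {topic-0=42}
    }
}
```

The trade-off versus a separate interface is that tests still subclass the fetcher, but the commit entry point itself stays sealed.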


---



2018-01-09 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5200#discussion_r160599433
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
@@ -51,7 +50,7 @@
  *the Flink data streams.
  * @param  The type of topic/partition identifier used by Kafka in the specific version.
  */
-public abstract class AbstractFetcher {
+public abstract class AbstractFetcher implements KafkaOffsetCommitter {
--- End diff --

The result could also very well be that we should use this opportunity to 
refactor the vague dependencies between the fetcher, the consumer thread, and 
the consumer base, and include that refactoring in this PR. I would not be 
against that.


---



2018-01-09 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5200#discussion_r160598939
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---
@@ -89,30 +90,44 @@
@SuppressWarnings("unchecked")
public void testEitherWatermarkExtractor() {
try {
-   new DummyFlinkKafkaConsumer(mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class), false)
+   new DummyFlinkKafkaConsumer(
+   mock(AbstractFetcher.class),
+   mock(AbstractPartitionDiscoverer.class),
+   mock(KafkaOffsetCommitter.class),
+   false)
.assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks) null);
fail();
} catch (NullPointerException ignored) {}
 
try {
-   new DummyFlinkKafkaConsumer(mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class), false)
+   new DummyFlinkKafkaConsumer(
+   mock(AbstractFetcher.class),
+   mock(AbstractPartitionDiscoverer.class),
+   mock(KafkaOffsetCommitter.class),
+   false)
.assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks) null);
fail();
} catch (NullPointerException ignored) {}
 
final AssignerWithPeriodicWatermarks periodicAssigner = mock(AssignerWithPeriodicWatermarks.class);
final AssignerWithPunctuatedWatermarks punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class);
 
-   DummyFlinkKafkaConsumer c1 =
-   new DummyFlinkKafkaConsumer<>(mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class), false);
+   DummyFlinkKafkaConsumer c1 = new DummyFlinkKafkaConsumer<>(
+   mock(AbstractFetcher.class),
+   mock(AbstractPartitionDiscoverer.class),
+   mock(KafkaOffsetCommitter.class),

Will do.


---



2018-01-09 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5200#discussion_r16059
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -559,6 +565,7 @@ public void onException(Throwable cause) {
//the fetchers 'snapshotCurrentState()' method return at least
//the restored offsets
this.kafkaFetcher = fetcher;
+   this.kafkaOffsetCommitter = createOffsetCommitter();
--- End diff --

This is a valid point. I will address it, perhaps with a factory as you 
suggested.
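One possible shape for such a factory, sketched with simplified, hypothetical types (`OffsetCommitterFactory`, `Fetcher`, and `Consumer` are illustrative names, not Flink API): the factory receives the fetcher as an explicit argument, so creating the committer no longer depends on `this.kafkaFetcher` having been assigned first.

```java
import java.util.HashMap;
import java.util.Map;

class CommitterFactorySketch {

    interface KafkaOffsetCommitter {
        void commitInternalOffsetsToKafka(Map<String, Long> offsets);
    }

    // Hypothetical fetcher that, as in this PR, also acts as the committer.
    static class Fetcher implements KafkaOffsetCommitter {
        final Map<String, Long> committed = new HashMap<>();

        @Override
        public void commitInternalOffsetsToKafka(Map<String, Long> offsets) {
            committed.putAll(offsets);
        }
    }

    // Hypothetical factory: the dependency is passed in rather than read from a field.
    interface OffsetCommitterFactory {
        KafkaOffsetCommitter create(Fetcher fetcher);
    }

    static class Consumer {
        private final OffsetCommitterFactory committerFactory;
        Fetcher kafkaFetcher;
        KafkaOffsetCommitter kafkaOffsetCommitter;

        Consumer(OffsetCommitterFactory committerFactory) {
            this.committerFactory = committerFactory;
        }

        void run(Fetcher fetcher) {
            // These two assignments can be swapped without changing behaviour.
            this.kafkaOffsetCommitter = committerFactory.create(fetcher);
            this.kafkaFetcher = fetcher;
        }
    }

    public static void main(String[] args) {
        // Production-style wiring: reuse the fetcher itself as the committer.
        Consumer consumer = new Consumer(f -> f);
        Fetcher fetcher = new Fetcher();
        consumer.run(fetcher);

        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic-0", 10L);
        consumer.kafkaOffsetCommitter.commitInternalOffsetsToKafka(offsets);
        System.out.println(fetcher.committed); // prints {topic-0=10}
    }
}
```

A test could pass a factory that ignores the fetcher and returns a dummy committer instead.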


---



2018-01-09 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5200#discussion_r160598772
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
@@ -51,7 +50,7 @@
  *the Flink data streams.
  * @param  The type of topic/partition identifier used by Kafka in the specific version.
  */
-public abstract class AbstractFetcher {
+public abstract class AbstractFetcher implements KafkaOffsetCommitter {
--- End diff --

I agree that composition fits better here, or maybe even neither of the two. 
However, the offset committing logic is currently implemented tightly as part 
of the `AbstractFetcher`, sharing the same Kafka client for both fetching 
records and committing offsets. Decoupling that would require further 
refactoring, which I think is somewhat out of scope for the issue at hand.

I have been thinking that we should simply have two separate service 
implementations for offset committing and record fetching. If that happens, 
then neither composition nor inheritance is required; offset committing and 
record fetching simply live as two separate services.

What do you think?
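To make the "two separate services" idea concrete, a minimal sketch follows. All names are hypothetical, and in-memory implementations stand in for real Kafka clients; the point is only that the consumer base holds two independent dependencies, so neither service needs to extend or contain the other.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SeparateServicesSketch {

    // Hypothetical record-fetching service.
    interface RecordFetcher {
        List<String> poll(int maxRecords);
    }

    // Hypothetical offset-committing service.
    interface KafkaOffsetCommitter {
        void commitInternalOffsetsToKafka(Map<String, Long> offsets);
    }

    // In-memory stand-in for a fetcher backed by its own Kafka client.
    static class InMemoryRecordFetcher implements RecordFetcher {
        private final Deque<String> records;

        InMemoryRecordFetcher(List<String> records) {
            this.records = new ArrayDeque<>(records);
        }

        @Override
        public List<String> poll(int maxRecords) {
            List<String> batch = new ArrayList<>();
            while (batch.size() < maxRecords && !records.isEmpty()) {
                batch.add(records.poll());
            }
            return batch;
        }
    }

    // In-memory stand-in for a committer backed by a separate client.
    static class InMemoryOffsetCommitter implements KafkaOffsetCommitter {
        final Map<String, Long> committed = new HashMap<>();

        @Override
        public void commitInternalOffsetsToKafka(Map<String, Long> offsets) {
            committed.putAll(offsets);
        }
    }

    // The consumer base only knows the two interfaces.
    static class ConsumerBase {
        final RecordFetcher fetcher;
        final KafkaOffsetCommitter offsetCommitter;

        ConsumerBase(RecordFetcher fetcher, KafkaOffsetCommitter offsetCommitter) {
            this.fetcher = fetcher;
            this.offsetCommitter = offsetCommitter;
        }
    }

    public static void main(String[] args) {
        InMemoryOffsetCommitter committer = new InMemoryOffsetCommitter();
        ConsumerBase consumer = new ConsumerBase(
                new InMemoryRecordFetcher(Arrays.asList("a", "b", "c")), committer);

        System.out.println(consumer.fetcher.poll(2)); // prints [a, b]
        consumer.offsetCommitter.commitInternalOffsetsToKafka(Collections.singletonMap("topic-0", 2L));
        System.out.println(committer.committed); // prints {topic-0=2}
    }
}
```

Whether the two real implementations would share a Kafka client or each own one is exactly the open question in this thread; the sketch does not settle it.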


---



2018-01-08 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5200#discussion_r160107827
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -770,8 +777,8 @@ public final void notifyCheckpointComplete(long checkpointId) throws Exception {
return;
}
 
-   final AbstractFetcher fetcher = this.kafkaFetcher;
-   if (fetcher == null) {
+   final KafkaOffsetCommitter offsetCommitter = this.kafkaOffsetCommitter;
--- End diff --

Why do we need this `final` local variable? `kafkaOffsetCommitter` seems to 
be a "write-once" variable.
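For reference, the usual reason for such a local copy, sketched with hypothetical simplified types: if the field is `volatile` and written by another thread (an assumption here), copying it once guarantees that the null check and the call see the same reference. If the field really is write-once and never reset to `null`, reading it twice would be equally safe; the copy only matters if the field can be cleared again, for example on cancellation.

```java
import java.util.HashMap;
import java.util.Map;

class LocalCopySketch {

    interface KafkaOffsetCommitter {
        void commitInternalOffsetsToKafka(Map<String, Long> offsets);
    }

    static class Consumer {
        // Assumption: set by the task thread once the fetcher exists, read by the
        // thread delivering checkpoint-complete notifications, and possibly
        // cleared again on cancellation.
        volatile KafkaOffsetCommitter kafkaOffsetCommitter;

        /** Returns true if offsets were handed to the committer. */
        boolean notifyCheckpointComplete(Map<String, Long> offsets) {
            // One volatile read: the null check and the call use the same reference,
            // even if another thread clears the field in between.
            final KafkaOffsetCommitter offsetCommitter = this.kafkaOffsetCommitter;
            if (offsetCommitter == null) {
                return false; // not running (yet), nothing to commit
            }
            offsetCommitter.commitInternalOffsetsToKafka(offsets);
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, Long> committed = new HashMap<>();
        Consumer consumer = new Consumer();
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic-0", 5L);

        System.out.println(consumer.notifyCheckpointComplete(offsets)); // prints false
        consumer.kafkaOffsetCommitter = committed::putAll;
        System.out.println(consumer.notifyCheckpointComplete(offsets)); // prints true
        System.out.println(committed); // prints {topic-0=5}
    }
}
```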


---



2018-01-08 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5200#discussion_r160107329
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---
@@ -89,30 +90,44 @@
@SuppressWarnings("unchecked")
public void testEitherWatermarkExtractor() {
try {
-   new DummyFlinkKafkaConsumer(mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class), false)
+   new DummyFlinkKafkaConsumer(
+   mock(AbstractFetcher.class),
+   mock(AbstractPartitionDiscoverer.class),
+   mock(KafkaOffsetCommitter.class),
+   false)
.assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks) null);
fail();
} catch (NullPointerException ignored) {}
 
try {
-   new DummyFlinkKafkaConsumer(mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class), false)
+   new DummyFlinkKafkaConsumer(
+   mock(AbstractFetcher.class),
+   mock(AbstractPartitionDiscoverer.class),
+   mock(KafkaOffsetCommitter.class),
+   false)
.assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks) null);
fail();
} catch (NullPointerException ignored) {}
 
final AssignerWithPeriodicWatermarks periodicAssigner = mock(AssignerWithPeriodicWatermarks.class);
final AssignerWithPunctuatedWatermarks punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class);
 
-   DummyFlinkKafkaConsumer c1 =
-   new DummyFlinkKafkaConsumer<>(mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class), false);
+   DummyFlinkKafkaConsumer c1 = new DummyFlinkKafkaConsumer<>(
+   mock(AbstractFetcher.class),
+   mock(AbstractPartitionDiscoverer.class),
+   mock(KafkaOffsetCommitter.class),

Please replace `mock(KafkaOffsetCommitter.class)` everywhere with `new 
DummyKafkaOffsetCommitter()` or `new NoOpKafkaOffsetCommitter()`.
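A minimal sketch of what the suggested `NoOpKafkaOffsetCommitter` could look like. The single-method interface below, with partitions simplified to `String`, is an assumption rather than the PR's actual signature. Unlike a Mockito mock, a plain class stops compiling if the interface gains a method, and it needs no mocking framework.

```java
import java.util.Collections;
import java.util.Map;

class NoOpCommitterSketch {

    // Assumed, simplified shape of the interface introduced in this PR.
    interface KafkaOffsetCommitter {
        void commitInternalOffsetsToKafka(Map<String, Long> offsets) throws Exception;
    }

    // Stateless committer for tests that do not care about offset commits.
    static final class NoOpKafkaOffsetCommitter implements KafkaOffsetCommitter {
        @Override
        public void commitInternalOffsetsToKafka(Map<String, Long> offsets) {
            // Intentionally empty: commits are accepted and discarded.
        }
    }

    public static void main(String[] args) throws Exception {
        KafkaOffsetCommitter committer = new NoOpKafkaOffsetCommitter();
        committer.commitInternalOffsetsToKafka(Collections.singletonMap("topic-0", 1L));
        System.out.println("commit ignored");
    }
}
```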
  


---



2018-01-08 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5200#discussion_r160103116
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -887,4 +894,9 @@ OffsetCommitMode getOffsetCommitMode() {
LinkedMap getPendingOffsetsToCommit() {
return pendingOffsetsToCommit;
}
+
+   @VisibleForTesting
+   KafkaOffsetCommitter createOffsetCommitter() {
--- End diff --

Rename to `getOffsetCommitter()`?


---



2018-01-08 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5200#discussion_r160105955
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
@@ -51,7 +50,7 @@
  *the Flink data streams.
  * @param  The type of topic/partition identifier used by Kafka in the specific version.
  */
-public abstract class AbstractFetcher {
+public abstract class AbstractFetcher implements KafkaOffsetCommitter {
--- End diff --

Maybe use composition instead of inheritance here? `AbstractFetcher` "being a" 
`KafkaOffsetCommitter` does not make sense to me when I say it out loud, while 
`AbstractFetcher` "having a" `KafkaOffsetCommitter` sounds better.
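A minimal sketch of the "has a" variant, with hypothetical simplified types: the fetcher receives a committer through its constructor and exposes it, so tests can hand in any implementation. As the reply in this thread points out, in the real code the fetcher and the committer share one Kafka client, which this sketch ignores.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class CompositionSketch {

    interface KafkaOffsetCommitter {
        void commitInternalOffsetsToKafka(Map<String, Long> offsets);
    }

    static class RecordingOffsetCommitter implements KafkaOffsetCommitter {
        final Map<String, Long> committed = new HashMap<>();

        @Override
        public void commitInternalOffsetsToKafka(Map<String, Long> offsets) {
            committed.putAll(offsets);
        }
    }

    // "Has a": the fetcher holds a committer instead of being one.
    static class Fetcher {
        private final KafkaOffsetCommitter offsetCommitter;

        Fetcher(KafkaOffsetCommitter offsetCommitter) {
            this.offsetCommitter = Objects.requireNonNull(offsetCommitter);
        }

        KafkaOffsetCommitter getOffsetCommitter() {
            return offsetCommitter;
        }
    }

    public static void main(String[] args) {
        RecordingOffsetCommitter committer = new RecordingOffsetCommitter();
        Fetcher fetcher = new Fetcher(committer);
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic-0", 7L);
        fetcher.getOffsetCommitter().commitInternalOffsetsToKafka(offsets);
        System.out.println(committer.committed); // prints {topic-0=7}
    }
}
```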


---



2018-01-08 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5200#discussion_r160112834
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -559,6 +565,7 @@ public void onException(Throwable cause) {
//the fetchers 'snapshotCurrentState()' method return at least
//the restored offsets
this.kafkaFetcher = fetcher;
+   this.kafkaOffsetCommitter = createOffsetCommitter();
--- End diff --

Changing the order of these lines:
```
this.kafkaFetcher = fetcher;
this.kafkaOffsetCommitter = createOffsetCommitter();
```
will break the code. This is super fragile and rather hard to follow. 
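A minimal reproduction of the hazard, assuming (as the PR description suggests) that `createOffsetCommitter()` simply returns the fetcher field; `Fetcher` and `Consumer` are simplified stand-ins. Swapping the two assignments silently leaves the committer `null`, and the mistake only surfaces later, when offsets are supposed to be committed.

```java
class OrderingHazardSketch {

    interface KafkaOffsetCommitter {}

    // Simplified stand-in: the fetcher doubles as the committer.
    static class Fetcher implements KafkaOffsetCommitter {}

    static class Consumer {
        Fetcher kafkaFetcher;
        KafkaOffsetCommitter kafkaOffsetCommitter;

        // Assumed default: reuse whatever fetcher is currently assigned.
        KafkaOffsetCommitter createOffsetCommitter() {
            return kafkaFetcher;
        }

        void runInOriginalOrder(Fetcher fetcher) {
            this.kafkaFetcher = fetcher;
            this.kafkaOffsetCommitter = createOffsetCommitter();
        }

        void runWithSwappedOrder(Fetcher fetcher) {
            this.kafkaOffsetCommitter = createOffsetCommitter(); // kafkaFetcher is still null here
            this.kafkaFetcher = fetcher;
        }
    }

    public static void main(String[] args) {
        Consumer ok = new Consumer();
        ok.runInOriginalOrder(new Fetcher());
        System.out.println(ok.kafkaOffsetCommitter != null); // prints true

        Consumer broken = new Consumer();
        broken.runWithSwappedOrder(new Fetcher());
        System.out.println(broken.kafkaOffsetCommitter != null); // prints false
    }
}
```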


---



2017-12-21 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5200

[FLINK-8306] [kafka] Introduce KafkaOffsetCommitter interface

## What is the purpose of the change

This PR is built upon the reworked `FlinkKafkaConsumerBaseTest` in #5188.

Prior to this PR, offset committing was coupled tightly with the 
`AbstractFetcher`, making unit tests for offset committing behaviours hard to 
compose concisely. For example, we had tests that required mocking the offset 
commit methods on `AbstractFetcher`, while ideally those methods would be made 
final (and thus impossible to mock) to prevent accidental overrides.

This PR decouples offset committing into a separate service behind a new 
`KafkaOffsetCommitter` interface. For now, the `AbstractFetcher` is reused as 
an implementation of this service, so that this PR introduces no change other 
than a new layer of abstraction.
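A rough sketch of the resulting structure, with heavily simplified, hypothetical types: `AbstractFetcher` implements `KafkaOffsetCommitter`, and the consumer commits through the interface only, so in production one fetcher instance sits behind both references while a test can supply any other committer. The pending-offsets map is a `LinkedHashMap` keyed by checkpoint ID, loosely modeled on `pendingOffsetsToCommit`; unlike the real consumer, it does not prune older checkpoints.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

class OffsetCommitterLayerSketch {

    interface KafkaOffsetCommitter {
        void commitInternalOffsetsToKafka(Map<String, Long> offsets);
    }

    // As in this PR, the fetcher itself is (for now) the production committer.
    abstract static class AbstractFetcher implements KafkaOffsetCommitter {
        abstract void runFetchLoop();
    }

    static class Consumer {
        final AbstractFetcher kafkaFetcher;
        final KafkaOffsetCommitter kafkaOffsetCommitter;
        // Offsets snapshotted per checkpoint ID, waiting for the checkpoint to complete.
        final Map<Long, Map<String, Long>> pendingOffsetsToCommit = new LinkedHashMap<>();

        Consumer(AbstractFetcher fetcher, KafkaOffsetCommitter committer) {
            this.kafkaFetcher = fetcher;
            this.kafkaOffsetCommitter = committer;
        }

        void notifyCheckpointComplete(long checkpointId) {
            Map<String, Long> offsets = pendingOffsetsToCommit.remove(checkpointId);
            if (offsets != null) {
                // Only the interface is used here; the fetcher type is irrelevant.
                kafkaOffsetCommitter.commitInternalOffsetsToKafka(offsets);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Long> committed = new HashMap<>();
        AbstractFetcher fetcher = new AbstractFetcher() {
            @Override
            void runFetchLoop() {}

            @Override
            public void commitInternalOffsetsToKafka(Map<String, Long> offsets) {
                committed.putAll(offsets);
            }
        };

        // Production-style wiring: the same instance behind both abstractions.
        Consumer consumer = new Consumer(fetcher, fetcher);
        consumer.pendingOffsetsToCommit.put(1L, Collections.singletonMap("topic-0", 100L));
        consumer.notifyCheckpointComplete(1L);
        System.out.println(committed); // prints {topic-0=100}
    }
}
```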

Unit tests that verify offset committing behaviour now provide a dummy, 
verifiable implementation of `KafkaOffsetCommitter` (instead of using mocks 
on `AbstractFetcher`) and test against it.
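A minimal sketch of such a dummy, verifiable committer. The callback mirrors the `onException(Throwable cause)` hook visible in the `FlinkKafkaConsumerBase` diff context earlier in this thread, but `CommitCallback`, `onSuccess()`, `failWith()`, and the simplified signature are assumptions, not the PR's actual code. A test can inspect the recorded commits directly and can force a failure to exercise the error path.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DummyCommitterSketch {

    // Assumed callback shape.
    interface CommitCallback {
        void onSuccess();
        void onException(Throwable cause);
    }

    interface KafkaOffsetCommitter {
        void commitInternalOffsetsToKafka(Map<String, Long> offsets, CommitCallback callback);
    }

    // Records every commit attempt so tests can assert on it; can be told to fail.
    static class DummyKafkaOffsetCommitter implements KafkaOffsetCommitter {
        private final List<Map<String, Long>> commits = new ArrayList<>();
        private Throwable failure; // null means commits succeed

        void failWith(Throwable cause) {
            this.failure = cause;
        }

        List<Map<String, Long>> getCommits() {
            return Collections.unmodifiableList(commits);
        }

        @Override
        public void commitInternalOffsetsToKafka(Map<String, Long> offsets, CommitCallback callback) {
            commits.add(new HashMap<>(offsets)); // snapshot: the caller may reuse its map
            if (failure == null) {
                callback.onSuccess();
            } else {
                callback.onException(failure);
            }
        }
    }

    public static void main(String[] args) {
        DummyKafkaOffsetCommitter committer = new DummyKafkaOffsetCommitter();
        CommitCallback printing = new CommitCallback() {
            @Override
            public void onSuccess() {
                System.out.println("committed");
            }

            @Override
            public void onException(Throwable cause) {
                System.out.println("failed: " + cause.getMessage());
            }
        };

        committer.commitInternalOffsetsToKafka(Collections.singletonMap("topic-0", 3L), printing);
        committer.failWith(new RuntimeException("broker unavailable"));
        committer.commitInternalOffsetsToKafka(Collections.singletonMap("topic-0", 4L), printing);
        System.out.println(committer.getCommits()); // prints [{topic-0=3}, {topic-0=4}]
    }
}
```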

## Brief change log

- Migrate `AbstractFetcher::commitInternalOffsetsToKafka` method to the 
newly introduced `KafkaOffsetCommitter` interface.
- Let `AbstractFetcher` implement `KafkaOffsetCommitter`.
- In the `FlinkKafkaConsumerBase`, logically separate "offset committing" and 
"record fetching" into two services, namely a `KafkaOffsetCommitter` and an 
`AbstractFetcher`. Physically, the fetcher instance sits behind both service 
abstractions.
- In `FlinkKafkaConsumerBaseTest`, remove all mocks on 
`AbstractFetcher::commitInternalOffsetsToKafka`, and test against a 
`KafkaOffsetCommitter` instead.
- Additional hotfix 2906968 that fixes a stale comment referring to an old 
`AbstractFetcher` behaviour to avoid confusion.

## Verifying this change

This PR does not add any new functionality. The reworked tests also do not 
affect test coverage.
`FlinkKafkaConsumerBaseTest` verifies all changes.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? n/a


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-8306

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5200.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5200


commit cb3f488877b7fb2ab0dfcc5c24fe53035bc765e7
Author: Tzu-Li (Gordon) Tai 
Date:   2017-12-20T00:10:44Z

[FLINK-8296] [kafka] Rework FlinkKafkaConsumerBaseTest to not rely on Java reflection

Reflection was mainly used to inject mocks into private fields of the
FlinkKafkaConsumerBase, without the need to fully execute all operator
life cycle methods. This, however, caused the unit tests to be too
implementation-specific.

This commit reworks the FlinkKafkaConsumerBaseTest to remove test
consumer instantiation methods that rely on reflection for dependency
injection. All tests now instantiate dummy test consumers normally, and
let all tests properly execute all operator life cycle methods
regardless of the tested logic.
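A minimal sketch of the reflection-free pattern described above, assuming the consumer base obtains its collaborators through overridable factory methods called from its life cycle methods (the names and signatures here are simplified stand-ins, not the actual Flink ones): the test subclass takes its dependencies through its constructor and returns them from those methods, so the normal life cycle runs unchanged.

```java
class ConstructorInjectionSketch {

    interface Fetcher {
        void runFetchLoop();
    }

    interface PartitionDiscoverer {
        void open();
    }

    abstract static class ConsumerBase {
        PartitionDiscoverer partitionDiscoverer;
        Fetcher kafkaFetcher;

        // Life cycle methods obtain collaborators via factory methods, never via reflection.
        void open() {
            partitionDiscoverer = createPartitionDiscoverer();
            partitionDiscoverer.open();
        }

        void run() {
            kafkaFetcher = createFetcher();
            kafkaFetcher.runFetchLoop();
        }

        protected abstract PartitionDiscoverer createPartitionDiscoverer();

        protected abstract Fetcher createFetcher();
    }

    // Test consumer: dependencies arrive through the constructor.
    static class DummyConsumer extends ConsumerBase {
        private final Fetcher testFetcher;
        private final PartitionDiscoverer testDiscoverer;

        DummyConsumer(Fetcher testFetcher, PartitionDiscoverer testDiscoverer) {
            this.testFetcher = testFetcher;
            this.testDiscoverer = testDiscoverer;
        }

        @Override
        protected PartitionDiscoverer createPartitionDiscoverer() {
            return testDiscoverer;
        }

        @Override
        protected Fetcher createFetcher() {
            return testFetcher;
        }
    }

    public static void main(String[] args) {
        StringBuilder calls = new StringBuilder();
        DummyConsumer consumer = new DummyConsumer(
                () -> calls.append("fetch;"),
                () -> calls.append("discover;"));
        consumer.open();
        consumer.run();
        System.out.println(calls); // prints discover;fetch;
    }
}
```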

commit 21491d567ef5d3b8294deec2890b48900c22dd56
Author: Nico Kruber 
Date:   2017-12-19T17:14:19Z

[FLINK-8295] [cassandra] [build] Properly shade netty for the datastax driver

com.datastax.driver.core.NettyUtil expects netty to be present either at its
original package or relocated to com.datastax.shaded.netty. By relocating it
to this package we make sure the driver follows its designated path.

This closes #5183.

commit ef40aaa9d942cc49c3d51816eacdaa5e7dbe9fa5
Author: Tzu-Li (Gordon) Tai 
Date:   2017-12-19T19:55:16Z

[FLINK-8306] [kafka] Introduce KafkaOffsetCommitter interface

Prior to this commit, offset committing was coupled tightly with the
AbstractFetcher, making unit tests for offset committing behaviours hard
to compose concisely. For example, we had tests that mock the offset
commit methods on AbstractFetcher, while ideally, it would be best that
those methods are made final to prevent accidental overrides.

This commit decouples