[GitHub] flink pull request #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitte...
Github user tzulitai closed the pull request at: https://github.com/apache/flink/pull/5200 ---
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5200#discussion_r160692827

--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
@@ -51,7 +50,7 @@
 * the Flink data streams.
 * @param <KPH> The type of topic/partition identifier used by Kafka in the specific version.
 */
-public abstract class AbstractFetcher<T, KPH> {
+public abstract class AbstractFetcher<T, KPH> implements KafkaOffsetCommitter {
--- End diff --

You are probably right. I haven't thought this through, but if you have, and you feel this is a good opportunity for a larger refactor, I'm fine with that :) Anyway, as we discussed offline, such a refactor could be made in a separate PR, and here you could just implement `AbstractFetcher#doCommitInternalOffsetsToKafka` for testing purposes. I don't mind if you choose one approach over the other :)

---
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5200#discussion_r160599433

--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
@@ -51,7 +50,7 @@
 * the Flink data streams.
 * @param <KPH> The type of topic/partition identifier used by Kafka in the specific version.
 */
-public abstract class AbstractFetcher<T, KPH> {
+public abstract class AbstractFetcher<T, KPH> implements KafkaOffsetCommitter {
--- End diff --

The result could also very well be that we should use this opportunity to refactor the vague dependencies between the fetcher / consumer thread / consumer base, and include that in this PR. I would not be against that.

---
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5200#discussion_r160598939

--- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---
@@ -89,30 +90,44 @@
 	@SuppressWarnings("unchecked")
 	public void testEitherWatermarkExtractor() {
 		try {
-			new DummyFlinkKafkaConsumer(mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class), false)
+			new DummyFlinkKafkaConsumer(
+				mock(AbstractFetcher.class),
+				mock(AbstractPartitionDiscoverer.class),
+				mock(KafkaOffsetCommitter.class),
+				false)
 				.assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks) null);
 			fail();
 		} catch (NullPointerException ignored) {}

 		try {
-			new DummyFlinkKafkaConsumer(mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class), false)
+			new DummyFlinkKafkaConsumer(
+				mock(AbstractFetcher.class),
+				mock(AbstractPartitionDiscoverer.class),
+				mock(KafkaOffsetCommitter.class),
+				false)
 				.assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks) null);
 			fail();
 		} catch (NullPointerException ignored) {}

 		final AssignerWithPeriodicWatermarks periodicAssigner = mock(AssignerWithPeriodicWatermarks.class);
 		final AssignerWithPunctuatedWatermarks punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class);

-		DummyFlinkKafkaConsumer c1 =
-			new DummyFlinkKafkaConsumer<>(mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class), false);
+		DummyFlinkKafkaConsumer c1 = new DummyFlinkKafkaConsumer<>(
+			mock(AbstractFetcher.class),
+			mock(AbstractPartitionDiscoverer.class),
+			mock(KafkaOffsetCommitter.class),
--- End diff --

Will do.

---
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5200#discussion_r16059

--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -559,6 +565,7 @@ public void onException(Throwable cause) {
 		// the fetchers 'snapshotCurrentState()' method return at least
 		// the restored offsets
 		this.kafkaFetcher = fetcher;
+		this.kafkaOffsetCommitter = createOffsetCommitter();
--- End diff --

This is a very valid argument. I will address this, perhaps with a factory as you suggested.

---
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5200#discussion_r160598772

--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
@@ -51,7 +50,7 @@
 * the Flink data streams.
 * @param <KPH> The type of topic/partition identifier used by Kafka in the specific version.
 */
-public abstract class AbstractFetcher<T, KPH> {
+public abstract class AbstractFetcher<T, KPH> implements KafkaOffsetCommitter {
--- End diff --

I agree that composition suits better here, or maybe even neither of the two. However, the reality is that the offset committing logic is currently implemented tightly as part of the `AbstractFetcher`, sharing the same Kafka client for both fetching records and committing offsets. Decoupling that would require further refactoring, which I think is a bit out of scope for the issue at hand.

I have been thinking that we should simply have two separate service implementations for offset committing and record fetching. If that happens, then neither composition nor inheritance is required; offset committing and record fetching simply live as two separate services. What do you think?

---
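[Editor's sketch] The "two separate services" idea above could look roughly like the following. All names (`OffsetCommitter`, `RecordFetcher`, `Consumer`) and method shapes are hypothetical stand-ins for illustration, not the actual Flink classes:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: offset committing and record fetching live behind independent
// interfaces, so neither inheritance nor composition between them is needed.
public class TwoServicesSketch {

    /** Commits offsets back to Kafka; mirrors the proposed KafkaOffsetCommitter role. */
    interface OffsetCommitter {
        void commitOffsets(Map<String, Long> offsets) throws Exception;
    }

    /** Fetches records; stands in for AbstractFetcher's fetching role. */
    interface RecordFetcher {
        String fetchNext();
    }

    /** The consumer wires both services independently. */
    static class Consumer {
        private final RecordFetcher fetcher;
        private final OffsetCommitter committer;

        Consumer(RecordFetcher fetcher, OffsetCommitter committer) {
            this.fetcher = fetcher;
            this.committer = committer;
        }

        String pollOnce() {
            return fetcher.fetchNext();
        }

        void onCheckpointComplete(Map<String, Long> offsets) throws Exception {
            committer.commitOffsets(offsets);
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, Long> committed = new HashMap<>();
        Consumer consumer = new Consumer(() -> "record-1", committed::putAll);

        System.out.println(consumer.pollOnce()); // record-1

        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic-0", 42L);
        consumer.onCheckpointComplete(offsets);
        System.out.println(committed.get("topic-0")); // 42
    }
}
```

With this shape, a test can pass a recording `OffsetCommitter` and a canned `RecordFetcher` without mocking either service.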
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5200#discussion_r160107827

--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -770,8 +777,8 @@ public final void notifyCheckpointComplete(long checkpointId) throws Exception {
 			return;
 		}

-		final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
-		if (fetcher == null) {
+		final KafkaOffsetCommitter offsetCommitter = this.kafkaOffsetCommitter;
--- End diff --

Why do we need this `final` local variable? `kafkaOffsetCommitter` seems to be a write-once variable.

---
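[Editor's sketch] For context, the usual reason for such a `final` local is the read-volatile-once pattern: when the field is written by another thread, copying it into a local pins a single value across the null check and the subsequent use. A minimal sketch, with `Runnable` as a hypothetical stand-in for the real committer type:

```java
// Sketch of the read-volatile-once pattern that motivates the local variable.
// If the method read the volatile field twice, the field could change between
// the null check and the use; a final local guarantees one consistent value.
public class VolatileReadSketch {
    private volatile Runnable committer; // stands in for kafkaOffsetCommitter

    void setCommitter(Runnable c) {
        this.committer = c;
    }

    boolean notifyCheckpointComplete() {
        final Runnable local = this.committer; // single volatile read
        if (local == null) {
            return false; // consumer not running yet
        }
        local.run(); // guaranteed non-null, even under concurrent writes
        return true;
    }

    public static void main(String[] args) {
        VolatileReadSketch sketch = new VolatileReadSketch();
        System.out.println(sketch.notifyCheckpointComplete()); // false: nothing set yet
        sketch.setCommitter(() -> System.out.println("committing"));
        System.out.println(sketch.notifyCheckpointComplete()); // true
    }
}
```

If the field really is write-once and only mutated before any checkpoint callback, the local copy is indeed redundant; the pattern only pays off when another thread may write the field concurrently.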
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5200#discussion_r160107329

--- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---
@@ -89,30 +90,44 @@
 	@SuppressWarnings("unchecked")
 	public void testEitherWatermarkExtractor() {
 		try {
-			new DummyFlinkKafkaConsumer(mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class), false)
+			new DummyFlinkKafkaConsumer(
+				mock(AbstractFetcher.class),
+				mock(AbstractPartitionDiscoverer.class),
+				mock(KafkaOffsetCommitter.class),
+				false)
 				.assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks) null);
 			fail();
 		} catch (NullPointerException ignored) {}

 		try {
-			new DummyFlinkKafkaConsumer(mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class), false)
+			new DummyFlinkKafkaConsumer(
+				mock(AbstractFetcher.class),
+				mock(AbstractPartitionDiscoverer.class),
+				mock(KafkaOffsetCommitter.class),
+				false)
 				.assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks) null);
 			fail();
 		} catch (NullPointerException ignored) {}

 		final AssignerWithPeriodicWatermarks periodicAssigner = mock(AssignerWithPeriodicWatermarks.class);
 		final AssignerWithPunctuatedWatermarks punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class);

-		DummyFlinkKafkaConsumer c1 =
-			new DummyFlinkKafkaConsumer<>(mock(AbstractFetcher.class), mock(AbstractPartitionDiscoverer.class), false);
+		DummyFlinkKafkaConsumer c1 = new DummyFlinkKafkaConsumer<>(
+			mock(AbstractFetcher.class),
+			mock(AbstractPartitionDiscoverer.class),
+			mock(KafkaOffsetCommitter.class),
--- End diff --

Please replace `mock(KafkaOffsetCommitter.class)` everywhere with `new DummyKafkaOffsetCommitter()` or `new NoOpKafkaOffsetCommitter()`.

---
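[Editor's sketch] A no-op test double of the kind suggested here might look like the sketch below. The interface shape is an assumption for illustration; only the idea of an explicit do-nothing class replacing `mock(KafkaOffsetCommitter.class)` comes from the comment:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of an explicit no-op committer as a test double.
public class NoOpCommitterSketch {

    /** Assumed interface shape, standing in for KafkaOffsetCommitter. */
    interface OffsetCommitter {
        void commitOffsets(Map<String, Long> offsets);
    }

    /** Does nothing; a safe default for tests that do not verify committing. */
    static class NoOpOffsetCommitter implements OffsetCommitter {
        @Override
        public void commitOffsets(Map<String, Long> offsets) {
            // intentionally empty: no commits, no side effects
        }
    }

    public static void main(String[] args) {
        OffsetCommitter committer = new NoOpOffsetCommitter();
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic-0", 1L);
        committer.commitOffsets(offsets); // no effect, no error
        System.out.println("ok");
    }
}
```

Unlike a Mockito mock, a named no-op class documents its intent at the call site and keeps the test independent of mocking-framework behaviour.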
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5200#discussion_r160103116

--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -887,4 +894,9 @@ OffsetCommitMode getOffsetCommitMode() {
 	LinkedMap getPendingOffsetsToCommit() {
 		return pendingOffsetsToCommit;
 	}
+
+	@VisibleForTesting
+	KafkaOffsetCommitter createOffsetCommitter() {
--- End diff --

Rename to `getOffsetCommitter()`?

---
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5200#discussion_r160105955

--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
@@ -51,7 +50,7 @@
 * the Flink data streams.
 * @param <KPH> The type of topic/partition identifier used by Kafka in the specific version.
 */
-public abstract class AbstractFetcher<T, KPH> {
+public abstract class AbstractFetcher<T, KPH> implements KafkaOffsetCommitter {
--- End diff --

Maybe use composition instead of inheritance here? `AbstractFetcher` "being a" `KafkaOffsetCommitter` does not make sense to me when I say it out loud, while `AbstractFetcher` "having a" `KafkaOffsetCommitter` sounds better.

---
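[Editor's sketch] The "has-a" alternative described above could be sketched as follows, with simplified, hypothetical types in place of the real `AbstractFetcher` and `KafkaOffsetCommitter`:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch contrasting "is-a" (inheritance) with "has-a" (composition).
public class CompositionSketch {

    /** Assumed committer interface shape. */
    interface OffsetCommitter {
        void commitOffsets(Map<String, Long> offsets);
    }

    // Inheritance ("is-a"): the fetcher *is* a committer -- reads oddly aloud.
    static abstract class InheritingFetcher implements OffsetCommitter {
    }

    // Composition ("has-a"): the fetcher *owns* a committer and hands it out.
    static class ComposingFetcher {
        private final OffsetCommitter committer;

        ComposingFetcher(OffsetCommitter committer) {
            this.committer = committer;
        }

        OffsetCommitter getOffsetCommitter() {
            return committer;
        }
    }

    public static void main(String[] args) {
        Map<String, Long> sink = new HashMap<>();
        ComposingFetcher fetcher = new ComposingFetcher(sink::putAll);
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic-0", 7L);
        fetcher.getOffsetCommitter().commitOffsets(offsets);
        System.out.println(sink.get("topic-0")); // 7
    }
}
```

With composition, the consumer asks the fetcher for its committer, so the committer can later be swapped for a standalone implementation without changing the fetcher's public type.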
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5200#discussion_r160112834

--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -559,6 +565,7 @@ public void onException(Throwable cause) {
 		// the fetchers 'snapshotCurrentState()' method return at least
 		// the restored offsets
 		this.kafkaFetcher = fetcher;
+		this.kafkaOffsetCommitter = createOffsetCommitter();
--- End diff --

Changing the order of these lines:

```
this.kafkaFetcher = fetcher;
this.kafkaOffsetCommitter = createOffsetCommitter();
```

will break the code. This is super fragile and kind of hard to follow.

---
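[Editor's sketch] One way to remove the ordering dependency the reviewer points out, sketched with hypothetical simplified types: make the factory take the fetcher as an explicit parameter instead of reading a field that must already have been assigned:

```java
// Sketch: an explicit parameter makes the dependency visible in the signature,
// so the two assignments can no longer be silently reordered into a bug.
public class OrderingSketch {

    /** Assumed committer marker, standing in for KafkaOffsetCommitter. */
    interface OffsetCommitter {
    }

    /** Stands in for AbstractFetcher, which currently also commits offsets. */
    static class Fetcher implements OffsetCommitter {
    }

    // Fragile variant (commented out): reading this.fetcher inside the factory
    // means the factory only works after the field assignment has happened.
    // OffsetCommitter createOffsetCommitter() { return this.fetcher; }

    // Robust variant: the input is explicit, so call order cannot go wrong.
    static OffsetCommitter createOffsetCommitter(Fetcher fetcher) {
        return fetcher;
    }

    public static void main(String[] args) {
        Fetcher fetcher = new Fetcher();
        OffsetCommitter committer = createOffsetCommitter(fetcher); // order-independent
        System.out.println(committer == fetcher); // true
    }
}
```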
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5200

[FLINK-8306] [kafka] Introduce KafkaOffsetCommitter interface

## What is the purpose of the change

This PR is built upon the reworked `FlinkKafkaConsumerBaseTest` in #5188.

Prior to this PR, offset committing was coupled tightly with the `AbstractFetcher`, making unit tests for offset committing behaviours hard to compose concisely. For example, we had tests that required mocking the offset commit methods on `AbstractFetcher`, while ideally those methods would be made final (and thus unable to be mocked) to prevent accidental overrides.

This PR decouples offset committing into a separate service behind a new `KafkaOffsetCommitter` interface. For now, the `AbstractFetcher` is reused as the implementation of this service, so that this PR introduces no change beyond the new layer of abstraction. Unit tests that verify offset committing behaviour now provide a dummy verifiable implementation of the `KafkaOffsetCommitter` (instead of using mocks on `AbstractFetcher`) and test against that.

## Brief change log

- Migrate the `AbstractFetcher::commitInternalOffsetsToKafka` method to the newly introduced `KafkaOffsetCommitter` interface.
- Let `AbstractFetcher` implement `KafkaOffsetCommitter`.
- In `FlinkKafkaConsumerBase`, let "offset committing" and "record fetching" be handled by two logically separate services, namely a `KafkaOffsetCommitter` and an `AbstractFetcher`. Physically, the fetcher instance sits behind both service abstractions.
- In `FlinkKafkaConsumerBaseTest`, remove all mocks on `AbstractFetcher::commitInternalOffsetsToKafka`, and test against a `KafkaOffsetCommitter` instead.
- Additional hotfix 2906968 that fixes a stale comment referring to an old `AbstractFetcher` behaviour, to avoid confusion.

## Verifying this change

This PR does not add any new functionality, and the reworked tests do not affect test coverage. `FlinkKafkaConsumerBaseTest` verifies all changes.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? n/a

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-8306

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5200.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #5200

commit cb3f488877b7fb2ab0dfcc5c24fe53035bc765e7
Author: Tzu-Li (Gordon) Tai
Date: 2017-12-20T00:10:44Z

    [FLINK-8296] [kafka] Rework FlinkKafkaConsumerBaseTest to not rely on Java reflection

    Reflection was mainly used to inject mocks into private fields of the FlinkKafkaConsumerBase, without the need to fully execute all operator life cycle methods. This, however, caused the unit tests to be too implementation-specific.

    This commit reworks the FlinkKafkaConsumerBaseTest to remove test consumer instantiation methods that rely on reflection for dependency injection. All tests now instantiate dummy test consumers normally, and let all tests properly execute all operator life cycle methods regardless of the tested logic.

commit 21491d567ef5d3b8294deec2890b48900c22dd56
Author: Nico Kruber
Date: 2017-12-19T17:14:19Z

    [FLINK-8295] [cassandra] [build] Properly shade netty for the datastax driver

    com.datastax.driver.core.NettyUtil expects netty to be present either at its original package or relocated to com.datastax.shaded.netty. By relocating it to this package we make sure the driver follows its designated path.

    This closes #5183.

commit ef40aaa9d942cc49c3d51816eacdaa5e7dbe9fa5
Author: Tzu-Li (Gordon) Tai
Date: 2017-12-19T19:55:16Z

    [FLINK-8306] [kafka] Introduce KafkaOffsetCommitter interface

    Prior to this commit, offset committing was coupled tightly with the AbstractFetcher, making unit tests for offset committing behaviours hard to compose concisely. For example, we had tests that mock the offset commit methods on AbstractFetcher, while ideally, it would be best that those methods are made final to prevent accidental overrides. This commit decouples