[GitHub] flink pull request #6408: [FLINK-9897][Kinesis Connector] Make adaptive read...
Github user tweise commented on a diff in the pull request: https://github.com/apache/flink/pull/6408#discussion_r204922265

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---

@@ -233,26 +225,68 @@ public void run() { subscribedShard.getShard().getHashKeyRange().getEndingHashKey()); long recordBatchSizeBytes = 0L;
- long averageRecordSizeBytes = 0L;
- for (UserRecord record : fetchedRecords) { recordBatchSizeBytes += record.getData().remaining(); deserializeRecordForCollectionAndUpdateState(record); }
- if (useAdaptiveReads && !fetchedRecords.isEmpty()) {
- averageRecordSizeBytes = recordBatchSizeBytes / fetchedRecords.size();
- maxNumberOfRecordsPerFetch = getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
- }
- nextShardItr = getRecordsResult.getNextShardIterator();
+
+ long processingEndTimeNanos = System.nanoTime();
+
+ long adjustmentEndTimeNanos = adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos);
+ long runLoopTimeNanos = adjustmentEndTimeNanos - processingStartTimeNanos;
+ adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(), recordBatchSizeBytes);
+ processingStartTimeNanos = adjustmentEndTimeNanos; // for next time through the loop } } } catch (Throwable t) { fetcherRef.stopWithError(t); } }
+ /**
+* Adjusts loop timing to match target frequency if specified.
+* @param processingStartTimeNanos The start time of the run loop "work"
+* @param processingEndTimeNanos The end time of the run loop "work"
+* @return The System.nanoTime() after the sleep (if any)
+* @throws InterruptedException
+*/
+ protected long adjustRunLoopFrequency(long processingStartTimeNanos, long processingEndTimeNanos)
+ throws InterruptedException {
+ long endTimeNanos = processingEndTimeNanos;
+ if (fetchIntervalMillis != 0) {
+ long processingTimeNanos = processingEndTimeNanos - processingStartTimeNanos;
+ long sleepTimeMillis = fetchIntervalMillis - (processingTimeNanos / 1_000_000);
+ if (sleepTimeMillis > 0) {
+ Thread.sleep(sleepTimeMillis);
+ endTimeNanos = System.nanoTime();
+ }
+ }
+ return endTimeNanos;
+ }
+
+ /**
+* Calculates how many records to read each time through the loop based on a target throughput
+* and the measured frequency of the loop.
+* @param runLoopTimeNanos The total time of one pass through the loop
+* @param numRecords The number of records of the last read operation
+* @param recordBatchSizeBytes The total batch size of the last read operation
+*/
+ protected int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, long recordBatchSizeBytes) {
+ if (useAdaptiveReads && numRecords != 0 && runLoopTimeNanos != 0) {
+ long averageRecordSizeBytes = recordBatchSizeBytes / numRecords;
+ // Adjust number of records to fetch from the shard depending on current average record size
+ // to optimize 2 MB/sec read limits
+ double loopFrequencyHz = 1000000000.0d / runLoopTimeNanos;
+ double bytesPerRead = KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
+ maxNumberOfRecordsPerFetch = (int) (bytesPerRead / averageRecordSizeBytes);
+ // Ensure the value is not more than 10000L
+ maxNumberOfRecordsPerFetch = Math.min(maxNumberOfRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX);
+ }
+ return maxNumberOfRecordsPerFetch;

--- End diff --

the return value is never used ---
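The adaptive sizing in `adaptRecordsToRead` above can be exercised in isolation. Below is a hedged, self-contained sketch of the same arithmetic; the constant values mirror the documented Kinesis limits (2 MB/sec and 10,000 records per GetRecords call), but the class and constant names here are illustrative, not the connector's actual fields.

```java
// Standalone sketch of the adaptive fetch-size arithmetic discussed in the
// diff above. Names and constants are assumptions for illustration only.
class AdaptiveFetchSizeSketch {
    // Kinesis caps shard reads at 2 MB/sec and 10,000 records per GetRecords call.
    static final long SHARD_BYTES_PER_SECOND_LIMIT = 2 * 1024 * 1024;
    static final int MAX_RECORDS_PER_GET = 10_000;

    static int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, long batchSizeBytes) {
        // Average record size observed in the last batch.
        long averageRecordSizeBytes = batchSizeBytes / numRecords;
        // How many passes through the run loop fit into one second.
        double loopFrequencyHz = 1_000_000_000.0d / runLoopTimeNanos;
        // Byte budget available to a single read at that loop frequency.
        double bytesPerRead = SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
        int maxRecords = (int) (bytesPerRead / averageRecordSizeBytes);
        // Never request more than the service-side per-call maximum.
        return Math.min(maxRecords, MAX_RECORDS_PER_GET);
    }
}
```

For example, a 200 ms loop pass that returned 100 records of 100 KB each leaves a budget of roughly 419 KB per read, i.e. 4 records per fetch.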
[GitHub] flink issue #6300: [FLINK-9692][Kinesis Connector] Adaptive reads from Kines...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/6300 @tzulitai can you please take a look - would be good if we can get this into v1.6.0 ---
[GitHub] flink pull request #6300: [FLINK-9692][Kinesis Connector] Adaptive reads fro...
Github user tweise commented on a diff in the pull request: https://github.com/apache/flink/pull/6300#discussion_r201792650

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---

@@ -224,10 +232,19 @@ public void run() { subscribedShard.getShard().getHashKeyRange().getStartingHashKey(), subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
+ long recordBatchSizeBytes = 0L;
+ long averageRecordSizeBytes = 0L;
+ for (UserRecord record : fetchedRecords) {
+ recordBatchSizeBytes += record.getData().remaining(); deserializeRecordForCollectionAndUpdateState(record); }
+ if (useAdaptiveReads && fetchedRecords.size() != 0) {

--- End diff --

nit: && !isEmpty() ---
[GitHub] flink pull request #6300: [FLINK-9692][Kinesis Connector] Adaptive reads fro...
Github user tweise commented on a diff in the pull request: https://github.com/apache/flink/pull/6300#discussion_r201793459

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---

@@ -330,4 +347,24 @@ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) thr protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) { return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey)); }
+
+ /**
+* Adapts the maxNumberOfRecordsPerFetch based on the current average record size
+* to optimize 2 MB/sec read limits.
+*
+* @param averageRecordSizeBytes
+* @return adaptedMaxRecordsPerFetch
+*/
+
+ private int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) {

--- End diff --

Make this protected to allow for override? (Currently the shard consumer as a whole cannot be customized, but I think it should.) ---
[GitHub] flink issue #6177: Backport of Kinesis connector changes from 1.5 to release...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/6177 @tzulitai the commits in this PR (except the most recent version change) are from our 1.4.x fork and what we use in production. As discussed on the ML, not even the Kinesis version bump is strictly a patch-compatible change, and I don't see an issue with including the other changes, too. ---
[GitHub] flink pull request #6177: Backport of Kinesis connector changes from 1.5 to ...
GitHub user tweise opened a pull request: https://github.com/apache/flink/pull/6177

Backport of Kinesis connector changes from 1.5 to release-1.4

Backport of Kinesis connector changes from 1.5 release to 1.4. R: @tzulitai @tillrohrmann

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tweise/flink release-1.4-kinesis

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6177.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6177

commit a231c3c2f6e2aa1148d002a517ee67226ea78af4
Author: Thomas Weise
Date: 2018-01-31T01:44:44Z
[FLINK-8516] [kinesis] Allow for custom hash function for shard to subtask mapping in Kinesis consumer. This closes #5393.

commit 17d6b49583492f0f3b7fe9db5ffcd508fa324864
Author: Thomas Weise
Date: 2018-02-14T00:33:59Z
[FLINK-8648] [kinesis] Allow for customization of emitRecordAndUpdateState in Kinesis connector. This closes #5480.

commit 6ef1e9e0ae124e7bb0e375867fb78ada9e6a0e74
Author: Kailash HD
Date: 2018-03-08T18:32:23Z
[FLINK-] [Kinesis Connectors] Update the AWS SDK for flink kinesis connector. This closes #5663

commit 2ff1279bf63555b238670e66f3e0785b0214e00a
Author: Kailash HD
Date: 2018-03-14T16:20:12Z
[FLINK-8945] [kinesis] Allow customization of KinesisProxy. This closes #5698

commit 2ba9306a0db24f6de49cf7b1d7e5b4b28eae
Author: Thomas Weise
Date: 2018-04-03T03:49:50Z
[FLINK-9124] [kinesis] Allow customization of KinesisProxy.getRecords read timeout and retry. This closes #5803.

commit a734e53f59cdad56c84f99d85aa2b05a2e395b78
Author: Moser Thomas W
Date: 2018-04-27T18:27:03Z
[FLINK-9266] [kinesis] Updates Kinesis connector to use newer version of kcl to limit describe streams calls. This closes #5932.

---
[GitHub] flink issue #6058: [FLINK-9415] Remove reference to StreamingMultipleProgram...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/6058 @tzulitai PTAL ---
[GitHub] flink pull request #6058: [FLINK-9415] Remove reference to StreamingMultiple...
GitHub user tweise opened a pull request: https://github.com/apache/flink/pull/6058 [FLINK-9415] Remove reference to StreamingMultipleProgramsTestBase in docs You can merge this pull request into a Git repository by running: $ git pull https://github.com/tweise/flink FLINK-9415.StreamingMultipleProgramsTestBase Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6058.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6058 commit 68c8db890ba939a03ce324d905530ac057eff932 Author: Thomas Weise <thw@...> Date: 2018-05-23T02:15:57Z [FLINK-9415] Remove reference to StreamingMultipleProgramsTestBase in docs ---
[GitHub] flink pull request #6045: [FLINK-9402] [kinesis] Kinesis consumer configurat...
GitHub user tweise opened a pull request: https://github.com/apache/flink/pull/6045

[FLINK-9402] [kinesis] Kinesis consumer configuration requires either region or endpoint.

*Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.*

## Contribution Checklist

- Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue.
- Name the pull request in the form "[FLINK-XXXX] [component] Title of the pull request", where *FLINK-XXXX* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component. Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.
- Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
- Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices).
- Each pull request should address only one issue, not mix up code from multiple issues.
- Each commit in the pull request has a meaningful commit message (including the JIRA id).
- Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.

**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)*

## Brief change log

*(for example:)*
- *The TaskInfo is stored in the blob store on job creation time as a persistent artifact*
- *Deployments RPC transmits only the blob storage reference*
- *TaskManagers retrieve the TaskInfo from the blob cache*

## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
- *Added integration tests for end-to-end deployment with large payloads (100MB)*
- *Extended integration test for recovery after master (JobManager) failure*
- *Added test that validates that TaskInfo is transferred only once across recoveries*
- *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
- The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / no / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
- The S3 file system connector: (yes / no / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tweise/flink FLINK-9402.regionOrEndpoint

Alternatively you
[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...
Github user tweise commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r188481796

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---

@@ -181,6 +181,25 @@ public static Properties replaceDeprecatedProducerKeys(Properties configProps) { return configProps; }
+
+ public static Properties replaceDeprecatedConsumerKeys(Properties configProps) {
+ if (configProps.containsKey(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE)) {

--- End diff --

Minor: this could be generalized by iterating over a map of oldkey -> newkey and I would also suggest to log a warning for deprecated keys ---
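The reviewer's suggestion above (drive the key replacement from a single old-key to new-key map and warn on each hit) can be sketched as follows. This is a hedged illustration only: the key strings and the warning text are placeholders, not the connector's actual constants, and the real utility would likely use a logger instead of stderr.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Sketch of table-driven deprecated-key replacement with a deprecation warning.
// Key names below are illustrative placeholders.
class DeprecatedKeyMapper {
    static final Map<String, String> DEPRECATED_TO_NEW = new LinkedHashMap<>();
    static {
        // old (describeStream-based) key -> new (listShards-based) key, assumed names
        DEPRECATED_TO_NEW.put("flink.stream.describe.backoff.base",
                              "flink.shard.discovery.backoff.base");
    }

    static Properties replaceDeprecatedKeys(Properties props) {
        for (Map.Entry<String, String> e : DEPRECATED_TO_NEW.entrySet()) {
            if (props.containsKey(e.getKey())) {
                // A real implementation would log this via SLF4J.
                System.err.println("Deprecated key '" + e.getKey()
                        + "' used; please switch to '" + e.getValue() + "'.");
                props.setProperty(e.getValue(), props.getProperty(e.getKey()));
                props.remove(e.getKey());
            }
        }
        return props;
    }
}
```

Adding a new deprecated key then becomes a one-line change to the map rather than another `containsKey` branch.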
[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...
Github user tweise commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r188378986 --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java --- @@ -97,8 +228,7 @@ public void testClientConfigOverride() { AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient"); ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient, - "clientConfiguration"); + "clientConfiguration"); --- End diff -- remove unnecessary change ---
[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...
Github user tweise commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r188379050 --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java --- @@ -97,8 +228,7 @@ public void testClientConfigOverride() { AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient"); ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient, - "clientConfiguration"); + "clientConfiguration"); assertEquals(, clientConfiguration.getSocketTimeout()); } - --- End diff -- remove unnecessary change ---
[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...
Github user tweise commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r188378324 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java --- @@ -65,14 +65,14 @@ public SentinelSequenceNumber toSentinelSequenceNumber() { /** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */ public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format"; - /** The base backoff time between each describeStream attempt. */ - public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base"; --- End diff -- I think that this should be one unit of work. ---
[GitHub] flink issue #5992: [FLINK-8944] [Kinesis Connector] Use listShards instead o...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/5992 Looks like we can either leave all the application facing constants as they are (which might look a bit strange given they refer to old API and we are now using the new), or we deprecate and duplicate :) If the latter, then I would suggest to map from old/deprecated to new property keys in a single place and issue deprecation warning. ---
[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...
Github user tweise commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r187627524 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java --- @@ -66,13 +66,13 @@ public SentinelSequenceNumber toSentinelSequenceNumber() { public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format"; /** The base backoff time between each describeStream attempt. */ - public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base"; --- End diff -- This would be a breaking change. We should leave these properties as is, if they are semantically equivalent. ---
[GitHub] flink pull request #5889: [FLINK-9188] [kinesis] Generic mechanism to set Cl...
Github user tweise commented on a diff in the pull request: https://github.com/apache/flink/pull/5889#discussion_r184224413 --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java --- @@ -86,4 +86,19 @@ protected AmazonKinesis createKinesisClient(Properties configProps) { assertEquals(1, clientConfiguration.getSocketTimeout()); } + @Test + public void testClientConfigOverride() { + + Properties configProps = new Properties(); + configProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); + configProps.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX + "socketTimeout", ""); --- End diff -- The credentials construction code may be hard to replace declaratively. Also, the credentials are not a client config property, but a property of the Kinesis client. ---
[GitHub] flink pull request #5889: [FLINK-9188] [kinesis] Generic mechanism to set Cl...
Github user tweise commented on a diff in the pull request: https://github.com/apache/flink/pull/5889#discussion_r184220854 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java --- @@ -186,7 +186,10 @@ protected KinesisProxy(Properties configProps) { * @return */ protected AmazonKinesis createKinesisClient(Properties configProps) { - return AWSUtil.createKinesisClient(configProps, new ClientConfigurationFactory().getConfig()); + + ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig(); + AWSUtil.setAwsClientConfigProperties(awsClientConfig, configProps); + return AWSUtil.createKinesisClient(configProps, awsClientConfig); --- End diff -- Actually there does not seem to be any overlap. `AWSConfigConstants.AWS_REGION` and `AWSConfigConstants.AWS_ENDPOINT` are used to construct the client, not the client config. ---
[GitHub] flink pull request #5889: [FLINK-9188] [kinesis] Generic mechanism to set Cl...
Github user tweise commented on a diff in the pull request: https://github.com/apache/flink/pull/5889#discussion_r184218974 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java --- @@ -186,7 +186,10 @@ protected KinesisProxy(Properties configProps) { * @return */ protected AmazonKinesis createKinesisClient(Properties configProps) { - return AWSUtil.createKinesisClient(configProps, new ClientConfigurationFactory().getConfig()); + + ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig(); + AWSUtil.setAwsClientConfigProperties(awsClientConfig, configProps); + return AWSUtil.createKinesisClient(configProps, awsClientConfig); --- End diff -- That's correct, the generic property setting takes place before those keys are processed. ---
[GitHub] flink issue #5889: [FLINK-9188] [kinesis] Generic mechanism to set ClientCon...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/5889 R: @tzulitai ---
[GitHub] flink pull request #5889: [FLINK-9188] [kinesis] Generic mechanism to set Cl...
GitHub user tweise opened a pull request: https://github.com/apache/flink/pull/5889

[FLINK-9188] [kinesis] Generic mechanism to set ClientConfiguration properties.

## What is the purpose of the change

This pull request enables setting properties on the AWS ClientConfiguration from user supplied properties with a specific prefix.

## Brief change log

- use Jackson to set properties in a generic way so that we don't need to assume knowledge of the AWS properties in the Flink connector code.

## Verifying this change

This change added tests and can be verified as follows: Added test to verify the configuration mechanism.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tweise/flink FLINK-9188.ConfigureKinesisClient

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5889.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5889

commit b65ffd5a3fd72478c7cfdaff17b4199febaa7650
Author: Thomas Weise <thw@...>
Date: 2018-04-17T05:01:52Z
[FLINK-9188] [kinesis] Generic mechanism to set ClientConfiguration properties. ---
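The prefix-driven mechanism described in this PR can be illustrated with a small sketch: any user property that starts with a known prefix is applied to a configuration bean by matching setter name, so the connector needs no hard-coded knowledge of individual AWS options. The PR itself uses Jackson for the bean mapping; the sketch below uses plain reflection and a stand-in bean so it stays dependency-free. The prefix string, bean, and method names here are assumptions, not the connector's actual API.

```java
import java.lang.reflect.Method;
import java.util.Properties;

// Sketch of prefix-based bean configuration (the PR uses Jackson; this uses
// reflection on a stand-in bean purely for illustration).
class PrefixedBeanConfigurator {
    static final String PREFIX = "aws.clientconfig."; // assumed prefix

    // Stand-in for the AWS SDK's ClientConfiguration class.
    public static class FakeClientConfig {
        private int socketTimeout;
        public void setSocketTimeout(int t) { this.socketTimeout = t; }
        public int getSocketTimeout() { return socketTimeout; }
    }

    static void apply(Properties props, Object bean) {
        try {
            for (String key : props.stringPropertyNames()) {
                if (!key.startsWith(PREFIX)) continue;
                String field = key.substring(PREFIX.length());
                String setter = "set" + Character.toUpperCase(field.charAt(0))
                        + field.substring(1);
                // Only int-valued setters are handled in this sketch.
                Method m = bean.getClass().getMethod(setter, int.class);
                m.invoke(bean, Integer.parseInt(props.getProperty(key)));
            }
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("cannot apply property", e);
        }
    }
}
```

A user would then write e.g. `aws.clientconfig.socketTimeout=9000` in the consumer properties, and the value lands on the bean without any connector-side constant for that option.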
[GitHub] flink pull request #5803: [FLINK-9124] [kinesis] Allow customization of Kine...
Github user tweise commented on a diff in the pull request: https://github.com/apache/flink/pull/5803#discussion_r180967646

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---

@@ -176,6 +179,16 @@ protected KinesisProxy(Properties configProps) { }
+
+ /**
+* Create the Kinesis client, using the provided configuration properties and default {@link ClientConfiguration}.
+* Derived classes can override this method to customize the client configuration.
+* @param configProps
+* @return
+*/
+ protected AmazonKinesis createKinesisClient(Properties configProps) {

--- End diff --

Although it is theoretically possible to override the method and not look at `configProps`, it is rather unlikely that this would be unintended. The user that ends up working at this level will probably need to control how the client config is initialized and the client is constructed, to make the connector work.

My vote is strongly in favor of not locking down things unless they are extremely well understood and there is a specific reason. The connectors in general are fluent by nature and warrant a more flexible approach that empowers users to customize what they need without wholesale forking. By now we have run into several cases where behavior of the Kinesis connector had to be amended but private constructors or methods got in the way. Who would not prefer to spend time improving the connector functionality vs. opening JIRAs and PRs for access modification changes?

In our internal custom code we currently have an override that can generically set any simple property on the client config from the config properties. The approach comes with its own pros and cons and I think it should be discussed separately. If there is interest in having it in the Flink codebase as default behavior, I'm happy to take it up as a separate PR. I would still want to have the ability to override it though. ---
[GitHub] flink issue #5803: [FLINK-9124] [kinesis] Allow customization of KinesisProx...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/5803 @aljoscha @tzulitai can you take a look? ---
[GitHub] flink pull request #5803: [FLINK-9124] [kinesis] Allow customization of Kine...
GitHub user tweise opened a pull request: https://github.com/apache/flink/pull/5803

[FLINK-9124] [kinesis] Allow customization of KinesisProxy.getRecords read timeout and retry.

## What is the purpose of the change

This pull request enables overrides for the AWS ClientConfiguration and getRecords retry in KinesisProxy.

## Brief change log

- option to override retry for any SdkClientException
- option to customize the ClientConfiguration used to construct the Kinesis client

## Verifying this change

This change added tests and can be verified as follows: Added test to verify the configuration override.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tweise/flink FLINK-9124.Kinesis.getRecords

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5803.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5803

commit 563763fa6eb6eede1654a6b157a1b006b360731d
Author: Thomas Weise <thw@...>
Date: 2018-04-03T03:49:50Z
[FLINK-9124] Allow customization of KinesisProxy.getRecords read timeout and retry. ---
[GitHub] flink issue #5698: [FLINK-8945] [kinesis] Allow customization of KinesisProx...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/5698 @StephanEwen thanks for unblocking our work! ---
[GitHub] flink issue #5698: [FLINK-8945] [kinesis] Allow customization of KinesisProx...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/5698 +1 @StephanEwen can we get this into 1.5.0 ? ---
[GitHub] flink issue #5663: [FLINK-8888] [Kinesis Connectors] Update the AWS SDK for ...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/5663 @StephanEwen we are looking to utilize this patch to override discovery using ListShards (initially in our connector extension). I have tested the SDK version change on top of our Flink 1.4 + patches branch with my kinesis quick check app and it works as expected. I believe @kailashhd had already addressed all other concerns? ---
[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/5634 This is a good proposal, it should also survive a general connector refactor that will be necessary to address other code duplication. The Kinesis ticket is https://issues.apache.org/jira/browse/FLINK-5697 and I will add a reference back to this thread. I would be happy to add the watermark support based on the revised generator. Perhaps it would be good to recognize the special "idle" watermark in SourceContext also? ---
[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/5634 @tzulitai @StephanEwen the current idleness detection in the source context isn't a replacement for what is required to deal with an inactive partition (or Kinesis shard). When a connector subtask consumes multiple partitions and one is idle, then it should be possible to still generate a watermark. This can only be solved outside of the connector when the multiple source partitions are visible (like it would be for an operator with multiple input streams). ---
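The multi-partition concern raised above (one idle partition should not stall the subtask's watermark) can be sketched as a minimum over the *active* partitions only. This is a hedged illustration of the idea, not Flink's actual watermark machinery; the method names, the idleness criterion, and the `Long.MIN_VALUE` sentinel are all assumptions for the sketch.

```java
import java.util.Map;

// Sketch: combine per-partition watermarks, skipping partitions that have
// been inactive longer than a timeout, so one idle shard does not hold the
// subtask watermark back forever. All names are illustrative.
class PerPartitionWatermarks {
    static long combinedWatermark(Map<String, Long> perPartitionWatermark,
                                  Map<String, Long> lastActivityMillis,
                                  long nowMillis, long idleTimeoutMillis) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<String, Long> e : perPartitionWatermark.entrySet()) {
            long idleFor = nowMillis - lastActivityMillis.getOrDefault(e.getKey(), 0L);
            if (idleFor > idleTimeoutMillis) {
                continue; // partition is idle: exclude it from the minimum
            }
            anyActive = true;
            min = Math.min(min, e.getValue());
        }
        // No active partition at all: emit no watermark (sentinel here).
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```

With two shards where one has seen no records past the timeout, the combined watermark tracks the active shard instead of being pinned by the idle one.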
[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/5634 There was a related discussion on the mailing list; this and several other features could be provided by a common connector framework. Such an initiative is a much larger effort though, and it is not clear to me that users can wait. The Kinesis consumer has virtually identical requirements and we have already written custom code for it. ---
[GitHub] flink issue #5480: [FLINK-8648] [kinesis] Allow for customization of emitRec...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/5480 @tzulitai any update? ---
[GitHub] flink issue #5480: [FLINK-8648] [kinesis] Allow for customization of emitRec...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/5480 @tzulitai see https://gist.github.com/tweise/7fad5d5df0abf54670a52d0d02d61179 for details. As indicated in the email thread, emit override will track watermark state and delegate the record to the base implementation. ---
[GitHub] flink issue #5393: [FLINK-8516] Allow for custom hash function for shard to ...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/5393 @tzulitai thanks for the review! ---
[GitHub] flink issue #5480: [FLINK-8648] [kinesis] Allow for customization of emitRec...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/5480 R: @tzulitai ---
[GitHub] flink pull request #5480: [FLINK-8648] [kinesis] Allow for customization of ...
GitHub user tweise opened a pull request: https://github.com/apache/flink/pull/5480

[FLINK-8648] [kinesis] Allow for customization of emitRecordAndUpdateState in Kinesis connector.

## What is the purpose of the change

Allow customization of record emission in the Kinesis consumer. In this case we will use it to implement custom watermark logic but it could also be used for any other logic that inspects the record with contextual information about the shard.

## Brief change log

- Trivial change to remove final from the declaration to enable override.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage. I didn't add a test, but perhaps we can add a NOOP example with override in the test package just to have the compile path covered?

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / *no*)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / *no*)
- The serializers: (yes / *no* / don't know)
- The runtime per-record code paths (performance sensitive): (yes / *no* / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / *no* / don't know)
- The S3 file system connector: (yes / *no* / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / *no*)
- If yes, how is the feature documented? (*not applicable* / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tweise/flink FLINK-8648.emitRecordAndUpdateState

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5480.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5480

commit c9b6369796cfa03448a1da32313981c581304a4b
Author: Thomas Weise <thw@...>
Date: 2018-02-14T00:33:59Z
[FLINK-8648] [kinesis] Allow for customization of emitRecordAndUpdateState in Kinesis connector. ---
[GitHub] flink issue #5393: [FLINK-8516] Allow for custom hash function for shard to ...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/5393 @tzulitai any update? ---
[GitHub] flink issue #5393: [FLINK-8516] Allow for custom hash function for shard to ...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/5393 @tzulitai done adding test (CI failure BackPressureStatsTrackerITCase.testBackPressuredProducer unrelated to this PR) ---
[GitHub] flink issue #5393: [FLINK-8516] Allow for custom hash function for shard to ...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/5393 @tzulitai Assignment and restore are orthogonal; this PR doesn't change restore. I therefore don't see the need to add another migration test for it. There is also no change to how the index is computed by default; part of what previously happened in `isThisSubtaskShouldSubscribeTo` now lives in the assigner. Should I add a (trivial) unit test that asserts that `isThisSubtaskShouldSubscribeTo` applies the modulus to an assigner-returned value that falls outside the subtask index range? ---
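The modulus behavior discussed in this comment can be modeled in plain Java. The method name mirrors `isThisSubtaskShouldSubscribeTo`, but the signature below is a simplified stand-in, not the actual Flink code:

```java
// Illustrative model of the subscription check: an assigner may return any int
// (even negative or out of range); the modulus folds it into a valid subtask index,
// so every shard is assigned to exactly one subtask.
class ShardSubscriptionCheck {
    static boolean isThisSubtaskShouldSubscribeTo(int assignerValue,
                                                  int totalNumberOfSubtasks,
                                                  int indexOfThisSubtask) {
        return Math.abs(assignerValue % totalNumberOfSubtasks) == indexOfThisSubtask;
    }
}
```

Because of this folding, an out-of-range or negative assigner value cannot leave a shard unassigned, which is why a migration test adds little here.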
[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...
Github user tweise commented on a diff in the pull request: https://github.com/apache/flink/pull/5393#discussion_r166523133 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java --- @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.util; --- End diff -- I only see references to the partitioner from the package org.apache.flink.streaming.connectors.kinesis and not from subpackages - which is what I pointed out before. Since you seem to suggest that isn't a convention, I will move the class (to me it is just an observation and not important). ---
[GitHub] flink issue #5393: [FLINK-8516] Allow for custom hash function for shard to ...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/5393 @tzulitai please see changes and couple questions. ---
[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...
Github user tweise commented on a diff in the pull request: https://github.com/apache/flink/pull/5393#discussion_r165834978 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java --- @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.util; --- End diff -- It would result in import from the parent package in KinesisDataFetcher. Looking at a few other classes it wasn't clear to me that this is the established pattern, so please confirm. ---
[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...
Github user tweise commented on a diff in the pull request: https://github.com/apache/flink/pull/5393#discussion_r165834292 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java --- @@ -214,6 +226,7 @@ protected KinesisDataFetcher(List streams, this.totalNumberOfConsumerSubtasks = runtimeContext.getNumberOfParallelSubtasks(); this.indexOfThisConsumerSubtask = runtimeContext.getIndexOfThisSubtask(); this.deserializationSchema = checkNotNull(deserializationSchema); + this.shardAssigner = checkNotNull(shardAssigner); --- End diff -- adding this in FlinkKinesisConsumer ---
[GitHub] flink issue #5393: [FLINK-8516] Allow for custom hash function for shard to ...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/5393 @tzulitai What is the proposed migration test going to assert? The assigner does not influence how state is saved and restored. Even when the assigner returns invalid index, the modulus will ensure that the shard gets assigned. ---
[GitHub] flink issue #5393: [FLINK-8516] Allow for custom hash function for shard to ...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/5393 @tzulitai the PR is ready for review now ---
[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...
GitHub user tweise reopened a pull request: https://github.com/apache/flink/pull/5393 [FLINK-8516] Allow for custom hash function for shard to subtask mapping in Kinesis consumer ## What is the purpose of the change Allow the user to customize Kinesis shard to subtask assignment in the Kinesis consumer. ## Brief change log Added pluggable shard assigner. ## Verifying this change Added unit test. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation Javadoc You can merge this pull request into a Git repository by running: $ git pull https://github.com/tweise/flink FLINK-8516.shardHashing Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5393.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5393 commit ad4dbe6fb5bf2af52726c54b6361089ef3f4e369 Author: Thomas Weise <thw@...> Date: 2018-01-31T01:44:44Z [FLINK-8516] Allow for custom hash function for shard to subtask mapping in Kinesis consumer ---
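A pluggable shard assigner along the lines of this PR might look as follows. `ShardAssigner` and `HashShardAssigner` are illustrative stand-ins: the real `KinesisShardAssigner` receives a Flink `StreamShardHandle`, which is replaced by a plain shard id string here to keep the sketch self-contained:

```java
// Stand-in for the pluggable assigner contract (the real interface takes a
// StreamShardHandle; a plain shard id string is used here for self-containment).
interface ShardAssigner {
    int assign(String shardId, int numParallelSubtasks);
}

// Example assigner: derive the assignment from the shard id hash. Any int may be
// returned; the consumer folds it into the subtask index range with a modulus.
class HashShardAssigner implements ShardAssigner {
    @Override
    public int assign(String shardId, int numParallelSubtasks) {
        return shardId.hashCode();
    }
}
```

A custom implementation could instead, for example, group shards of related streams onto the same subtask.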
[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...
Github user tweise commented on a diff in the pull request: https://github.com/apache/flink/pull/5393#discussion_r164952695 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java --- @@ -93,6 +93,12 @@ /** User supplied deserialization schema to convert Kinesis byte messages to Flink objects. */ private final KinesisDeserializationSchema deserializer; + /** +* The function that determines which subtask a shard should be assigned to. +*/ + // TODO: instead of the property, use a factory method that would allow subclass to access source context? --- End diff -- Add a createFn(...) that will allow the function to be created with access to the runtime context (like the number of subtasks), and then change the fn signature to take only the shard metadata as a parameter. Subclasses can override createFn instead of having the property. ---
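The factory-method alternative raised in this review comment could be sketched as follows. `ConsumerWithFactory` and `ShardToSubtaskFn` are hypothetical names, not the actual Flink classes:

```java
// Function created by the factory: only needs shard metadata, because the
// runtime context was already captured at creation time.
interface ShardToSubtaskFn {
    int subtaskFor(String shardId);
}

// Sketch of the factory-method idea: instead of a property holding the assigner,
// the consumer exposes a protected factory that subclasses override. The factory
// receives runtime context (e.g. the number of subtasks) up front.
class ConsumerWithFactory {
    protected ShardToSubtaskFn createFn(int numParallelSubtasks) {
        // default: hash the shard id and fold into the subtask index range
        return shardId -> Math.floorMod(shardId.hashCode(), numParallelSubtasks);
    }
}
```

The design trade-off: a factory lets the created function close over runtime context once, while the property-based approach (which the PR ultimately uses) keeps configuration declarative and serializable.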
[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...
Github user tweise closed the pull request at: https://github.com/apache/flink/pull/5393 ---
[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...
GitHub user tweise opened a pull request: https://github.com/apache/flink/pull/5393 [FLINK-8516] Allow for custom hash function for shard to subtask mapping in Kinesis consumer *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.* *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.* ## Contribution Checklist - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue. - Name the pull request in the form "[FLINK-XXXX] [component] Title of the pull request", where *FLINK-XXXX* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component. Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`. - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review. - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. 
You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices). - Each pull request should address only one issue, not mix up code from multiple issues. - Each commit in the pull request has a meaningful commit message (including the JIRA id) - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below. **(The sections below can be removed for hotfixes of typos)** ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tweise/flink FLINK-8516.shardHashing Alternatively you