[GitHub] flink pull request #6408: [FLINK-9897][Kinesis Connector] Make adaptive read...

2018-07-24 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/6408#discussion_r204922265
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
@@ -233,26 +225,68 @@ public void run() {
                subscribedShard.getShard().getHashKeyRange().getEndingHashKey());

            long recordBatchSizeBytes = 0L;
-           long averageRecordSizeBytes = 0L;
-
            for (UserRecord record : fetchedRecords) {
                recordBatchSizeBytes += record.getData().remaining();
                deserializeRecordForCollectionAndUpdateState(record);
            }

-           if (useAdaptiveReads && !fetchedRecords.isEmpty()) {
-               averageRecordSizeBytes = recordBatchSizeBytes / fetchedRecords.size();
-               maxNumberOfRecordsPerFetch = getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
-           }
-
            nextShardItr = getRecordsResult.getNextShardIterator();
+
+           long processingEndTimeNanos = System.nanoTime();
+
+           long adjustmentEndTimeNanos = adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos);
+           long runLoopTimeNanos = adjustmentEndTimeNanos - processingStartTimeNanos;
+           adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(), recordBatchSizeBytes);
+           processingStartTimeNanos = adjustmentEndTimeNanos; // for next time through the loop
        }
    }
} catch (Throwable t) {
    fetcherRef.stopWithError(t);
}
}
 
+   /**
+    * Adjusts loop timing to match target frequency if specified.
+    * @param processingStartTimeNanos The start time of the run loop "work"
+    * @param processingEndTimeNanos The end time of the run loop "work"
+    * @return The System.nanoTime() after the sleep (if any)
+    * @throws InterruptedException
+    */
+   protected long adjustRunLoopFrequency(long processingStartTimeNanos, long processingEndTimeNanos)
+       throws InterruptedException {
+       long endTimeNanos = processingEndTimeNanos;
+       if (fetchIntervalMillis != 0) {
+           long processingTimeNanos = processingEndTimeNanos - processingStartTimeNanos;
+           long sleepTimeMillis = fetchIntervalMillis - (processingTimeNanos / 1_000_000);
+           if (sleepTimeMillis > 0) {
+               Thread.sleep(sleepTimeMillis);
+               endTimeNanos = System.nanoTime();
+           }
+       }
+       return endTimeNanos;
+   }
+
+   /**
+    * Calculates how many records to read each time through the loop based on a target throughput
+    * and the measured frequency of the loop.
+    * @param runLoopTimeNanos The total time of one pass through the loop
+    * @param numRecords The number of records of the last read operation
+    * @param recordBatchSizeBytes The total batch size of the last read operation
+    */
+   protected int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, long recordBatchSizeBytes) {
+       if (useAdaptiveReads && numRecords != 0 && runLoopTimeNanos != 0) {
+           long averageRecordSizeBytes = recordBatchSizeBytes / numRecords;
+           // Adjust the number of records to fetch from the shard depending on the current
+           // average record size, to optimize the 2 MB/sec per-shard read limit
+           double loopFrequencyHz = 1000000000.0d / runLoopTimeNanos;
+           double bytesPerRead = KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
+           maxNumberOfRecordsPerFetch = (int) (bytesPerRead / averageRecordSizeBytes);
+           // Ensure the value is not more than 10000L (DEFAULT_SHARD_GETRECORDS_MAX)
+           maxNumberOfRecordsPerFetch = Math.min(maxNumberOfRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX);
+       }
+       return maxNumberOfRecordsPerFetch;
--- End diff --

the return value is never used
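
For illustration, the straightforward fix is to have the caller consume the
result, making the field update explicit in the run loop (a sketch, not
necessarily the committed change):

    maxNumberOfRecordsPerFetch = adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(), recordBatchSizeBytes);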


---


[GitHub] flink issue #6300: [FLINK-9692][Kinesis Connector] Adaptive reads from Kines...

2018-07-11 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/6300
  
@tzulitai can you please take a look - would be good if we can get this 
into v1.6.0


---


[GitHub] flink pull request #6300: [FLINK-9692][Kinesis Connector] Adaptive reads fro...

2018-07-11 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/6300#discussion_r201792650
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
@@ -224,10 +232,19 @@ public void run() {
                subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
                subscribedShard.getShard().getHashKeyRange().getEndingHashKey());

+           long recordBatchSizeBytes = 0L;
+           long averageRecordSizeBytes = 0L;
+
            for (UserRecord record : fetchedRecords) {
+               recordBatchSizeBytes += record.getData().remaining();
                deserializeRecordForCollectionAndUpdateState(record);
            }

+           if (useAdaptiveReads && fetchedRecords.size() != 0) {
--- End diff --

nit: && !isEmpty()


---


[GitHub] flink pull request #6300: [FLINK-9692][Kinesis Connector] Adaptive reads fro...

2018-07-11 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/6300#discussion_r201793459
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
@@ -330,4 +347,24 @@ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
    protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) {
        return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey));
    }
+
+   /**
+    * Adapts the maxNumberOfRecordsPerFetch based on the current average record size
+    * to optimize the 2 MB/sec read limit.
+    *
+    * @param averageRecordSizeBytes
+    * @return adaptedMaxRecordsPerFetch
+    */
+
+   private int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) {
--- End diff --

Make this protected to allow for override? (Currently the shard consumer as 
a whole cannot be customized, but I think it should.)
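
To illustrate what such an override would enable, a self-contained sketch of the
pattern (BaseConsumer stands in for ShardConsumer, whose constructor wiring is
omitted here; the cap value is made up):

    // Stand-in for the consumer class; the real one is ShardConsumer.
    class BaseConsumer {
        protected int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) {
            return 10000; // placeholder for the adaptive calculation
        }
    }

    class CappedConsumer extends BaseConsumer {
        @Override
        protected int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) {
            // once the method is protected, a subclass can refine the fetch policy
            return Math.min(super.getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes), 5000);
        }
    }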


---


[GitHub] flink issue #6177: Backport of Kinesis connector changes from 1.5 to release...

2018-06-18 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/6177
  
@tzulitai the commits in this PR (except the most recent version change) 
are from our 1.4.x fork and are what we use in production. As discussed on the 
ML, not even the Kinesis version number is strictly a patch-compatible change, 
and I don't see an issue with including the other changes, too.


---


[GitHub] flink pull request #6177: Backport of Kinesis connector changes from 1.5 to ...

2018-06-17 Thread tweise
GitHub user tweise opened a pull request:

https://github.com/apache/flink/pull/6177

Backport of Kinesis connector changes from 1.5 to release-1.4

Backport of Kinesis connector changes from 1.5 release to 1.4.

R: @tzulitai @tillrohrmann 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tweise/flink release-1.4-kinesis

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6177.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6177


commit a231c3c2f6e2aa1148d002a517ee67226ea78af4
Author: Thomas Weise 
Date:   2018-01-31T01:44:44Z

[FLINK-8516] [kinesis] Allow for custom hash function for shard to subtask 
mapping in Kinesis consumer

This closes #5393.

commit 17d6b49583492f0f3b7fe9db5ffcd508fa324864
Author: Thomas Weise 
Date:   2018-02-14T00:33:59Z

[FLINK-8648] [kinesis] Allow for customization of emitRecordAndUpdateState 
in Kinesis connector.

This closes #5480.

commit 6ef1e9e0ae124e7bb0e375867fb78ada9e6a0e74
Author: Kailash HD 
Date:   2018-03-08T18:32:23Z

[FLINK-8888] [Kinesis Connectors] Update the AWS SDK for flink kinesis connector

This closes #5663

commit 2ff1279bf63555b238670e66f3e0785b0214e00a
Author: Kailash HD 
Date:   2018-03-14T16:20:12Z

[FLINK-8945] [kinesis] Allow customization of KinesisProxy

This closes #5698

commit 2ba9306a0db24f6de49cf7b1d7e5b4b28eae
Author: Thomas Weise 
Date:   2018-04-03T03:49:50Z

[FLINK-9124] [kinesis] Allow customization of KinesisProxy.getRecords read 
timeout and retry.

This closes #5803.

commit a734e53f59cdad56c84f99d85aa2b05a2e395b78
Author: Moser Thomas W 
Date:   2018-04-27T18:27:03Z

[FLINK-9266] [kinesis] Updates Kinesis connector to use newer version of 
kcl to limit describe streams calls

This closes #5932.




---


[GitHub] flink issue #6058: [FLINK-9415] Remove reference to StreamingMultipleProgram...

2018-05-23 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/6058
  
@tzulitai PTAL


---


[GitHub] flink pull request #6058: [FLINK-9415] Remove reference to StreamingMultiple...

2018-05-22 Thread tweise
GitHub user tweise opened a pull request:

https://github.com/apache/flink/pull/6058

[FLINK-9415] Remove reference to StreamingMultipleProgramsTestBase in docs



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tweise/flink FLINK-9415.StreamingMultipleProgramsTestBase

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6058.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6058


commit 68c8db890ba939a03ce324d905530ac057eff932
Author: Thomas Weise <thw@...>
Date:   2018-05-23T02:15:57Z

[FLINK-9415] Remove reference to StreamingMultipleProgramsTestBase in docs




---


[GitHub] flink pull request #6045: [FLINK-9402] [kinesis] Kinesis consumer configurat...

2018-05-19 Thread tweise
GitHub user tweise opened a pull request:

https://github.com/apache/flink/pull/6045

[FLINK-9402] [kinesis] Kinesis consumer configuration requires either 
region or endpoint.



*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-XXXX] [component] Title of 
the pull request", where *FLINK-XXXX* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*


## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  - The S3 file system connector: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tweise/flink FLINK-9402.regionOrEndpoint

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6045.patch


---

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-15 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188481796
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---
@@ -181,6 +181,25 @@ public static Properties replaceDeprecatedProducerKeys(Properties configProps) {
        return configProps;
    }

+   public static Properties replaceDeprecatedConsumerKeys(Properties configProps) {
+       if (configProps.containsKey(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE)) {
--- End diff --

Minor: this could be generalized by iterating over a map of oldKey -> newKey; 
I would also suggest logging a warning for deprecated keys.
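
A sketch of that generalization (illustrative: the LOG field is assumed to
exist, and the old-to-new mapping shown is an example based on the listShards
key names introduced by this PR):

    private static final Map<String, String> DEPRECATED_CONSUMER_KEYS = new HashMap<>();
    static {
        // old describeStream key -> new listShards key (illustrative mapping)
        DEPRECATED_CONSUMER_KEYS.put(
            ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE,
            ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE);
        // ... one entry per deprecated key
    }

    public static Properties replaceDeprecatedConsumerKeys(Properties configProps) {
        for (Map.Entry<String, String> e : DEPRECATED_CONSUMER_KEYS.entrySet()) {
            if (configProps.containsKey(e.getKey())) {
                LOG.warn("Property {} is deprecated, please use {} instead.", e.getKey(), e.getValue());
                configProps.setProperty(e.getValue(), configProps.getProperty(e.getKey()));
                configProps.remove(e.getKey());
            }
        }
        return configProps;
    }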


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-15 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188378986
  
--- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
@@ -97,8 +228,7 @@ public void testClientConfigOverride() {

        AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient");
        ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient,
-               "clientConfiguration");
+           "clientConfiguration");
--- End diff --

remove unnecessary change


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-15 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188379050
  
--- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
@@ -97,8 +228,7 @@ public void testClientConfigOverride() {

        AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient");
        ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient,
-               "clientConfiguration");
+           "clientConfiguration");
        assertEquals(9999, clientConfiguration.getSocketTimeout());
    }
-
--- End diff --

remove unnecessary change


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-15 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188378324
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
@@ -65,14 +65,14 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
    /** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
    public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";

-   /** The base backoff time between each describeStream attempt. */
-   public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
--- End diff --

I think that this should be one unit of work.


---


[GitHub] flink issue #5992: [FLINK-8944] [Kinesis Connector] Use listShards instead o...

2018-05-15 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5992
  
Looks like we can either leave all the application-facing constants as they 
are (which might look a bit strange given they refer to the old API while we 
now use the new one), or we deprecate and duplicate :) If the latter, then I 
would suggest mapping from old/deprecated to new property keys in a single 
place and issuing a deprecation warning.


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-11 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r187627524
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
@@ -66,13 +66,13 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
    public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";

    /** The base backoff time between each describeStream attempt. */
-   public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
--- End diff --

This would be a breaking change. We should leave these properties as is, if 
they are semantically equivalent.


---


[GitHub] flink pull request #5889: [FLINK-9188] [kinesis] Generic mechanism to set Cl...

2018-04-25 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5889#discussion_r184224413
  
--- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
@@ -86,4 +86,19 @@ protected AmazonKinesis createKinesisClient(Properties configProps) {
        assertEquals(1, clientConfiguration.getSocketTimeout());
    }

+   @Test
+   public void testClientConfigOverride() {
+
+       Properties configProps = new Properties();
+       configProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+       configProps.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX + "socketTimeout", "9999");
--- End diff --

The credentials construction code may be hard to replace declaratively. 
Also, the credentials are not a client config property, but a property of the 
Kinesis client.


---


[GitHub] flink pull request #5889: [FLINK-9188] [kinesis] Generic mechanism to set Cl...

2018-04-25 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5889#discussion_r184220854
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
@@ -186,7 +186,10 @@ protected KinesisProxy(Properties configProps) {
     * @return
     */
    protected AmazonKinesis createKinesisClient(Properties configProps) {
-       return AWSUtil.createKinesisClient(configProps, new ClientConfigurationFactory().getConfig());
+
+       ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig();
+       AWSUtil.setAwsClientConfigProperties(awsClientConfig, configProps);
+       return AWSUtil.createKinesisClient(configProps, awsClientConfig);
--- End diff --

Actually there does not seem to be any overlap. 
`AWSConfigConstants.AWS_REGION` and `AWSConfigConstants.AWS_ENDPOINT` are used 
to construct the client, not the client config. 


---


[GitHub] flink pull request #5889: [FLINK-9188] [kinesis] Generic mechanism to set Cl...

2018-04-25 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5889#discussion_r184218974
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
@@ -186,7 +186,10 @@ protected KinesisProxy(Properties configProps) {
     * @return
     */
    protected AmazonKinesis createKinesisClient(Properties configProps) {
-       return AWSUtil.createKinesisClient(configProps, new ClientConfigurationFactory().getConfig());
+
+       ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig();
+       AWSUtil.setAwsClientConfigProperties(awsClientConfig, configProps);
+       return AWSUtil.createKinesisClient(configProps, awsClientConfig);
--- End diff --

That's correct, the generic property setting takes place before those keys 
are processed.


---


[GitHub] flink issue #5889: [FLINK-9188] [kinesis] Generic mechanism to set ClientCon...

2018-04-22 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5889
  
R: @tzulitai 


---


[GitHub] flink pull request #5889: [FLINK-9188] [kinesis] Generic mechanism to set Cl...

2018-04-22 Thread tweise
GitHub user tweise opened a pull request:

https://github.com/apache/flink/pull/5889

[FLINK-9188] [kinesis] Generic mechanism to set ClientConfiguration 
properties.

## What is the purpose of the change

This pull request enables setting properties on the AWS ClientConfiguration 
from user supplied properties with a specific prefix.

## Brief change log
- use Jackson to set properties in a generic way so that we don't need to 
assume knowledge of the AWS properties in the Flink connector code (a sketch 
of the idea follows below).
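
To picture the mechanism, a simplified, self-contained sketch (the prefix
constant and error handling here are assumptions; the actual AWSUtil
implementation differs in details):

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import com.amazonaws.ClientConfiguration;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public final class ClientConfigOverride {

        // Assumed prefix for illustration; the actual constant lives in AWSUtil.
        public static final String AWS_CLIENT_CONFIG_PREFIX = "aws.clientconfig.";

        /** Copies all prefixed properties onto the ClientConfiguration bean via Jackson. */
        public static void setAwsClientConfigProperties(ClientConfiguration config, Properties configProps) {
            Map<String, Object> overrides = new HashMap<>();
            for (String key : configProps.stringPropertyNames()) {
                if (key.startsWith(AWS_CLIENT_CONFIG_PREFIX)) {
                    // the remainder of the key must match a bean property, e.g. socketTimeout
                    overrides.put(key.substring(AWS_CLIENT_CONFIG_PREFIX.length()), configProps.getProperty(key));
                }
            }
            ObjectMapper mapper = new ObjectMapper();
            JsonNode propTree = mapper.convertValue(overrides, JsonNode.class);
            try {
                // merge the collected values into the existing bean, coercing strings to setter types
                mapper.readerForUpdating(config).readValue(propTree);
            } catch (IOException ex) {
                throw new RuntimeException("Invalid AWS client config properties", ex);
            }
        }
    }

With something like this in place, setting `aws.clientconfig.socketTimeout=9999` 
in the consumer properties translates into `clientConfiguration.setSocketTimeout(9999)` 
without the connector naming that property anywhere.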

## Verifying this change

This change added tests and can be verified as follows:

Added test to verify the configuration mechanism.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tweise/flink FLINK-9188.ConfigureKinesisClient

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5889.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5889


commit b65ffd5a3fd72478c7cfdaff17b4199febaa7650
Author: Thomas Weise <thw@...>
Date:   2018-04-17T05:01:52Z

[FLINK-9188] [kinesis] Generic mechanism to set ClientConfiguration 
properties.




---


[GitHub] flink pull request #5803: [FLINK-9124] [kinesis] Allow customization of Kine...

2018-04-11 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5803#discussion_r180967646
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
@@ -176,6 +179,16 @@ protected KinesisProxy(Properties configProps) {

    }

+   /**
+    * Create the Kinesis client, using the provided configuration properties and default {@link ClientConfiguration}.
+    * Derived classes can override this method to customize the client configuration.
+    * @param configProps
+    * @return
+    */
+   protected AmazonKinesis createKinesisClient(Properties configProps) {
--- End diff --

Although it is theoretically possible to override the method and ignore 
`configProps`, it is rather unlikely that this would happen unintentionally. A 
user who ends up working at this level will probably need to control how the 
client config is initialized and how the client is constructed, to make the 
connector work. My vote is strongly in favor of not locking things down unless 
they are extremely well understood and there is a specific reason.

The connectors in general are fluent by nature and warrant a more flexible 
approach that 
empowers users to customize what they need without wholesale forking. By 
now we have run into several cases where behavior of the Kinesis connector had 
to be amended but private constructors or methods got into the way. Who would 
not prefer to spend time improving the connector functionality vs. opening 
JIRAs and PRs for access modification changes?

In our internal custom code we currently have an override that can 
generically set any simple property on the client config from the config 
properties. The approach comes with its own pros and cons and I think it should 
be discussed separately. If there is interest in having it in the Flink 
codebase as default behavior, I'm happy to take it up as a separate PR. I would 
still want to have the ability to override it though.


---


[GitHub] flink issue #5803: [FLINK-9124] [kinesis] Allow customization of KinesisProx...

2018-04-10 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5803
  
@aljoscha @tzulitai can you take a look?


---


[GitHub] flink pull request #5803: [FLINK-9124] [kinesis] Allow customization of Kine...

2018-04-02 Thread tweise
GitHub user tweise opened a pull request:

https://github.com/apache/flink/pull/5803

[FLINK-9124] [kinesis] Allow customization of KinesisProxy.getRecords read 
timeout and retry.

## What is the purpose of the change

This pull request enables overrides for the AWS ClientConfiguration and 
getRecords retry in KinesisProxy.

## Brief change log
- option to override retry for any SdkClientException
- option to customize the ClientConfiguration used to construct the Kinesis 
client

## Verifying this change

This change added tests and can be verified as follows:

Added test to verify the configuration override.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tweise/flink FLINK-9124.Kinesis.getRecords

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5803.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5803


commit 563763fa6eb6eede1654a6b157a1b006b360731d
Author: Thomas Weise <thw@...>
Date:   2018-04-03T03:49:50Z

[FLINK-9124] Allow customization of KinesisProxy.getRecords read timeout 
and retry.




---


[GitHub] flink issue #5698: [FLINK-8945] [kinesis] Allow customization of KinesisProx...

2018-03-16 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5698
  
@StephanEwen thanks for unblocking our work!


---


[GitHub] flink issue #5698: [FLINK-8945] [kinesis] Allow customization of KinesisProx...

2018-03-15 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5698
  
+1
@StephanEwen can we get this into 1.5.0?


---


[GitHub] flink issue #5663: [FLINK-8888] [Kinesis Connectors] Update the AWS SDK for ...

2018-03-13 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5663
  
@StephanEwen we are looking to utilize this patch to override discovery 
using ListShards (initially in our connector extension). 

I have tested the SDK version change on top of our Flink 1.4 + patches 
branch with my kinesis quick check app and it works as expected.

I believe @kailashhd had already addressed all other concerns?





---


[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

2018-03-07 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5634
  
This is a good proposal, it should also survive a general connector 
refactor that will be necessary to address other code duplication. The Kinesis 
ticket is https://issues.apache.org/jira/browse/FLINK-5697 and I will add a 
reference back to this thread. I would be happy to add the watermark support 
based on the revised generator. Perhaps it would be good to recognize the 
special "idle" watermark in SourceContext also?


---


[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

2018-03-07 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5634
  
@tzulitai @StephanEwen the current idleness detection in the source context 
isn't a replacement for what is required to deal with an inactive partition (or 
Kinesis shard). When a connector subtask consumes multiple partitions and one 
is idle, then it should be possible to still generate a watermark. This can 
only be solved outside of the connector when the multiple source partitions are 
visible (like it would be for an operator with multiple input streams).
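
To make the point concrete, the combining logic that needs cross-partition
visibility looks roughly like this (an illustrative sketch, not Flink API;
PartitionState is a hypothetical holder of per-partition progress):

    import java.util.List;

    class WatermarkCombiner {

        interface PartitionState {
            boolean isIdle();
            long currentWatermark();
        }

        /** Min watermark across active partitions; idle partitions no longer hold it back. */
        static long combineWatermarks(List<PartitionState> partitions, long lastEmitted) {
            long combined = Long.MAX_VALUE;
            boolean anyActive = false;
            for (PartitionState p : partitions) {
                if (!p.isIdle()) {
                    combined = Math.min(combined, p.currentWatermark());
                    anyActive = true;
                }
            }
            // with no active partition there is nothing new to emit
            return anyActive ? Math.max(combined, lastEmitted) : lastEmitted;
        }
    }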


---


[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

2018-03-06 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5634
  
There was a related discussion on the mailing list; this and several other 
features could be provided by a common connector framework. Such an initiative 
is a much larger effort though, and it is not clear to me that users can wait. 
The Kinesis consumer has virtually identical requirements and we have already 
written custom code for it.


---


[GitHub] flink issue #5480: [FLINK-8648] [kinesis] Allow for customization of emitRec...

2018-02-20 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5480
  
@tzulitai any update?


---


[GitHub] flink issue #5480: [FLINK-8648] [kinesis] Allow for customization of emitRec...

2018-02-15 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5480
  
@tzulitai see 
https://gist.github.com/tweise/7fad5d5df0abf54670a52d0d02d61179 for details.

As indicated in the email thread, emit override will track watermark state 
and delegate the record to the base implementation.
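
Schematically, the override described there follows this pattern (a
self-contained sketch with stand-in types; the real base class is
KinesisDataFetcher and its actual method signature may differ):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Stand-in for the fetcher base class; the real one is KinesisDataFetcher.
    class BaseFetcher<T> {
        protected void emitRecordAndUpdateState(T record, long timestamp, int shardStateIndex) {
            // the real implementation collects the record and advances the shard state
        }
    }

    class WatermarkTrackingFetcher<T> extends BaseFetcher<T> {
        private final ConcurrentMap<Integer, Long> shardWatermarks = new ConcurrentHashMap<>();

        @Override
        protected void emitRecordAndUpdateState(T record, long timestamp, int shardStateIndex) {
            // track per-shard event-time progress, then delegate to the base implementation
            shardWatermarks.merge(shardStateIndex, timestamp, Math::max);
            super.emitRecordAndUpdateState(record, timestamp, shardStateIndex);
        }
    }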


---


[GitHub] flink issue #5393: [FLINK-8516] Allow for custom hash function for shard to ...

2018-02-15 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5393
  
@tzulitai thanks for the review!


---


[GitHub] flink issue #5480: [FLINK-8648] [kinesis] Allow for customization of emitRec...

2018-02-13 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5480
  
R: @tzulitai 


---


[GitHub] flink pull request #5480: [FLINK-8648] [kinesis] Allow for customization of ...

2018-02-13 Thread tweise
GitHub user tweise opened a pull request:

https://github.com/apache/flink/pull/5480

[FLINK-8648] [kinesis] Allow for customization of emitRecordAndUpdateState 
in Kinesis connector.

## What is the purpose of the change

Allow customization of record emission in the Kinesis consumer. In this 
case we will use it to implement custom watermark logic but it could also be 
used for any other logic that inspects the record with contextual information 
about the shard.

## Brief change log

- Trivial change to remove final from the declaration to enable override.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

I didn't add a test, but perhaps we can add a NOOP example with override in 
the test package just to have the compile path covered?

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / *no*)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / *no*)
  - The serializers: (yes / *no* / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / *no* 
/ don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / *no* / don't know)
  - The S3 file system connector: (yes / *no* / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / *no*)
  - If yes, how is the feature documented? (*not applicable* / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tweise/flink FLINK-8648.emitRecordAndUpdateState

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5480.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5480


commit c9b6369796cfa03448a1da32313981c581304a4b
Author: Thomas Weise <thw@...>
Date:   2018-02-14T00:33:59Z

[FLINK-8648] [kinesis] Allow for customization of emitRecordAndUpdateState 
in Kinesis connector.




---


[GitHub] flink issue #5393: [FLINK-8516] Allow for custom hash function for shard to ...

2018-02-08 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5393
  
@tzulitai any update?


---


[GitHub] flink issue #5393: [FLINK-8516] Allow for custom hash function for shard to ...

2018-02-07 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5393
  
@tzulitai done adding test

(CI failure BackPressureStatsTrackerITCase.testBackPressuredProducer 
unrelated to this PR)


---


[GitHub] flink issue #5393: [FLINK-8516] Allow for custom hash function for shard to ...

2018-02-06 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5393
  
@tzulitai Assignment and restore are orthogonal; this PR doesn't change 
restore. I therefore don't see the need to add another migration test for it. 
There is also no change to how the index is computed by default, it is just 
that part of what happened earlier in `isThisSubtaskShouldSubscribeTo` is now 
in the assigner. Should I add a (trivial) unit test that asserts that 
`isThisSubtaskShouldSubscribeTo` applies the modulus to an assigner-returned 
value that falls outside the subtask index range?
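
For reference, the check in question boils down to the following (sketched from
KinesisDataFetcher; the Math.abs/modulus is what tolerates out-of-range assigner
results):

    // Any int returned by the assigner, including negative values, is mapped
    // back into [0, totalNumberOfConsumerSubtasks) before the comparison.
    public static boolean isThisSubtaskShouldSubscribeTo(
            int shardHash, int totalNumberOfConsumerSubtasks, int indexOfThisConsumerSubtask) {
        return (Math.abs(shardHash % totalNumberOfConsumerSubtasks)) == indexOfThisConsumerSubtask;
    }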


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-02-06 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r166523133
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
--- End diff --

I only see references to the partitioner from the package 
org.apache.flink.streaming.connectors.kinesis and not from subpackages - which 
is what I pointed out before. Since you seem to suggest that isn't a 
convention, I will move the class (to me it is just an observation and not 
important).


---


[GitHub] flink issue #5393: [FLINK-8516] Allow for custom hash function for shard to ...

2018-02-05 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5393
  
@tzulitai please see changes and couple questions.


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-02-03 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165834978
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
--- End diff --

It would result in an import from the parent package in KinesisDataFetcher. 
Looking at a few other classes it wasn't clear to me that this is the 
established pattern, so please confirm.


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-02-03 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165834292
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
@@ -214,6 +226,7 @@ protected KinesisDataFetcher(List<String> streams,
        this.totalNumberOfConsumerSubtasks = runtimeContext.getNumberOfParallelSubtasks();
        this.indexOfThisConsumerSubtask = runtimeContext.getIndexOfThisSubtask();
        this.deserializationSchema = checkNotNull(deserializationSchema);
+       this.shardAssigner = checkNotNull(shardAssigner);
--- End diff --

adding this in FlinkKinesisConsumer


---


[GitHub] flink issue #5393: [FLINK-8516] Allow for custom hash function for shard to ...

2018-02-02 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5393
  
@tzulitai What is the proposed migration test going to assert? The assigner 
does not influence how state is saved and restored. Even when the assigner 
returns an invalid index, the modulus will ensure that the shard gets assigned.


---


[GitHub] flink issue #5393: [FLINK-8516] Allow for custom hash function for shard to ...

2018-02-01 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5393
  
@tzulitai the PR is ready for review now 


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-02-01 Thread tweise
GitHub user tweise reopened a pull request:

https://github.com/apache/flink/pull/5393

[FLINK-8516] Allow for custom hash function for shard to subtask mapping in 
Kinesis consumer

## What is the purpose of the change

Allow the user to customize Kinesis shard to subtask assignment in the 
Kinesis consumer.

## Brief change log

Added pluggable shard assigner.

## Verifying this change

Added unit test. 

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

Javadoc

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tweise/flink FLINK-8516.shardHashing

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5393.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5393


commit ad4dbe6fb5bf2af52726c54b6361089ef3f4e369
Author: Thomas Weise <thw@...>
Date:   2018-01-31T01:44:44Z

[FLINK-8516] Allow for custom hash function for shard to subtask mapping in 
Kinesis consumer




---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-01-30 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r164952695
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
@@ -93,6 +93,12 @@
    /** User supplied deserialization schema to convert Kinesis byte messages to Flink objects. */
    private final KinesisDeserializationSchema<T> deserializer;

+   /**
+    * The function that determines which subtask a shard should be assigned to.
+    */
+   // TODO: instead of the property, use a factory method that would allow a subclass to access the source context?
--- End diff --

createFn(...) that will allow the function to be created with access to the 
runtime context (like the number of subtasks), and then change the fn signature 
to only take shard metadata as a parameter. Subclasses can override createFn 
instead of having the property.
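
A rough sketch of that idea, inside FlinkKinesisConsumer (all names here are
hypothetical; this factory-method variant was only floated in review and not
merged in this form):

    import org.apache.flink.api.common.functions.RuntimeContext;

    @FunctionalInterface
    interface ShardToSubtaskFn {
        int subtaskFor(String shardId); // sees only shard metadata
    }

    // Subclasses would override this instead of setting a property; the runtime
    // context (e.g. the subtask count) is available when the function is created.
    protected ShardToSubtaskFn createFn(RuntimeContext ctx) {
        int numSubtasks = ctx.getNumberOfParallelSubtasks();
        return shardId -> Math.abs(shardId.hashCode()) % numSubtasks;
    }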


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-01-30 Thread tweise
Github user tweise closed the pull request at:

https://github.com/apache/flink/pull/5393


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-01-30 Thread tweise
GitHub user tweise opened a pull request:

https://github.com/apache/flink/pull/5393

[FLINK-8516] Allow for custom hash function for shard to subtask mapping in 
Kinesis consumer




*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-XXXX] [component] Title of 
the pull request", where *FLINK-XXXX* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*


## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  - The S3 file system connector: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tweise/flink FLINK-8516.shardHashing

Alternatively you