[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-02-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5393


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-02-06 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r166523133
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
--- End diff --

I only see references to the partitioner from the package 
org.apache.flink.streaming.connectors.kinesis and not from subpackages - which 
is what I pointed out before. Since you seem to suggest that isn't a 
convention, I will move the class (to me it is just an observation and not 
important).


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-02-06 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r166254162
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
--- End diff --

@tweise 
I think it is fine. For example, we have a `KinesisPartitioner` interface 
as the public API, and it lives under the same package as 
`FlinkKinesisConsumer`.


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-02-03 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165834978
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
--- End diff --

It would result in an import from the parent package in KinesisDataFetcher. 
Looking at a few other classes it wasn't clear to me that this is the 
established pattern, so please confirm.


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-02-03 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165834292
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -214,6 +226,7 @@ protected KinesisDataFetcher(List streams,
this.totalNumberOfConsumerSubtasks = 
runtimeContext.getNumberOfParallelSubtasks();
this.indexOfThisConsumerSubtask = 
runtimeContext.getIndexOfThisSubtask();
this.deserializationSchema = 
checkNotNull(deserializationSchema);
+   this.shardAssigner = checkNotNull(shardAssigner);
--- End diff --

adding this in FlinkKinesisConsumer


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-02-02 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165590401
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import 
org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.io.Serializable;
+
+/**
+ * Utility to map Kinesis shards to Flink subtask indices.
--- End diff --

The overview class Javadoc could probably be less generic.


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-02-02 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165590209
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import 
org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.io.Serializable;
+
+/**
+ * Utility to map Kinesis shards to Flink subtask indices.
+ */
+public interface KinesisShardAssigner extends Serializable {
+/**
+* Returns the index of the target subtask that a specific Kafka 
partition should be
+* assigned to. For return values outside the subtask range, modulus 
operation will
+* be applied automatically, hence it is also valid to just return a 
hash code.
+*
+* The resulting distribution of shards has the following contract:
+* 
+* 1. Uniform distribution across subtasks
+* 2. Deterministic, calls for a given shard always return same 
index.
+* 
+*
+* The above contract is crucial and cannot be broken. Consumer 
subtasks rely on this
+* contract to filter out partitions that they should not subscribe to, 
guaranteeing
+* that all partitions of a single topic will always be assigned to 
some subtask in a
+* uniformly distributed manner.
+*
+* Kinesis and the consumer support dynamic re-sharding and shard 
IDs, while sequential,
+* cannot be assumed to be consecutive. There is no perfect generic 
default assignment function.
+* Default subtask index assignment, which is based on hash code, may 
result in skew,
+* with some subtasks having many shards assigned and others none.
--- End diff --

I feel like this section of the Javadoc should be part of the Javadoc for 
the original consumer constructors, and should guide users to the 
`setShardAssigner` method if they do encounter serious shard skew.
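
To make the skew case concrete, here is a minimal sketch of what a custom assigner could look like. It is not the code in this PR: the `ShardAssigner` interface and its `assign` method are hypothetical stand-ins, a plain `String` shard ID replaces `StreamShardHandle`, and it assumes shard IDs of the form `shardId-000000000012`.

```java
import java.io.Serializable;
import java.util.Arrays;

public class SuffixShardAssignerSketch {

    /** Hypothetical stand-in for the assigner interface; a String replaces StreamShardHandle. */
    @FunctionalInterface
    public interface ShardAssigner extends Serializable {
        int assign(String shardId, int numParallelSubtasks);
    }

    /**
     * Assigns by the numeric suffix of the shard ID (assumed format "shardId-<digits>"),
     * falling back to the hash code when the suffix cannot be parsed.
     */
    public static final ShardAssigner BY_NUMERIC_SUFFIX = (shardId, n) -> {
        int dash = shardId.lastIndexOf('-');
        try {
            long sequence = Long.parseLong(shardId.substring(dash + 1));
            return (int) Math.floorMod(sequence, (long) n);
        } catch (NumberFormatException e) {
            return Math.floorMod(shardId.hashCode(), n);
        }
    };

    /** Counts how many shards land on each subtask, to make skew visible. */
    public static int[] countPerSubtask(String[] shardIds, int n, ShardAssigner assigner) {
        int[] counts = new int[n];
        for (String shardId : shardIds) {
            // Normalize defensively, since an assigner may return any int.
            counts[Math.floorMod(assigner.assign(shardId, n), n)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] shardIds = new String[8];
        for (int i = 0; i < shardIds.length; i++) {
            shardIds[i] = String.format("shardId-%012d", i);
        }
        System.out.println(Arrays.toString(countPerSubtask(shardIds, 4, BY_NUMERIC_SUFFIX)));
    }
}
```

With consecutive shard IDs this spreads shards evenly, but after re-sharding the IDs are no longer consecutive, so some skew is still possible and the distribution should be monitored.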


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-02-02 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165587359
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -182,13 +191,15 @@ public KinesisDataFetcher(List streams,

SourceFunction.SourceContext sourceContext,
RuntimeContext 
runtimeContext,
Properties configProps,
-   
KinesisDeserializationSchema deserializationSchema) {
+   
KinesisDeserializationSchema deserializationSchema,
+   KinesisShardAssigner 
kinesisShardToSubTaskIndexFn) {
--- End diff --

mismatching variable name: `kinesisShardToSubTaskIndexFn` --> 
`kinesisShardAssigner` or just `shardAssigner`


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-02-02 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165588489
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import 
org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.io.Serializable;
+
+/**
+ * Utility to map Kinesis shards to Flink subtask indices.
+ */
+public interface KinesisShardAssigner extends Serializable {
+/**
+* Returns the index of the target subtask that a specific Kafka 
partition should be
--- End diff --

mentions Kafka partition, should be Kinesis shard


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-02-02 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165589748
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import 
org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.io.Serializable;
+
+/**
+ * Utility to map Kinesis shards to Flink subtask indices.
+ */
+public interface KinesisShardAssigner extends Serializable {
+/**
+* Returns the index of the target subtask that a specific Kafka 
partition should be
+* assigned to. For return values outside the subtask range, modulus 
operation will
+* be applied automatically, hence it is also valid to just return a 
hash code.
+*
+* The resulting distribution of shards has the following contract:
+* 
+* 1. Uniform distribution across subtasks
+* 2. Deterministic, calls for a given shard always return same 
index.
+* 
+*
+* The above contract is crucial and cannot be broken. Consumer 
subtasks rely on this
+* contract to filter out partitions that they should not subscribe to, 
guaranteeing
+* that all partitions of a single topic will always be assigned to 
some subtask in a
--- End diff --

of a single "**Kinesis stream**", not topic (which is a Kafka term)


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-02-02 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165591410
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ---
@@ -192,6 +198,14 @@ public FlinkKinesisConsumer(List streams, 
KinesisDeserializationSchema `shardAssigner`


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-02-02 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165587909
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import 
org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.io.Serializable;
+
+/**
+ * Utility to map Kinesis shards to Flink subtask indices.
+ */
+public interface KinesisShardAssigner extends Serializable {
+/**
+* Returns the index of the target subtask that a specific Kafka 
partition should be
+* assigned to. For return values outside the subtask range, modulus 
operation will
+* be applied automatically, hence it is also valid to just return a 
hash code.
+*
+* The resulting distribution of shards has the following contract:
+* 
+* 1. Uniform distribution across subtasks
+* 2. Deterministic, calls for a given shard always return same 
index.
+* 
+*
+* The above contract is crucial and cannot be broken. Consumer 
subtasks rely on this
+* contract to filter out partitions that they should not subscribe to, 
guaranteeing
+* that all partitions of a single topic will always be assigned to 
some subtask in a
+* uniformly distributed manner.
+*
+* Kinesis and the consumer support dynamic re-sharding and shard 
IDs, while sequential,
+* cannot be assumed to be consecutive. There is no perfect generic 
default assignment function.
+* Default subtask index assignment, which is based on hash code, may 
result in skew,
+* with some subtasks having many shards assigned and others none.
+*
+* It is recommended to monitor the shard distribution and adjust 
assignment appropriately.
+* Custom implementation may optimize the hash function or use static 
overrides to limit skew.
+ *
+ * @param shard the shard to determine
+ * @param numParallelSubtasks total number of subtasks
+ * @return index or hash code
--- End diff --

nit:
I think it would be more straightforward if we just say this returns the 
"target index".
If the index value is outside the subtask range, we perform a modulus 
operation.

So basically what the PR is already doing, just re-terming it to be more 
straightforward.
What do you think? This is also just a personal preference :)
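
As a rough illustration of the "target index, with modulus applied when out of range" wording, here is a minimal sketch. The `ShardAssigner` interface, its `assign` method and the `toSubtaskIndex` helper are hypothetical names, a `String` stands in for `StreamShardHandle`, and `Math.floorMod` is just one way to keep negative hash codes in range; the fetcher may normalize differently.

```java
import java.io.Serializable;

public class TargetIndexSketch {

    /** Hypothetical stand-in for the assigner interface; a String replaces StreamShardHandle. */
    @FunctionalInterface
    public interface ShardAssigner extends Serializable {
        int assign(String shardId, int numParallelSubtasks);
    }

    /** Maps any int returned by an assigner into the range [0, numParallelSubtasks). */
    public static int toSubtaskIndex(int assigned, int numParallelSubtasks) {
        // floorMod never returns a negative value for a positive divisor,
        // so a negative hash code still yields a valid subtask index.
        return Math.floorMod(assigned, numParallelSubtasks);
    }

    public static void main(String[] args) {
        // An assigner that simply returns a hash code, relying on the caller to normalize.
        ShardAssigner byHash = (shardId, n) -> shardId.hashCode();
        int parallelism = 4;
        for (String shardId : new String[] {"shardId-000000000000", "shardId-000000000001"}) {
            int raw = byHash.assign(shardId, parallelism);
            System.out.println(shardId + " -> raw " + raw
                    + ", subtask " + toSubtaskIndex(raw, parallelism));
        }
    }
}
```

Under this reading the user only has to return a deterministic int; the caller owns the range check.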


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-02-02 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165588295
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import 
org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.io.Serializable;
+
+/**
+ * Utility to map Kinesis shards to Flink subtask indices.
+ */
+public interface KinesisShardAssigner extends Serializable {
--- End diff --

Add `@PublicEvolving` annotation, to make it clear this is a public API.


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-02-02 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165588395
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
--- End diff --

I think it makes sense to move this class to the same level as 
`FlinkKinesisConsumer`, since it is part of the public API.


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-02-02 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165589453
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import 
org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.io.Serializable;
+
+/**
+ * Utility to map Kinesis shards to Flink subtask indices.
+ */
+public interface KinesisShardAssigner extends Serializable {
+/**
+* Returns the index of the target subtask that a specific Kafka 
partition should be
+* assigned to. For return values outside the subtask range, modulus 
operation will
+* be applied automatically, hence it is also valid to just return a 
hash code.
+*
+* The resulting distribution of shards has the following contract:
--- End diff --

The resulting distribution of shards "**should**" have the following 
contract.

i.e., we can't guarantee it, instead the user implementation should 
guarantee it.


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-02-02 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165586434
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -76,6 +77,9 @@
 @Internal
 public class KinesisDataFetcher {
 
+   public static final KinesisShardAssigner DEFAULT_SHARD_ASSIGNER = 
(shard, subtasks) -> shard.hashCode();
+
+
--- End diff --

nit: unnecessary empty line


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-02-02 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165586994
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -214,6 +226,7 @@ protected KinesisDataFetcher(List streams,
this.totalNumberOfConsumerSubtasks = 
runtimeContext.getNumberOfParallelSubtasks();
this.indexOfThisConsumerSubtask = 
runtimeContext.getIndexOfThisSubtask();
this.deserializationSchema = 
checkNotNull(deserializationSchema);
+   this.shardAssigner = checkNotNull(shardAssigner);
--- End diff --

We also need to try cleaning the closure of the given object (if it is a 
non-static inner class):
```
ClosureCleaner.clean(shardAssigner, true);
```


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-02-02 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165588081
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import 
org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.io.Serializable;
+
+/**
+ * Utility to map Kinesis shards to Flink subtask indices.
+ */
+public interface KinesisShardAssigner extends Serializable {
+/**
--- End diff --

nit: add one empty line above comment block (just a personal preference, 
though)


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-02-02 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165594279
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ---
@@ -192,6 +198,14 @@ public FlinkKinesisConsumer(List streams, 
KinesisDeserializationSchema

[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-02-01 Thread tweise
GitHub user tweise reopened a pull request:

https://github.com/apache/flink/pull/5393

[FLINK-8516] Allow for custom hash function for shard to subtask mapping in 
Kinesis consumer

## What is the purpose of the change

Allow the user to customize Kinesis shard to subtask assignment in the 
Kinesis consumer.

## Brief change log

Added pluggable shard assigner.

## Verifying this change

Added unit test. 

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

Javadoc

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tweise/flink FLINK-8516.shardHashing

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5393.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5393


commit ad4dbe6fb5bf2af52726c54b6361089ef3f4e369
Author: Thomas Weise 
Date:   2018-01-31T01:44:44Z

[FLINK-8516] Allow for custom hash function for shard to subtask mapping in 
Kinesis consumer




---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-01-31 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165083343
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -584,17 +594,34 @@ private static ShardMetricsReporter 
registerShardMetrics(MetricGroup metricGroup
//  Miscellaneous utility functions
// 

 
+   /**
+* Function to map a Kinesis shard to a Flink subtask index.
+*/
+   public interface ShardToSubtaskIndexFn {
+   /**
+* Function to map a Kinesis shard to a Flink subtask index.
+*
+* @param shard the shard to determine
+* @param totalNumberOfSubtasks total number of subtasks
+* @return index or hash code
+*/
+   // TODO: extra parameter can be eliminated by creating hash 
function after runtime context is present
+   int getSubTaskIndex(StreamShardHandle shard, int 
totalNumberOfSubtasks);
--- End diff --

I actually prefer passing the `totalNumberOfSubtasks` value independently, 
instead of passing in `runtimeContext`. IMO, it provides more context on the 
nature of the assignment.

Moreover, IMO, having a factory as the API is much more complicated for 
the user.


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-01-31 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165081311
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -584,17 +594,34 @@ private static ShardMetricsReporter 
registerShardMetrics(MetricGroup metricGroup
//  Miscellaneous utility functions
// 

 
+   /**
+* Function to map a Kinesis shard to a Flink subtask index.
+*/
+   public interface ShardToSubtaskIndexFn {
--- End diff --

This also needs to be `Serializable`, since it'll be shipped along with the 
`JobGraph` to the JM.
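
A minimal sketch of why this matters, using plain Java serialization rather than any Flink API: once the functional interface extends `Serializable`, a lambda implementing it can be written out and read back. The `ShardAssigner` interface, `BY_HASH` constant and `roundTrip` helper are hypothetical names for illustration only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableAssignerSketch {

    /** Hypothetical stand-in; extending Serializable makes implementing lambdas serializable. */
    @FunctionalInterface
    public interface ShardAssigner extends Serializable {
        int assign(String shardId, int numParallelSubtasks);
    }

    public static final ShardAssigner BY_HASH = (shardId, n) -> shardId.hashCode();

    /** Serializes the object to bytes and reads it back, as happens when a job is shipped. */
    public static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("assigner is not serializable", e);
        }
    }

    public static void main(String[] args) {
        ShardAssigner copy = roundTrip(BY_HASH);
        String shardId = "shardId-000000000001";
        // The deserialized copy must behave exactly like the original.
        System.out.println(copy.assign(shardId, 4) == BY_HASH.assign(shardId, 4));
    }
}
```

Without `extends Serializable` on the interface, `writeObject` would throw a `NotSerializableException` for the same lambda.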


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-01-31 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165084584
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ---
@@ -93,6 +93,12 @@
/** User supplied deserialization schema to convert Kinesis byte messages to Flink objects. */
private final KinesisDeserializationSchema deserializer;
 
+   /**
+* The function that determines which subtask a shard should be assigned to.
+*/
+   // TODO: instead of the property, use a factory method that would allow subclass to access source context?
--- End diff --

Please see my comments below. 


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-01-31 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165084048
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -584,17 +594,34 @@ private static ShardMetricsReporter registerShardMetrics(MetricGroup metricGroup
//  Miscellaneous utility functions
// 

 
+   /**
+* Function to map a Kinesis shard to a Flink subtask index.
+*/
+   public interface ShardToSubtaskIndexFn {
+   /**
+* Function to map a Kinesis shard to a Flink subtask index.
--- End diff --

This needs a much stronger Javadoc explaining the contract of 
deterministic assignments.
See `KafkaTopicPartitionAssigner` for an example.
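
As an illustration only, the kind of Javadoc contract meant here could look like the sketch below. The wording of the contract is an assumption, not the text that ended up in Flink, and `ShardHandle`, `HashAssigner` and `isAssignedTo` are hypothetical stand-ins so that the example compiles on its own.

```java
import java.io.Serializable;
import java.util.Objects;

public class AssignerContractSketch {

    /** Hypothetical stand-in for StreamShardHandle, reduced to the two fields used here. */
    public static final class ShardHandle {
        public final String streamName;
        public final String shardId;

        public ShardHandle(String streamName, String shardId) {
            this.streamName = streamName;
            this.shardId = shardId;
        }
    }

    /**
     * Function to map a Kinesis shard to a Flink subtask index.
     *
     * <p>Contract assumed for this sketch:
     * <ul>
     *   <li>The result must be deterministic: the same shard and subtask count must
     *       always produce the same value, on every subtask and across restarts.</li>
     *   <li>The result must not depend on mutable or subtask-local state.</li>
     *   <li>The result may be an index or a hash code; the caller reduces it
     *       modulo the number of subtasks.</li>
     * </ul>
     */
    public interface ShardToSubtaskIndexFn extends Serializable {
        int getSubTaskIndex(ShardHandle shard, int totalNumberOfSubtasks);
    }

    /** One implementation that satisfies the contract: a hash of stream name and shard ID. */
    public static final class HashAssigner implements ShardToSubtaskIndexFn {
        private static final long serialVersionUID = 1L;

        @Override
        public int getSubTaskIndex(ShardHandle shard, int totalNumberOfSubtasks) {
            return Objects.hash(shard.streamName, shard.shardId);
        }
    }

    /** How a subtask might decide, on its own, whether a shard belongs to it. */
    public static boolean isAssignedTo(
            ShardToSubtaskIndexFn fn, ShardHandle shard, int totalNumberOfSubtasks, int subtaskIndex) {
        return Math.floorMod(fn.getSubTaskIndex(shard, totalNumberOfSubtasks), totalNumberOfSubtasks)
                == subtaskIndex;
    }

    public static void main(String[] args) {
        ShardHandle shard = new ShardHandle("example-stream", "shardId-000000000000");
        for (int subtask = 0; subtask < 4; subtask++) {
            System.out.println(subtask + " -> " + isAssignedTo(new HashAssigner(), shard, 4, subtask));
        }
    }
}
```

If the function were not deterministic, two subtasks evaluating it independently could both claim a shard, or none could, which is why the contract belongs in the Javadoc.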


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-01-31 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165081173
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -584,17 +594,34 @@ private static ShardMetricsReporter registerShardMetrics(MetricGroup metricGroup
//  Miscellaneous utility functions
// 

 
+   /**
+* Function to map a Kinesis shard to a Flink subtask index.
+*/
+   public interface ShardToSubtaskIndexFn {
--- End diff --

I would prefer this to be a top-level class instead of nested in 
`KinesisDataFetcher` (which is actually an internal class).

The `Fn` name could also be polished a bit. Maybe something like 
`KinesisShardAssigner` would be better? That would also be coherent name-wise 
with the Kafka side, which has a `KafkaTopicPartitionAssigner` class (though it 
isn't exposed).


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-01-30 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r164952695
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ---
@@ -93,6 +93,12 @@
/** User supplied deserialization schema to convert Kinesis byte messages to Flink objects. */
private final KinesisDeserializationSchema deserializer;
 
+   /**
+* The function that determines which subtask a shard should be assigned to.
+*/
+   // TODO: instead of the property, use a factory method that would allow subclass to access source context?
--- End diff --

The idea is a `createFn(...)` factory method that allows the function to be 
created with access to the runtime context (such as the number of subtasks). 
The fn signature would then change to take only shard metadata as a parameter. 
Subclasses can override `createFn` instead of setting the property.
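
A rough sketch of that alternative, under stated assumptions: `ConsumerSketch` and `SingleSubtaskConsumer` are hypothetical stand-ins for the consumer and a user subclass, the shard metadata is reduced to the shard ID, and the runtime context is reduced to the number of subtasks.

```java
import java.io.Serializable;

public class CreateFnSketch {

    /** Variant of the function that takes only shard metadata (here just the shard ID). */
    public interface ShardToSubtaskIndexFn extends Serializable {
        int getSubTaskIndex(String shardId);
    }

    /** Hypothetical stand-in for the consumer; the real FlinkKinesisConsumer is not modelled. */
    public static class ConsumerSketch {

        /** Factory hook, called once the number of subtasks is known at runtime. */
        protected ShardToSubtaskIndexFn createFn(int totalNumberOfSubtasks) {
            // Default: hash of the shard ID, with the subtask count captured by the function
            return shardId -> Math.floorMod(shardId.hashCode(), totalNumberOfSubtasks);
        }

        /** Creates the function for the given parallelism and applies it to one shard. */
        public int assign(String shardId, int totalNumberOfSubtasks) {
            return createFn(totalNumberOfSubtasks).getSubTaskIndex(shardId);
        }
    }

    /** A subclass overrides the factory instead of setting a property. */
    public static class SingleSubtaskConsumer extends ConsumerSketch {
        @Override
        protected ShardToSubtaskIndexFn createFn(int totalNumberOfSubtasks) {
            return shardId -> 0; // send every shard to subtask 0
        }
    }

    public static void main(String[] args) {
        System.out.println(new ConsumerSketch().assign("shardId-000000000002", 4));
        System.out.println(new SingleSubtaskConsumer().assign("shardId-000000000002", 4));
    }
}
```

The trade-off in this shape is that customizing the assignment requires subclassing the consumer, whereas a property only requires passing a function.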


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-01-30 Thread tweise
Github user tweise closed the pull request at:

https://github.com/apache/flink/pull/5393


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-01-30 Thread tweise
GitHub user tweise opened a pull request:

https://github.com/apache/flink/pull/5393

[FLINK-8516] Allow for custom hash function for shard to subtask mapping in 
Kinesis consumer





You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tweise/flink FLINK-8516.shardHashing

Alternatively you can review