[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5393 ---
Github user tweise commented on a diff in the pull request: https://github.com/apache/flink/pull/5393#discussion_r166523133

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
--- End diff --

I only see references to the partitioner from the package org.apache.flink.streaming.connectors.kinesis and not from subpackages - which is what I pointed out before. Since you seem to suggest that isn't a convention, I will move the class (to me it is just an observation and not important).

---
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5393#discussion_r166254162

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java ---
+package org.apache.flink.streaming.connectors.kinesis.util;
--- End diff --

@tweise I think it is fine. For example, we have a `KinesisPartitioner` interface as the public API, and it lives under the same package as `FlinkKinesisConsumer`.

---
Github user tweise commented on a diff in the pull request: https://github.com/apache/flink/pull/5393#discussion_r165834978

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java ---
+package org.apache.flink.streaming.connectors.kinesis.util;
--- End diff --

It would result in an import from the parent package in KinesisDataFetcher. Looking at a few other classes it wasn't clear to me that this is the established pattern, so please confirm.

---
Github user tweise commented on a diff in the pull request: https://github.com/apache/flink/pull/5393#discussion_r165834292

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
@@ -214,6 +226,7 @@ protected KinesisDataFetcher(List streams,
 	this.totalNumberOfConsumerSubtasks = runtimeContext.getNumberOfParallelSubtasks();
 	this.indexOfThisConsumerSubtask = runtimeContext.getIndexOfThisSubtask();
 	this.deserializationSchema = checkNotNull(deserializationSchema);
+	this.shardAssigner = checkNotNull(shardAssigner);
--- End diff --

adding this in FlinkKinesisConsumer

---
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5393#discussion_r165590401

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java ---
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.io.Serializable;
+
+/**
+ * Utility to map Kinesis shards to Flink subtask indices.
+ */
--- End diff --

The overview class Javadoc could probably be less generic.

---
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5393#discussion_r165590209

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java ---
+* Kinesis and the consumer support dynamic re-sharding and shard IDs, while sequential,
+* cannot be assumed to be consecutive. There is no perfect generic default assignment function.
+* Default subtask index assignment, which is based on hash code, may result in skew,
+* with some subtasks having many shards assigned and others none.
--- End diff --

I feel like this section of the Javadoc should be part of the Javadoc for the original consumer constructors, and should guide users to use the `setShardAssigner` method if they do encounter the case of serious shard skew.

---
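The skew concern discussed here can be made concrete. Below is a hedged sketch of a custom assigner that satisfies the determinism and uniformity contract for sequential shard IDs; the local `Assigner` interface and plain `String` shard id are simplified stand-ins for Flink's `KinesisShardAssigner` and `StreamShardHandle`, and the `shardId-<number>` id format is an assumption of this sketch:

```java
import java.io.Serializable;

class ShardAssignerSketch {
    // Simplified stand-in for KinesisShardAssigner: a String shard id
    // replaces StreamShardHandle. Serializable, as the real interface is.
    interface Assigner extends Serializable {
        int assign(String shardId, int numParallelSubtasks);
    }

    // Deterministic (the same shard id always yields the same index) and,
    // for sequential ids like "shardId-000000000007", uniform: the numeric
    // suffix is spread round-robin across subtasks instead of relying on
    // String.hashCode(), which can skew.
    static final Assigner SUFFIX_ASSIGNER = (shardId, subtasks) -> {
        int suffix = Integer.parseInt(shardId.substring(shardId.lastIndexOf('-') + 1));
        return suffix % subtasks;
    };
}
```

With the real API, such an assigner would be handed to the consumer, e.g. via the `setShardAssigner` method mentioned above.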
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5393#discussion_r165587359

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
@@ -182,13 +191,15 @@ public KinesisDataFetcher(List streams,
 	SourceFunction.SourceContext sourceContext,
 	RuntimeContext runtimeContext,
 	Properties configProps,
-	KinesisDeserializationSchema deserializationSchema) {
+	KinesisDeserializationSchema deserializationSchema,
+	KinesisShardAssigner kinesisShardToSubTaskIndexFn) {
--- End diff --

mismatching variable name: `kinesisShardToSubTaskIndexFn` --> `kinesisShardAssigner` or just `shardAssigner`

---
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5393#discussion_r165588489

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java ---
+public interface KinesisShardAssigner extends Serializable {
+/**
+* Returns the index of the target subtask that a specific Kafka partition should be
--- End diff --

mentions Kafka partition, should be Kinesis shard

---
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5393#discussion_r165589748

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java ---
+* The above contract is crucial and cannot be broken. Consumer subtasks rely on this
+* contract to filter out partitions that they should not subscribe to, guaranteeing
+* that all partitions of a single topic will always be assigned to some subtask in a
--- End diff --

of a single "**Kinesis stream**", not topic (which is Kafka terms)

---
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5393#discussion_r165591410 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java --- @@ -192,6 +198,14 @@ public FlinkKinesisConsumer(List streams, KinesisDeserializationSchema `shardAssigner` ---
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5393#discussion_r165587909

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java ---
+* It is recommended to monitor the shard distribution and adjust assignment appropriately.
+* Custom implementation may optimize the hash function or use static overrides to limit skew.
+ *
+ * @param shard the shard to determine
+ * @param numParallelSubtasks total number of subtasks
+ * @return index or hash code
--- End diff --

nit: I think it would be more straightforward if we just say this returns the "target index". If the index value is outside the subtask range, we perform a modulus operation. So basically what the PR is already doing, just re-terming it to be more straightforward. What do you think? This is also just a personal preference :)

---
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5393#discussion_r165588395

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java ---
+package org.apache.flink.streaming.connectors.kinesis.util;
--- End diff --

I think it makes sense to move this class to the same level as `FlinkKinesisConsumer`, since it is part of the public API.

---
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5393#discussion_r165588295

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java ---
+/**
+ * Utility to map Kinesis shards to Flink subtask indices.
+ */
+public interface KinesisShardAssigner extends Serializable {
--- End diff --

Add `@PublicEvolving` annotation, to make it clear this is a public API.

---
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5393#discussion_r165589453

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java ---
+/**
+* Returns the index of the target subtask that a specific Kafka partition should be
+* assigned to. For return values outside the subtask range, modulus operation will
+* be applied automatically, hence it is also valid to just return a hash code.
+*
+* The resulting distribution of shards has the following contract:
--- End diff --

The resulting distribution of shards "**should**" have the following contract. That is, we can't guarantee it; the user implementation should guarantee it.

---
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5393#discussion_r165586434

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
@@ -76,6 +77,9 @@
 @Internal
 public class KinesisDataFetcher {
+	public static final KinesisShardAssigner DEFAULT_SHARD_ASSIGNER = (shard, subtasks) -> shard.hashCode();
+
+
--- End diff --

nit: unnecessary empty line

---
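For context on what `DEFAULT_SHARD_ASSIGNER` returning `shard.hashCode()` implies: per the interface Javadoc quoted elsewhere in this thread, return values outside the subtask range are folded back in by a modulus. A minimal sketch of that folding follows; whether Flink applies `Math.floorMod` or another modulus variant is an assumption here, not shown in this thread:

```java
class DefaultAssignmentSketch {
    // Folds an arbitrary assigner result (e.g. a raw, possibly negative
    // hash code) into the range [0, numParallelSubtasks). Math.floorMod,
    // unlike the % operator, never returns a negative value.
    static int toSubtaskIndex(int assignerResult, int numParallelSubtasks) {
        return Math.floorMod(assignerResult, numParallelSubtasks);
    }
}
```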
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5393#discussion_r165586994

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
@@ -214,6 +226,7 @@ protected KinesisDataFetcher(List streams,
 	this.totalNumberOfConsumerSubtasks = runtimeContext.getNumberOfParallelSubtasks();
 	this.indexOfThisConsumerSubtask = runtimeContext.getIndexOfThisSubtask();
 	this.deserializationSchema = checkNotNull(deserializationSchema);
+	this.shardAssigner = checkNotNull(shardAssigner);
--- End diff --

We also need to try cleaning the closure of the given object (if it is a non-static inner class):

```
ClosureCleaner.clean(shardAssigner, true);
```

---
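The `ClosureCleaner` suggestion guards against a standard Java serialization pitfall: an assigner written as a non-static anonymous inner class captures its enclosing instance, and if that instance is not serializable the whole object fails to serialize. A self-contained illustration of the pitfall (no Flink dependency; the `Assigner` interface is a simplified stand-in for `KinesisShardAssigner`):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class ClosureDemo {
    // Simplified stand-in for KinesisShardAssigner.
    interface Assigner extends Serializable {
        int assign(String shardId, int numParallelSubtasks);
    }

    // A non-serializable "outer" object, like a user's job setup class.
    static class Outer {
        private final String prefix = "shardId-";

        Assigner makeInner() {
            // Anonymous inner class: referencing 'prefix' captures the
            // enclosing (non-serializable) Outer instance, so serializing
            // this Assigner fails.
            return new Assigner() {
                public int assign(String shardId, int subtasks) {
                    return (prefix + shardId).hashCode();
                }
            };
        }

        Assigner makeLambda() {
            // Non-capturing lambda against a Serializable interface:
            // serializes cleanly, no enclosing instance involved.
            return (shardId, subtasks) -> shardId.hashCode();
        }
    }

    // Returns true if the object survives Java serialization.
    static boolean serializes(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (IOException e) {
            // NotSerializableException is an IOException.
            return false;
        }
    }
}
```

To my understanding, Flink's `ClosureCleaner` can only null out enclosing references the closure does not actually use; where the reference is genuinely used, as in `makeInner` above, the user must make the enclosing class serializable or switch to a lambda or static class.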
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5393#discussion_r165588081

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java ---
+public interface KinesisShardAssigner extends Serializable {
+/**
--- End diff --

nit: add one empty line above comment block (just a personal preference, though)

---
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5393#discussion_r165594279 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java --- @@ -192,6 +198,14 @@ public FlinkKinesisConsumer(List streams, KinesisDeserializationSchema
GitHub user tweise reopened a pull request: https://github.com/apache/flink/pull/5393

[FLINK-8516] Allow for custom hash function for shard to subtask mapping in Kinesis consumer

## What is the purpose of the change

Allow the user to customize Kinesis shard to subtask assignment in the Kinesis consumer.

## Brief change log

Added pluggable shard assigner.

## Verifying this change

Added unit test.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

Javadoc

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tweise/flink FLINK-8516.shardHashing

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5393.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #5393

commit ad4dbe6fb5bf2af52726c54b6361089ef3f4e369
Author: Thomas Weise
Date: 2018-01-31T01:44:44Z

    [FLINK-8516] Allow for custom hash function for shard to subtask mapping in Kinesis consumer

---
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5393#discussion_r165083343

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
@@ -584,17 +594,34 @@ private static ShardMetricsReporter registerShardMetrics(MetricGroup metricGroup
 	// Miscellaneous utility functions
 	//
+	/**
+	 * Function to map a Kinesis shard to a Flink subtask index.
+	 */
+	public interface ShardToSubtaskIndexFn {
+		/**
+		 * Function to map a Kinesis shard to a Flink subtask index.
+		 *
+		 * @param shard the shard to determine
+		 * @param totalNumberOfSubtasks total number of subtasks
+		 * @return index or hash code
+		 */
+		// TODO: extra parameter can be eliminated by creating hash function after runtime context is present
+		int getSubTaskIndex(StreamShardHandle shard, int totalNumberOfSubtasks);
--- End diff --

I actually prefer passing the `totalNumberOfSubtasks` value independently, instead of passing in `runtimeContext`. IMO, it provides more context on the nature of the assignment. Moreover, IMO, having a factory as the API is much more complicated for the user.

---
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5393#discussion_r165081311

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
+	public interface ShardToSubtaskIndexFn {
--- End diff --

This also needs to be `Serializable`, since it'll be shipped along with the `JobGraph` to the JM.

---
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5393#discussion_r165084584

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
@@ -93,6 +93,12 @@
 	/** User supplied deserialization schema to convert Kinesis byte messages to Flink objects. */
 	private final KinesisDeserializationSchema deserializer;
+	/**
+	 * The function that determines which subtask a shard should be assigned to.
+	 */
+	// TODO: instead of the property, use a factory method that would allow subclass to access source context?
--- End diff --

Please see my comments below.

---
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5393#discussion_r165084048

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
+		/**
+		 * Function to map a Kinesis shard to a Flink subtask index.
--- End diff --

This needs a much stronger Javadoc explaining the contract of deterministic assignments. See `KafkaTopicPartitionAssigner` for an example.

---
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5393#discussion_r165081173

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
    @@ -584,17 +594,34 @@ private static ShardMetricsReporter registerShardMetrics(MetricGroup metricGroup
         // Miscellaneous utility functions
         //

    +    /**
    +     * Function to map a Kinesis shard to a Flink subtask index.
    +     */
    +    public interface ShardToSubtaskIndexFn {
    --- End diff --

    I would prefer this to be a top-level class, instead of nested in `KinesisDataFetcher` (which is actually an internal class). The `Fn` name could also be polished a bit. Maybe something like `KinesisShardAssigner` would be better? That would also be coherent name-wise with the Kafka side, which has a `KafkaTopicPartitionAssigner` class (though it isn't exposed).

---
[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5393#discussion_r164952695

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
    @@ -93,6 +93,12 @@
         /** User supplied deserialization schema to convert Kinesis byte messages to Flink objects. */
         private final KinesisDeserializationSchema deserializer;

    +    /**
    +     * The function that determines which subtask a shard should be assigned to.
    +     */
    +    // TODO: instead of the property, use a factory method that would allow subclass to access source context?
    --- End diff --

    A `createFn(...)` would allow the function to be created with access to the runtime context (like the number of subtasks), and the fn signature could then be changed to take only the shard metadata as a parameter. Subclasses would override `createFn` instead of setting the property.

---
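The factory-method alternative proposed here can be sketched roughly like this. All class and method names below are illustrative stand-ins, not the API that was ultimately merged: rather than storing an assigner as a property, the consumer asks an overridable `createFn(...)` for one once the runtime context (here reduced to the subtask count) is known, so the returned function itself only needs the shard metadata.

```java
public class FactoryMethodSketch {

    /** Illustrative assigner created per runtime context; it needs only shard metadata. */
    interface ShardToSubtask {
        int apply(String shardId);
    }

    /** Stand-in for the consumer: subclasses override createFn instead of setting a property. */
    static class Consumer {
        /** Factory method with access to runtime context (here just the subtask count). */
        protected ShardToSubtask createFn(int numParallelSubtasks) {
            // Default: deterministic hash-based spread across subtasks.
            return shardId -> (shardId.hashCode() & 0x7FFFFFFF) % numParallelSubtasks;
        }

        int subtaskFor(String shardId, int numParallelSubtasks) {
            return createFn(numParallelSubtasks).apply(shardId);
        }
    }

    /** A subclass pinning every shard to subtask 0, e.g. for a test or a custom layout. */
    static class PinnedConsumer extends Consumer {
        @Override
        protected ShardToSubtask createFn(int numParallelSubtasks) {
            return shardId -> 0;
        }
    }

    public static void main(String[] args) {
        // The override takes effect without any property having been set.
        System.out.println(new PinnedConsumer().subtaskFor("shardId-007", 8)); // prints 0
    }
}
```

The trade-off versus the property approach: the factory method gives subclasses the runtime context for free, but customization then requires subclassing the consumer rather than just passing in a function.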
[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...
Github user tweise closed the pull request at:

    https://github.com/apache/flink/pull/5393

---
[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...
GitHub user tweise opened a pull request:

    https://github.com/apache/flink/pull/5393

[FLINK-8516] Allow for custom hash function for shard to subtask mapping in Kinesis consumer

*Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.*

## Contribution Checklist

- Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue.
- Name the pull request in the form "[FLINK-XXXX] [component] Title of the pull request", where *FLINK-XXXX* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component. Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.
- Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
- Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices).
- Each pull request should address only one issue, not mix up code from multiple issues.
- Each commit in the pull request has a meaningful commit message (including the JIRA id).
- Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.

**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)*

## Brief change log

*(for example:)*
- *The TaskInfo is stored in the blob store on job creation time as a persistent artifact*
- *Deployments RPC transmits only the blob storage reference*
- *TaskManagers retrieve the TaskInfo from the blob cache*

## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
- *Added integration tests for end-to-end deployment with large payloads (100MB)*
- *Extended integration test for recovery after master (JobManager) failure*
- *Added test that validates that TaskInfo is transferred only once across recoveries*
- *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
- The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / no / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
- The S3 file system connector: (yes / no / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tweise/flink FLINK-8516.shardHashing

Alternatively you can review an