[hotfix] [kafka connector] Replace funky loop with simple division in FixedPartitioner
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9637ee78 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9637ee78 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9637ee78 Branch: refs/heads/master Commit: 9637ee78846e4df5ef328c620cc991d394056f61 Parents: 1ea5e13 Author: Stephan Ewen <se...@apache.org> Authored: Wed Jan 27 12:20:59 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Thu Jan 28 13:41:38 2016 +0100 ---------------------------------------------------------------------- .../kafka/partitioner/FixedPartitioner.java | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9637ee78/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java index d9dcfc1..9b848e0 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java @@ -54,27 +54,23 @@ import java.io.Serializable; public class FixedPartitioner<T> extends KafkaPartitioner<T> implements Serializable { private static final long serialVersionUID = 1627268846962918126L; - int targetPartition = -1; + private int targetPartition = -1; @Override public void open(int parallelInstanceId, int parallelInstances, int[] partitions) { - int p = 0; - for (int i = 0; i < parallelInstances; i++) { - if (i == parallelInstanceId) { - targetPartition = partitions[p]; - return; - } - if (++p == partitions.length) { - p = 0; - } + if (parallelInstanceId < 0 || parallelInstances <= 0 || partitions.length == 0) { + throw new IllegalArgumentException(); } + + this.targetPartition = partitions[parallelInstanceId % partitions.length]; } @Override public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) { - if (targetPartition == -1) { + if (targetPartition >= 0) { + return targetPartition; + } else { throw new RuntimeException("The partitioner has not been initialized properly"); } - return targetPartition; } }