http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopics.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopics.java deleted file mode 100644 index c230f09..0000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopics.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.storm.kafka.spout; - -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.OutputFieldsGetter; -import org.apache.storm.tuple.Fields; -import org.apache.storm.utils.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * Represents the {@link KafkaSpoutStream} associated with each topic, and provides a public API to - * declare output streams and emmit tuples, on the appropriate stream, for all the topics specified. - */ -public class KafkaSpoutStreamsNamedTopics implements KafkaSpoutStreams { - private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutStreamsNamedTopics.class); - - private final Map<String, KafkaSpoutStream> topicToStream; - - private KafkaSpoutStreamsNamedTopics(Builder builder) { - this.topicToStream = builder.topicToStream; - LOG.debug("Built {}", this); - } - - /** - * @param topic the topic for which to get output fields - * @return the declared output fields - */ - public Fields getOutputFields(String topic) { - if (topicToStream.containsKey(topic)) { - final Fields outputFields = topicToStream.get(topic).getOutputFields(); - LOG.trace("Topic [{}] has output fields [{}]", topic, outputFields); - return outputFields; - } - throw new IllegalStateException(this.getClass().getName() + " not configured for topic: " + topic); - } - - /** - * @param topic the topic to for which to get the stream id - * @return the id of the stream to where the tuples are emitted - */ - public KafkaSpoutStream getStream(String topic) { - if (topicToStream.containsKey(topic)) { - return topicToStream.get(topic); - } - throw new IllegalStateException(this.getClass().getName() + " not configured for topic: " + topic); - } - - /** - * @return list of topics subscribed and emitting tuples to a stream as configured by {@link 
KafkaSpoutStream} - */ - public List<String> getTopics() { - return new ArrayList<>(topicToStream.keySet()); - } - - public void declareOutputFields(OutputFieldsDeclarer declarer) { - for (KafkaSpoutStream stream : topicToStream.values()) { - if (!((OutputFieldsGetter)declarer).getFieldsDeclaration().containsKey(stream.getStreamId())) { - stream.declareOutputFields(declarer); - } - } - } - - public void emit(SpoutOutputCollector collector, List<Object> tuple, KafkaSpoutMessageId messageId) { - getStream(messageId.topic()).emit(collector, tuple, messageId); - } - - @Override - public String toString() { - return "KafkaSpoutStreamsNamedTopics{" + - "topicToStream=" + topicToStream + - '}'; - } - - public static class Builder { - private final Map<String, KafkaSpoutStream> topicToStream = new HashMap<>();; - - /** - * Creates a {@link KafkaSpoutStream} with the given output Fields for each topic specified. - * All topics will have the default stream id and the same output fields. - */ - public Builder(Fields outputFields, String... topics) { - addStream(outputFields, topics); - } - - /** - * Creates a {@link KafkaSpoutStream} with this particular stream for each topic specified. - * All the topics will have the specified stream id and the same output fields. - */ - public Builder (Fields outputFields, String streamId, String... topics) { - addStream(outputFields, streamId, topics); - } - - /** - * Adds this stream to the state representing the streams associated with each topic - */ - public Builder(KafkaSpoutStream stream) { - addStream(stream); - } - - /** - * Adds this stream to the state representing the streams associated with each topic - */ - public Builder addStream(KafkaSpoutStream stream) { - topicToStream.put(stream.getTopic(), stream); - return this; - } - - /** - * Please refer to javadoc in {@link #Builder(Fields, String...)} - */ - public Builder addStream(Fields outputFields, String... 
topics) { - addStream(outputFields, Utils.DEFAULT_STREAM_ID, topics); - return this; - } - - /** - * Please refer to javadoc in {@link #Builder(Fields, String, String...)} - */ - public Builder addStream(Fields outputFields, String streamId, String... topics) { - for (String topic : topics) { - topicToStream.put(topic, new KafkaSpoutStream(outputFields, streamId, topic)); - } - return this; - } - - public KafkaSpoutStreamsNamedTopics build() { - return new KafkaSpoutStreamsNamedTopics(this); - } - } -}
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsWildcardTopics.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsWildcardTopics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsWildcardTopics.java deleted file mode 100644 index 5c3bd47..0000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsWildcardTopics.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.storm.kafka.spout; - -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.topology.OutputFieldsDeclarer; - -import java.util.List; -import java.util.regex.Pattern; - -public class KafkaSpoutStreamsWildcardTopics implements KafkaSpoutStreams { - private KafkaSpoutStream kafkaSpoutStream; - - public KafkaSpoutStreamsWildcardTopics(KafkaSpoutStream kafkaSpoutStream) { - this.kafkaSpoutStream = kafkaSpoutStream; - if (kafkaSpoutStream.getTopicWildcardPattern() == null) { - throw new IllegalStateException("KafkaSpoutStream must be configured for wildcard topic"); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - kafkaSpoutStream.declareOutputFields(declarer); - } - - @Override - public void emit(SpoutOutputCollector collector, List<Object> tuple, KafkaSpoutMessageId messageId) { - kafkaSpoutStream.emit(collector, tuple, messageId); - } - - public KafkaSpoutStream getStream() { - return kafkaSpoutStream; - } - - public Pattern getTopicWildcardPattern() { - return kafkaSpoutStream.getTopicWildcardPattern(); - } - - @Override - public String toString() { - return "KafkaSpoutStreamsWildcardTopics{" + - "kafkaSpoutStream=" + kafkaSpoutStream + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java deleted file mode 100644 index 3bb71a8..0000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout; - -import org.apache.kafka.clients.consumer.ConsumerRecord; - -import java.io.Serializable; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -/** - * Implementations of {@link KafkaSpoutTupleBuilder} contain the logic to build tuples from {@link ConsumerRecord}s. - * Users must subclass this abstract class to provide their implementation. See also {@link KafkaSpoutTuplesBuilder} - */ -public abstract class KafkaSpoutTupleBuilder<K,V> implements Serializable { - private List<String> topics; - - /** - * @param topics list of topics that use this implementation to build tuples - */ - public KafkaSpoutTupleBuilder(String... topics) { - if (topics == null || topics.length == 0) { - throw new IllegalArgumentException("Must specify at least one topic. 
It cannot be null or empty"); - } - this.topics = Arrays.asList(topics); - } - - /** - * @return list of topics that use this implementation to build tuples - */ - public List<String> getTopics() { - return Collections.unmodifiableList(topics); - } - - /** - * Builds a list of tuples using the ConsumerRecord specified as parameter - * @param consumerRecord whose contents are used to build tuples - * @return list of tuples - */ - public abstract List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord); -} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java deleted file mode 100644 index 2ba0a79..0000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.storm.kafka.spout; - -import org.apache.kafka.clients.consumer.ConsumerRecord; - -import java.io.Serializable; -import java.util.List; - -/** - * {@link KafkaSpoutTuplesBuilder} wraps all the logic that builds tuples from {@link ConsumerRecord}s. - * The logic is provided by the user by implementing the appropriate number of {@link KafkaSpoutTupleBuilder} instances - */ -public interface KafkaSpoutTuplesBuilder<K,V> extends Serializable { - List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord); -} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderNamedTopics.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderNamedTopics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderNamedTopics.java deleted file mode 100644 index 80fe543..0000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderNamedTopics.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class KafkaSpoutTuplesBuilderNamedTopics<K,V> implements KafkaSpoutTuplesBuilder<K,V> { - private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutTuplesBuilderNamedTopics.class); - - private Map<String, KafkaSpoutTupleBuilder<K, V>> topicToTupleBuilders; - - private KafkaSpoutTuplesBuilderNamedTopics(Builder<K,V> builder) { - this.topicToTupleBuilders = builder.topicToTupleBuilders; - LOG.debug("Instantiated {}", this); - } - - public static class Builder<K,V> { - private List<KafkaSpoutTupleBuilder<K, V>> tupleBuilders; - private Map<String, KafkaSpoutTupleBuilder<K, V>> topicToTupleBuilders; - - @SafeVarargs - public Builder(KafkaSpoutTupleBuilder<K,V>... 
tupleBuilders) { - if (tupleBuilders == null || tupleBuilders.length == 0) { - throw new IllegalArgumentException("Must specify at last one tuple builder per topic declared in KafkaSpoutStreams"); - } - - this.tupleBuilders = Arrays.asList(tupleBuilders); - topicToTupleBuilders = new HashMap<>(); - } - - public KafkaSpoutTuplesBuilderNamedTopics<K,V> build() { - for (KafkaSpoutTupleBuilder<K, V> tupleBuilder : tupleBuilders) { - for (String topic : tupleBuilder.getTopics()) { - if (!topicToTupleBuilders.containsKey(topic)) { - topicToTupleBuilders.put(topic, tupleBuilder); - } - } - } - return new KafkaSpoutTuplesBuilderNamedTopics<>(this); - } - } - - public List<Object>buildTuple(ConsumerRecord<K,V> consumerRecord) { - final String topic = consumerRecord.topic(); - return topicToTupleBuilders.get(topic).buildTuple(consumerRecord); - } - - @Override - public String toString() { - return "KafkaSpoutTuplesBuilderNamedTopics {" + - "topicToTupleBuilders=" + topicToTupleBuilders + - '}'; - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderWildcardTopics.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderWildcardTopics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderWildcardTopics.java deleted file mode 100644 index 85d4809..0000000 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderWildcardTopics.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout; - -import org.apache.kafka.clients.consumer.ConsumerRecord; - -import java.util.List; - -public class KafkaSpoutTuplesBuilderWildcardTopics<K,V> implements KafkaSpoutTuplesBuilder<K,V> { - private KafkaSpoutTupleBuilder<K, V> tupleBuilder; - - public KafkaSpoutTuplesBuilderWildcardTopics(KafkaSpoutTupleBuilder<K, V> tupleBuilder) { - this.tupleBuilder = tupleBuilder; - } - - @Override - public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) { - return tupleBuilder.buildTuple(consumerRecord); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java new file mode 100644 index 0000000..f5953ad --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout; + +import org.apache.storm.tuple.Values; + +/** + * A list of Values in a tuple that can be routed + * to a given stream. {@see org.apache.storm.kafka.spout.RecordTranslator#apply} + */ +public class KafkaTuple extends Values { + private static final long serialVersionUID = 4803794470450587992L; + private String stream = null; + + public KafkaTuple() { + super(); + } + + public KafkaTuple(Object... vals) { + super(vals); + } + + public KafkaTuple routedTo(String stream) { + assert(this.stream == null); + this.stream = stream; + return this; + } + + public String getStream() { + return stream; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java new file mode 100644 index 0000000..3f16220 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.task.TopologyContext; + + +/** + * The KafkaTupleListener handles state changes of a kafka tuple inside a KafkaSpout. + */ +public interface KafkaTupleListener extends Serializable { + + + /** + * Called during the initialization of the kafka spout. + * + * @param conf The storm configuration. + * @param context The {@link TopologyContext} + */ + void open(Map<String, Object> conf, TopologyContext context); + + /** + * Called when the tuple is emitted and auto commit is disabled. + * If kafka auto commit is enabled, the kafka consumer will periodically (depending on the commit interval) + * commit the offsets. Therefore, storm disables anchoring for tuples when auto commit is enabled and the spout will + * not receive acks and fails for those tuples. + * + * @param tuple the storm tuple. + * @param msgId The id of the tuple in the spout. + */ + void onEmit(List<Object> tuple, KafkaSpoutMessageId msgId); + + /** + * Called when a tuple is acked. + * + * @param msgId The id of the tuple in the spout. 
+ */ + void onAck(KafkaSpoutMessageId msgId); + + /** + * Called when kafka partitions are rebalanced. + * + * @param partitions The list of partitions that are now assigned to the consumer (may include partitions previously + * assigned to the consumer) + */ + void onPartitionsReassigned(Collection<TopicPartition> partitions); + + /** + * Called when the Kafka spout sets a record for retry. + * + * @param msgId The id of the tuple in the spout. + */ + void onRetry(KafkaSpoutMessageId msgId); + + /** + * Called when the maximum number of retries have been reached. + * + * @param msgId The id of the tuple in the spout. + */ + void onMaxRetryReached(KafkaSpoutMessageId msgId); +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java new file mode 100644 index 0000000..05cd361 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.TopicPartitionComparator; +import org.apache.storm.task.TopologyContext; + +public class ManualPartitionSubscription extends Subscription { + private static final long serialVersionUID = 5633018073527583826L; + private final ManualPartitioner partitioner; + private final TopicFilter partitionFilter; + private transient KafkaConsumer<?, ?> consumer = null; + private transient ConsumerRebalanceListener listener = null; + private transient TopologyContext context = null; + + public ManualPartitionSubscription(ManualPartitioner parter, TopicFilter partitionFilter) { + this.partitionFilter = partitionFilter; + this.partitioner = parter; + } + + @Override + public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext context) { + this.consumer = consumer; + this.listener = listener; + this.context = context; + refreshAssignment(); + } + + @Override + public void refreshAssignment() { + List<TopicPartition> allPartitions = partitionFilter.getFilteredTopicPartitions(consumer); + Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE); + Set<TopicPartition> newAssignment = new HashSet<>(partitioner.partition(allPartitions, context)); + 
Set<TopicPartition> currentAssignment = consumer.assignment(); + if (!newAssignment.equals(currentAssignment)) { + listener.onPartitionsRevoked(currentAssignment); + consumer.assign(newAssignment); + listener.onPartitionsAssigned(newAssignment); + } + } + + @Override + public String getTopicsString() { + return partitionFilter.getTopicsString(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java new file mode 100644 index 0000000..f9a6869 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout; + +import java.util.List; + +import java.io.Serializable; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.task.TopologyContext; + +/** + * A function used to assign partitions to this spout. + * WARNING if this is not done correctly you can really mess things up, like not reading data in some partitions. + * The complete TopologyContext is passed in, but it is suggested that you use the index of the spout and the total + * number of spouts to avoid missing partitions or double assigning partitions. + */ +public interface ManualPartitioner extends Serializable { + /** + * Get the partitions for this assignment + * @param allPartitions all of the partitions that the set of spouts want to subscribe to, in a strict ordering + * @param context the context of the topology + * @return the subset of the partitions that this spout should use. + */ + public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context); +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java new file mode 100644 index 0000000..3bb7152 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +import org.apache.commons.lang.StringUtils; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.storm.task.TopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Subscribe to all topics that follow a given list of values. + * @deprecated Please use {@link ManualPartitionSubscription} with {@link NamedTopicFilter} instead + */ +@Deprecated +public class NamedSubscription extends Subscription { + private static final Logger LOG = LoggerFactory.getLogger(NamedSubscription.class); + private static final long serialVersionUID = 3438543305215813839L; + protected final Collection<String> topics; + + public NamedSubscription(Collection<String> topics) { + this.topics = Collections.unmodifiableCollection(new ArrayList<>(topics)); + } + + public NamedSubscription(String ... 
topics) { + this(Arrays.asList(topics)); + } + + @Override + public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext unused) { + consumer.subscribe(topics, listener); + LOG.info("Kafka consumer subscribed topics {}", topics); + + // Initial poll to get the consumer registration process going. + // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration + consumer.poll(0); + } + + @Override + public String getTopicsString() { + return StringUtils.join(topics, ","); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java new file mode 100644 index 0000000..3d1ec1e --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java @@ -0,0 +1,68 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.commons.lang.StringUtils; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; + +/** + * Filter that returns all partitions for the specified topics. + */ +public class NamedTopicFilter implements TopicFilter { + + private final Set<String> topics; + + /** + * Create filter based on a set of topic names. + * @param topics The topic names the filter will pass. + */ + public NamedTopicFilter(Set<String> topics) { + this.topics = Collections.unmodifiableSet(topics); + } + + /** + * Convenience constructor. + * @param topics The topic names the filter will pass. + */ + public NamedTopicFilter(String... topics) { + this(new HashSet<>(Arrays.asList(topics))); + } + + @Override + public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) { + List<TopicPartition> allPartitions = new ArrayList<>(); + for (String topic : topics) { + for (PartitionInfo partitionInfo: consumer.partitionsFor(topic)) { + allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); + } + } + return allPartitions; + } + + @Override + public String getTopicsString() { + return StringUtils.join(topics, ","); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java new file mode 100644 index 0000000..dc9f9e3 --- /dev/null +++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout; + +import java.util.regex.Pattern; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.storm.task.TopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Subscribe to all topics that match a given pattern. 
+ * @deprecated Please use {@link ManualPartitionSubscription} with {@link PatternTopicFilter} instead + */ +@Deprecated +public class PatternSubscription extends Subscription { + private static final Logger LOG = LoggerFactory.getLogger(PatternSubscription.class); + private static final long serialVersionUID = 3438543305215813839L; + protected final Pattern pattern; + + public PatternSubscription(Pattern pattern) { + this.pattern = pattern; + } + + @Override + public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext unused) { + consumer.subscribe(pattern, listener); + LOG.info("Kafka consumer subscribed topics matching wildcard pattern [{}]", pattern); + + // Initial poll to get the consumer registration process going. + // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration + consumer.poll(0); + } + + @Override + public String getTopicsString() { + return pattern.pattern(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java new file mode 100644 index 0000000..5d2db96 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java @@ -0,0 +1,70 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; +import org.apache.commons.lang.StringUtils; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; + +/** + * Filter that returns all partitions for topics matching the given {@link Pattern}. + */ +public class PatternTopicFilter implements TopicFilter { + + private final Pattern pattern; + private final Set<String> topics = new HashSet<>(); + + /** + * Creates filter based on a Pattern. Only topic names matching the Pattern are passed by the filter. + * + * @param pattern The Pattern to use. 
+ */ + public PatternTopicFilter(Pattern pattern) { + this.pattern = pattern; + } + + @Override + public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) { + topics.clear(); + List<TopicPartition> allPartitions = new ArrayList<>(); + for (Map.Entry<String, List<PartitionInfo>> entry : consumer.listTopics().entrySet()) { + if (pattern.matcher(entry.getKey()).matches()) { + for (PartitionInfo partitionInfo : entry.getValue()) { + allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); + topics.add(partitionInfo.topic()); + } + } + } + return allPartitions; + } + + @Override + public String getTopicsString() { + return StringUtils.join(topics, ","); + } + + public String getTopicsPattern() { + return pattern.pattern(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java new file mode 100644 index 0000000..71af4d0 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.storm.tuple.Fields; + +import java.io.Serializable; +import java.util.Collections; +import java.util.List; + +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.Builder; + +/** + * Translate a {@link org.apache.kafka.clients.consumer.ConsumerRecord} to a tuple. + */ +public interface RecordTranslator<K, V> extends Serializable, Func<ConsumerRecord<K, V>, List<Object>> { + public static final List<String> DEFAULT_STREAM = Collections.singletonList("default"); + + /** + * Translate the ConsumerRecord into a list of objects that can be emitted + * @param record the record to translate + * @return the objects in the tuple. Return a {@link KafkaTuple} + * if you want to route the tuple to a non-default stream. + * Return null to discard an invalid {@link ConsumerRecord} if {@link Builder#setEmitNullTuples(boolean)} is set to true + */ + List<Object> apply(ConsumerRecord<K,V> record); + + /** + * Get the fields associated with a stream. The streams passed in are + * returned by the {@link RecordTranslator.streams} method. + * @param stream the stream the fields are for + * @return the fields for that stream. + */ + Fields getFieldsFor(String stream); + + /** + * @return the list of streams that this will handle. 
+ */ + List<String> streams(); +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java new file mode 100644 index 0000000..e23e2dc --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.task.TopologyContext; + +/** + * Assign partitions in a round robin fashion for all spouts, + * not just the ones that are alive. Because the parallelism of + * the spouts does not typically change while running this makes + * the assignments more stable in the face of crashing spouts. 
+ * + * Round Robin means that first spout of N spouts will get the first + * partition, and the N+1th partition... The second spout will get the second partition and + * N+2th partition etc. + */ +public class RoundRobinManualPartitioner implements ManualPartitioner { + + @Override + public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context) { + int thisTaskIndex = context.getThisTaskIndex(); + int totalTaskCount = context.getComponentTasks(context.getThisComponentId()).size(); + Set<TopicPartition> myPartitions = new HashSet<>(allPartitions.size()/totalTaskCount+1); + for (int i = thisTaskIndex; i < allPartitions.size(); i += totalTaskCount) { + myPartitions.add(allPartitions.get(i)); + } + return new ArrayList<>(myPartitions); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java new file mode 100644 index 0000000..cc37348 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java @@ -0,0 +1,29 @@ +/* + * Copyright 2016 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout; + +import java.io.Serializable; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Deserializer; + +/** + * @param <T> The type this deserializer deserializes to. + * @deprecated Avoid using this class. Use {@link KafkaSpoutConfig.Builder#setProp(java.lang.String, java.lang.Object) } with + * {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG} instead + */ +@Deprecated +public interface SerializableDeserializer<T> extends Deserializer<T>, Serializable { +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java new file mode 100644 index 0000000..46c2849 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout; + +import java.util.Arrays; +import java.util.List; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.storm.tuple.Fields; + +public class SimpleRecordTranslator<K, V> implements RecordTranslator<K, V> { + private static final long serialVersionUID = 4678369144122009596L; + private final Fields fields; + private final Func<ConsumerRecord<K, V>, List<Object>> func; + private final String stream; + + public SimpleRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields) { + this(func, fields, "default"); + } + + public SimpleRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) { + this.func = func; + this.fields = fields; + this.stream = stream; + } + + @Override + public List<Object> apply(ConsumerRecord<K, V> record) { + KafkaTuple ret = new KafkaTuple(); + ret.addAll(func.apply(record)); + return ret.routedTo(stream); + } + + @Override + public Fields getFieldsFor(String stream) { + return fields; + } + + @Override + public List<String> streams() { + return Arrays.asList(stream); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java new file mode 100644 index 
0000000..722039d --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout; + +import java.io.Serializable; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.storm.task.TopologyContext; + +/** + * A subscription to kafka. + */ +public abstract class Subscription implements Serializable { + private static final long serialVersionUID = -216136367240198716L; + + /** + * Subscribe the KafkaConsumer to the proper topics. Implementations must ensure that a given topic partition is always assigned to the + * same spout task. Adding and removing partitions as necessary is fine, but partitions must not move from one task to another. This + * constraint is only important for use with the Trident spout. + * + * @param consumer the Consumer to get. 
+ * @param listener the rebalance listener to include in the subscription + */ + public abstract <K, V> void subscribe(KafkaConsumer<K,V> consumer, ConsumerRebalanceListener listener, TopologyContext context); + + /** + * @return A human-readable string representing the subscribed topics. + */ + public abstract String getTopicsString(); + + /** + * NOOP is the default behavior, which means that Kafka will internally handle partition assignment. + * If you wish to do manual partition management, you must provide an implementation of this method + * that will check with kafka for any changes and call the ConsumerRebalanceListener from subscribe + * to inform the rest of the system of those changes. + */ + public void refreshAssignment() { + //NOOP + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java new file mode 100644 index 0000000..7631c8a --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java @@ -0,0 +1,38 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout; + +import java.io.Serializable; +import java.util.List; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; + +public interface TopicFilter extends Serializable { + + /** + * Get the Kafka TopicPartitions passed by this filter. + * @param consumer The Kafka consumer to use to read the list of existing partitions + * @return The Kafka partitions passed by this filter. + */ + List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer); + + /** + * @return A human-readable string representing the topics that pass the filter. + */ + String getTopicsString(); + +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java new file mode 100644 index 0000000..dafb97c --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout; + +import java.util.Comparator; + +import org.apache.kafka.common.TopicPartition; + +/** + * Singleton comparator of TopicPartitions. Topics have precedence over partitions. + * Topics are compared through String.compare and partitions are compared + * numerically. + * + * Use INSTANCE for all sorting. + */ +public class TopicPartitionComparator implements Comparator<TopicPartition> { + public static final TopicPartitionComparator INSTANCE = new TopicPartitionComparator(); + + /** + * Private to make it a singleton + */ + private TopicPartitionComparator() { + //Empty + } + + @Override + public int compare(TopicPartition o1, TopicPartition o2) { + if (!o1.topic().equals(o2.topic())) { + return o1.topic().compareTo(o2.topic()); + } else { + return o1.partition() - o2.partition(); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadata.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadata.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadata.java new file mode 100644 index 0000000..b7fd1a6 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadata.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.internal; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Object representing metadata committed to Kafka. + */ +public class CommitMetadata { + private final String topologyId; + private final int taskId; + private final String threadName; + + /** Kafka metadata. 
*/ + @JsonCreator + public CommitMetadata(@JsonProperty("topologyId") String topologyId, + @JsonProperty("taskId") int taskId, + @JsonProperty("threadName") String threadName) { + + this.topologyId = topologyId; + this.taskId = taskId; + this.threadName = threadName; + } + + public String getTopologyId() { + return topologyId; + } + + public int getTaskId() { + return taskId; + } + + public String getThreadName() { + return threadName; + } + + @Override + public String toString() { + return "CommitMetadata{" + + "topologyId='" + topologyId + '\'' + + ", taskId=" + taskId + + ", threadName='" + threadName + '\'' + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java new file mode 100644 index 0000000..a63619c --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java @@ -0,0 +1,91 @@ +/* + * Copyright 2018 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout.internal; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.util.Map; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.KafkaSpout; +import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee; +import org.apache.storm.task.TopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Generates and reads commit metadata. + */ +public final class CommitMetadataManager { + + private static final ObjectMapper JSON_MAPPER = new ObjectMapper(); + private static final Logger LOG = LoggerFactory.getLogger(CommitMetadataManager.class); + // Metadata information to commit to Kafka. It is unique per spout instance. + private final String commitMetadata; + private final ProcessingGuarantee processingGuarantee; + private final TopologyContext context; + + /** + * Create a manager with the given context. + */ + public CommitMetadataManager(TopologyContext context, ProcessingGuarantee processingGuarantee) { + this.context = context; + try { + commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata( + context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName())); + this.processingGuarantee = processingGuarantee; + } catch (JsonProcessingException e) { + LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e); + throw new RuntimeException(e); + } + } + + /** + * Checks if {@link OffsetAndMetadata} was committed by a {@link KafkaSpout} instance in this topology. + * + * @param tp The topic partition the commit metadata belongs to. + * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka + * @param offsetManagers The offset managers. 
+ * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise + */ + public boolean isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetadata committedOffset, + Map<TopicPartition, OffsetManager> offsetManagers) { + try { + if (processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE + && offsetManagers.containsKey(tp) + && offsetManagers.get(tp).hasCommitted()) { + return true; + } + + final CommitMetadata committedMetadata = JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class); + return committedMetadata.getTopologyId().equals(context.getStormId()); + } catch (IOException e) { + LOG.warn("Failed to deserialize expected commit metadata [{}]." + + " This error is expected to occur once per partition, if the last commit to each partition" + + " was by an earlier version of the KafkaSpout, or by a process other than the KafkaSpout. " + + "Defaulting to behavior compatible with earlier version", committedOffset); + LOG.trace("", e); + return false; + } + } + + public String getCommitMetadata() { + return commitMetadata; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java index 7900388..ec2fbac 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java @@ -22,8 +22,7 @@ public class KafkaConsumerFactoryDefault<K, V> implements KafkaConsumerFactory<K @Override public KafkaConsumer<K, V> 
createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig) { - return new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(), - kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer()); + return new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps()); } } http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java new file mode 100755 index 0000000..c9f9541 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java @@ -0,0 +1,246 @@ +/* + * Copyright 2016 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout.internal; + +import com.google.common.annotations.VisibleForTesting; + +import java.util.Comparator; +import java.util.Iterator; +import java.util.NavigableSet; +import java.util.NoSuchElementException; +import java.util.TreeSet; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.KafkaSpoutMessageId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages acked and committed offsets for a TopicPartition. This class is not thread safe. + */ +public class OffsetManager { + private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR = new OffsetComparator(); + private static final Logger LOG = LoggerFactory.getLogger(OffsetManager.class); + + private final TopicPartition tp; + // Emitted offsets, sorted in ascending order + private final NavigableSet<Long> emittedOffsets = new TreeSet<>(); + // Acked messages sorted by ascending order of offset + private final NavigableSet<KafkaSpoutMessageId> ackedMsgs = new TreeSet<>(OFFSET_COMPARATOR); + // Committed offset, i.e. the offset where processing will resume upon spout restart. Initially it is set to the initial fetch offset. + private long committedOffset; + // True if this OffsetManager has made at least one commit to Kafka + private boolean committed; + private long latestEmittedOffset; + + /** + * Creates a new OffsetManager. 
+ * @param tp The TopicPartition + * @param initialFetchOffset The initial fetch offset for the given TopicPartition + */ + public OffsetManager(TopicPartition tp, long initialFetchOffset) { + this.tp = tp; + this.committedOffset = initialFetchOffset; + LOG.debug("Instantiated {}", this.toString()); + } + + public void addToAckMsgs(KafkaSpoutMessageId msgId) { // O(Log N) + ackedMsgs.add(msgId); + } + + public void addToEmitMsgs(long offset) { + this.emittedOffsets.add(offset); // O(Log N) + this.latestEmittedOffset = Math.max(latestEmittedOffset, offset); + } + + public int getNumUncommittedOffsets() { + return this.emittedOffsets.size(); + } + + /** + * Gets the offset of the nth emitted message after the committed offset. + * Example: If the committed offset is 0 and offsets 1, 2, 8, 10 have been emitted, + * getNthUncommittedOffsetAfterCommittedOffset(3) returns 8. + * + * @param index The 1-based index of the message to get the offset for + * @return The offset + * @throws NoSuchElementException if the index is out of range + */ + public long getNthUncommittedOffsetAfterCommittedOffset(int index) { + Iterator<Long> offsetIter = emittedOffsets.iterator(); + for (int i = 0; i < index - 1; i++) { + offsetIter.next(); + } + return offsetIter.next(); + } + + /** + * An offset can only be committed when all emitted records with lower offset have been + * acked. This guarantees that all offsets smaller than the committedOffset + * have been delivered, or that those offsets no longer exist in Kafka. + * <p> + * The returned offset points to the earliest uncommitted offset, and matches the semantics of the KafkaConsumer.commitSync API. + * + * @param commitMetadata Metadata information to commit to Kafka. It is constant per KafkaSpout instance per topology + * @return the next OffsetAndMetadata to commit, or null if no offset is + * ready to commit. 
+ */ + public OffsetAndMetadata findNextCommitOffset(final String commitMetadata) { + boolean found = false; + long currOffset; + long nextCommitOffset = committedOffset; + + for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeSet + currOffset = currAckedMsg.offset(); + if (currOffset == nextCommitOffset) { + // found the next offset to commit + found = true; + nextCommitOffset = currOffset + 1; + } else if (currOffset > nextCommitOffset) { + if (emittedOffsets.contains(nextCommitOffset)) { + LOG.debug("topic-partition [{}] has non-sequential offset [{}]." + + " It will be processed in a subsequent batch.", tp, currOffset); + break; + } else { + /* + This case will arise in case of non-sequential offset being processed. + So, if the topic doesn't contain offset = nextCommitOffset (possible + if the topic is compacted or deleted), the consumer should jump to + the next logical point in the topic. Next logical offset should be the + first element after nextCommitOffset in the ascending ordered emitted set. + */ + LOG.debug("Processed non-sequential offset." + + " The earliest uncommitted offset is no longer part of the topic." + + " Missing offset: [{}], Processed: [{}]", nextCommitOffset, currOffset); + final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset); + if (nextEmittedOffset != null && currOffset == nextEmittedOffset) { + LOG.debug("Found committable offset: [{}] after missing offset: [{}], skipping to the committable offset", + currOffset, nextCommitOffset); + nextCommitOffset = currOffset + 1; + } else { + LOG.debug("Topic-partition [{}] has non-sequential offset [{}]." + + " Next offset to commit should be [{}]", tp, currOffset, nextCommitOffset); + break; + } + } + } else { + throw new IllegalStateException("The offset [" + currOffset + "] is below the current nextCommitOffset " + + "[" + nextCommitOffset + "] for [" + tp + "]." 
+ + " This should not be possible, and likely indicates a bug in the spout's acking or emit logic."); + } + } + + OffsetAndMetadata nextCommitOffsetAndMetadata = null; + if (found) { + nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, commitMetadata); + + LOG.debug("Topic-partition [{}] has offsets [{}-{}] ready to be committed." + + " Processing will resume at offset [{}] upon spout restart", + tp, committedOffset, nextCommitOffsetAndMetadata.offset() - 1, nextCommitOffsetAndMetadata.offset()); + } else { + LOG.debug("Topic-partition [{}] has no offsets ready to be committed", tp); + } + LOG.trace("{}", this); + return nextCommitOffsetAndMetadata; + } + + /** + * Marks an offset as committed. This method has side effects - it sets the + * internal state in such a way that future calls to + * {@link #findNextCommitOffset(String)} will return offsets greater than or equal to the + * offset specified, if any. + * + * @param committedOffsetAndMeta The committed offset. All lower offsets are expected to have been committed. + * @return Number of offsets committed in this commit + */ + public long commit(OffsetAndMetadata committedOffsetAndMeta) { + committed = true; + final long preCommitCommittedOffset = this.committedOffset; + long numCommittedOffsets = 0; + this.committedOffset = committedOffsetAndMeta.offset(); + for (Iterator<KafkaSpoutMessageId> iterator = ackedMsgs.iterator(); iterator.hasNext();) { + if (iterator.next().offset() < committedOffsetAndMeta.offset()) { + iterator.remove(); + numCommittedOffsets++; + } else { + break; + } + } + + for (Iterator<Long> iterator = emittedOffsets.iterator(); iterator.hasNext();) { + if (iterator.next() < committedOffsetAndMeta.offset()) { + iterator.remove(); + } else { + break; + } + } + + LOG.trace("{}", this); + + LOG.debug("Committed [{}] offsets in the range [{}-{}] for topic-partition [{}]." 
+ + " Processing will resume at [{}] upon spout restart", + numCommittedOffsets, preCommitCommittedOffset, this.committedOffset - 1, tp, this.committedOffset); + + return numCommittedOffsets; + } + + /** + * Checks if this OffsetManager has committed to Kafka. + * + * @return true if this OffsetManager has made at least one commit to Kafka, false otherwise + */ + public boolean hasCommitted() { + return committed; + } + + public boolean contains(KafkaSpoutMessageId msgId) { + return ackedMsgs.contains(msgId); + } + + @VisibleForTesting + boolean containsEmitted(long offset) { + return emittedOffsets.contains(offset); + } + + public long getLatestEmittedOffset() { + return latestEmittedOffset; + } + + public long getCommittedOffset() { + return committedOffset; + } + + @Override + public final String toString() { + return "OffsetManager{" + + "topic-partition=" + tp + + ", committedOffset=" + committedOffset + + ", emittedOffsets=" + emittedOffsets + + ", ackedMsgs=" + ackedMsgs + + ", latestEmittedOffset=" + latestEmittedOffset + + '}'; + } + + private static class OffsetComparator implements Comparator<KafkaSpoutMessageId> { + + @Override + public int compare(KafkaSpoutMessageId m1, KafkaSpoutMessageId m2) { + return m1.offset() < m2.offset() ? -1 : m1.offset() == m2.offset() ? 
0 : 1; + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java new file mode 100644 index 0000000..2a2e1cb --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout.internal; + +import java.util.concurrent.TimeUnit; +import org.apache.storm.utils.Time; + +public class Timer { + private final long delay; + private final long period; + private final TimeUnit timeUnit; + private final long periodNanos; + private long start; + + /** + * Creates a class that mimics a single threaded timer that expires periodically. If a call to {@link + * #isExpiredResetOnTrue()} occurs later than {@code period} since the timer was initiated or reset, this method returns + * true. 
Each time the method returns true the counter is reset. The timer starts with the specified time delay. + * + * @param delay the initial delay before the timer starts + * @param period the minimum time between two calls to {@link #isExpiredResetOnTrue()} that return true + * @param timeUnit the time unit of delay and period + */ + public Timer(long delay, long period, TimeUnit timeUnit) { + this.delay = delay; + this.period = period; + this.timeUnit = timeUnit; + + periodNanos = timeUnit.toNanos(period); + start = Time.nanoTime() + timeUnit.toNanos(delay); + } + + public long period() { + return period; + } + + public long delay() { + return delay; + } + + public TimeUnit getTimeUnit() { + return timeUnit; + } + + /** + * Checks if a call to this method occurs at least {@code period} after the timer was initiated or reset. If that is the + * case the method returns true, otherwise it returns false. Each time this method returns true, the counter is reset + * (re-initiated) and a new cycle will start. + * + * @return true if at least {@code period} has elapsed since the timer was initiated or last reset. Returns false + * otherwise. + */ + public boolean isExpiredResetOnTrue() { + final boolean expired = Time.nanoTime() - start >= periodNanos; + if (expired) { + start = Time.nanoTime(); + } + return expired; + } +} \ No newline at end of file