http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java new file mode 100644 index 0000000..afe8f74 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout.metrics; + +import com.google.common.base.Supplier; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.internal.OffsetManager; +import org.apache.storm.metric.api.IMetric; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * This class is used compute the partition and topic level offset metrics + * <p> + * Partition level metrics are: + * topicName/partition_{number}/earliestTimeOffset //gives beginning offset of the partition + * topicName/partition_{number}/latestTimeOffset //gives end offset of the partition + * topicName/partition_{number}/latestEmittedOffset //gives latest emitted offset of the partition from the spout + * topicName/partition_{number}/latestCompletedOffset //gives latest committed offset of the partition from the spout + * topicName/partition_{number}/spoutLag // the delta between the latest Offset and latestCompletedOffset + * topicName/partition_{number}/recordsInPartition // total number of records in the partition + * </p> + * <p> + * Topic level metrics are: + * topicName/totalEarliestTimeOffset //gives the total beginning offset of all the associated partitions of this spout + * topicName/totalLatestTimeOffset //gives the total end offset of all the associated partitions of this spout + * topicName/totalLatestEmittedOffset //gives the total latest emitted offset of all the associated partitions of this spout + * topicName/totalLatestCompletedOffset //gives the total latest committed offset of all the associated partitions of this spout + * topicName/spoutLag // total spout lag of all the associated partitions of this spout + * topicName/totalRecordsInPartitions //total number of records in all the associated partitions of this spout + * </p> + */ +public class KafkaOffsetMetric implements IMetric { + + private 
static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class); + private final Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier; + private final Supplier<KafkaConsumer> consumerSupplier; + + public KafkaOffsetMetric(Supplier offsetManagerSupplier, Supplier consumerSupplier) { + this.offsetManagerSupplier = offsetManagerSupplier; + this.consumerSupplier = consumerSupplier; + } + + @Override + public Object getValueAndReset() { + + Map<TopicPartition, OffsetManager> offsetManagers = offsetManagerSupplier.get(); + KafkaConsumer kafkaConsumer = consumerSupplier.get(); + + if (offsetManagers == null || offsetManagers.isEmpty() || kafkaConsumer == null) { + LOG.debug("Metrics Tick: offsetManagers or kafkaConsumer is null."); + return null; + } + + Map<String,TopicMetrics> topicMetricsMap = new HashMap<>(); + Set<TopicPartition> topicPartitions = offsetManagers.keySet(); + + Map<TopicPartition, Long> beginningOffsets = kafkaConsumer.beginningOffsets(topicPartitions); + Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions); + //map to hold partition level and topic level metrics + Map<String, Long> result = new HashMap<>(); + + for (Map.Entry<TopicPartition, OffsetManager> entry : offsetManagers.entrySet()) { + TopicPartition topicPartition = entry.getKey(); + OffsetManager offsetManager = entry.getValue(); + + long latestTimeOffset = endOffsets.get(topicPartition); + long earliestTimeOffset = beginningOffsets.get(topicPartition); + + long latestEmittedOffset = offsetManager.getLatestEmittedOffset(); + long latestCompletedOffset = offsetManager.getCommittedOffset(); + long spoutLag = latestTimeOffset - latestCompletedOffset; + long recordsInPartition = latestTimeOffset - earliestTimeOffset; + + String metricPath = topicPartition.topic() + "/partition_" + topicPartition.partition(); + result.put(metricPath + "/" + "spoutLag", spoutLag); + result.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset); + 
result.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset); + result.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset); + result.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffset); + result.put(metricPath + "/" + "recordsInPartition", recordsInPartition); + + TopicMetrics topicMetrics = topicMetricsMap.get(topicPartition.topic()); + if (topicMetrics == null) { + topicMetrics = new TopicMetrics(); + topicMetricsMap.put(topicPartition.topic(), topicMetrics); + } + + topicMetrics.totalSpoutLag += spoutLag; + topicMetrics.totalEarliestTimeOffset += earliestTimeOffset; + topicMetrics.totalLatestTimeOffset += latestTimeOffset; + topicMetrics.totalLatestEmittedOffset += latestEmittedOffset; + topicMetrics.totalLatestCompletedOffset += latestCompletedOffset; + topicMetrics.totalRecordsInPartitions += recordsInPartition; + } + + for (Map.Entry<String, TopicMetrics> e : topicMetricsMap.entrySet()) { + String topic = e.getKey(); + TopicMetrics topicMetrics = e.getValue(); + result.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag); + result.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset); + result.put(topic + "/" + "totalLatestTimeOffset", topicMetrics.totalLatestTimeOffset); + result.put(topic + "/" + "totalLatestEmittedOffset", topicMetrics.totalLatestEmittedOffset); + result.put(topic + "/" + "totalLatestCompletedOffset", topicMetrics.totalLatestCompletedOffset); + result.put(topic + "/" + "totalRecordsInPartitions", topicMetrics.totalRecordsInPartitions); + } + + LOG.debug("Metrics Tick: value : {}", result); + return result; + } + + private class TopicMetrics { + long totalSpoutLag = 0; + long totalEarliestTimeOffset = 0; + long totalLatestTimeOffset = 0; + long totalLatestEmittedOffset = 0; + long totalLatestCompletedOffset = 0; + long totalRecordsInPartitions = 0; + } +}
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java new file mode 100644 index 0000000..6fa81aa --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout.trident; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.lang.Validate; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.List; + +/** + * Wraps transaction batch information + */ +public class KafkaTridentSpoutBatchMetadata implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutBatchMetadata.class); + private static final TopicPartitionSerializer TP_SERIALIZER = new TopicPartitionSerializer(); + + public static final String TOPIC_PARTITION_KEY = "tp"; + public static final String FIRST_OFFSET_KEY = "firstOffset"; + public static final String LAST_OFFSET_KEY = "lastOffset"; + + // topic partition of this batch + private final TopicPartition topicPartition; + // first offset of this batch + private final long firstOffset; + // last offset of this batch + private final long lastOffset; + + /** + * Builds a metadata object. + * @param topicPartition The topic partition + * @param firstOffset The first offset for the batch + * @param lastOffset The last offset for the batch + */ + public KafkaTridentSpoutBatchMetadata(TopicPartition topicPartition, long firstOffset, long lastOffset) { + this.topicPartition = topicPartition; + this.firstOffset = firstOffset; + this.lastOffset = lastOffset; + } + + /** + * Builds a metadata object from a non-empty set of records. + * @param topicPartition The topic partition the records belong to. + * @param consumerRecords The non-empty set of records. 
+ */ + public <K, V> KafkaTridentSpoutBatchMetadata(TopicPartition topicPartition, ConsumerRecords<K, V> consumerRecords) { + Validate.notNull(consumerRecords.records(topicPartition)); + List<ConsumerRecord<K, V>> records = consumerRecords.records(topicPartition); + Validate.isTrue(!records.isEmpty(), "There must be at least one record in order to build metadata"); + + this.topicPartition = topicPartition; + firstOffset = records.get(0).offset(); + lastOffset = records.get(records.size() - 1).offset(); + LOG.debug("Created {}", this.toString()); + } + + public long getFirstOffset() { + return firstOffset; + } + + public long getLastOffset() { + return lastOffset; + } + + public TopicPartition getTopicPartition() { + return topicPartition; + } + + /** + * Constructs a metadata object from a Map in the format produced by {@link #toMap() }. + * @param map The source map + * @return A new metadata object + */ + public static KafkaTridentSpoutBatchMetadata fromMap(Map<String, Object> map) { + Map<String, Object> topicPartitionMap = (Map<String, Object>)map.get(TOPIC_PARTITION_KEY); + TopicPartition tp = TP_SERIALIZER.fromMap(topicPartitionMap); + return new KafkaTridentSpoutBatchMetadata(tp, ((Number)map.get(FIRST_OFFSET_KEY)).longValue(), + ((Number)map.get(LAST_OFFSET_KEY)).longValue()); + } + + /** + * Writes this metadata object to a Map so Trident can read/write it to Zookeeper. 
+ */ + public Map<String, Object> toMap() { + Map<String, Object> map = new HashMap<>(); + map.put(TOPIC_PARTITION_KEY, TP_SERIALIZER.toMap(topicPartition)); + map.put(FIRST_OFFSET_KEY, firstOffset); + map.put(LAST_OFFSET_KEY, lastOffset); + return map; + } + + @Override + public final String toString() { + return super.toString() + + "{topicPartition=" + topicPartition + + ", firstOffset=" + firstOffset + + ", lastOffset=" + lastOffset + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java new file mode 100644 index 0000000..3b4aa4b --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout.trident; + +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.KafkaSpoutConfig; +import org.apache.storm.kafka.spout.RecordTranslator; +import org.apache.storm.kafka.spout.internal.Timer; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout; +import org.apache.storm.trident.topology.TransactionAttempt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST; + +public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTridentSpout.Emitter< + List<Map<String, Object>>, + KafkaTridentSpoutTopicPartition, + Map<String, Object>>, + Serializable { + + private static final long serialVersionUID = -7343927794834130435L; + private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class); + + // Kafka + private final KafkaConsumer<K, V> kafkaConsumer; + + // Bookkeeping + private final KafkaTridentSpoutManager<K, V> kafkaManager; + // set of topic-partitions for which first poll has 
already occurred, and the first polled txid + private final Map<TopicPartition, Long> firstPollTransaction = new HashMap<>(); + + // Declare some KafkaTridentSpoutManager references for convenience + private final long pollTimeoutMs; + private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy; + private final RecordTranslator<K, V> translator; + private final Timer refreshSubscriptionTimer; + private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer(); + + private TopologyContext topologyContext; + + /** + * Create a new Kafka spout emitter. + * @param kafkaManager The Kafka consumer manager to use + * @param topologyContext The topology context + * @param refreshSubscriptionTimer The timer for deciding when to recheck the subscription + */ + public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext, Timer refreshSubscriptionTimer) { + this.kafkaConsumer = kafkaManager.createAndSubscribeKafkaConsumer(topologyContext); + this.kafkaManager = kafkaManager; + this.topologyContext = topologyContext; + this.refreshSubscriptionTimer = refreshSubscriptionTimer; + this.translator = kafkaManager.getKafkaSpoutConfig().getTranslator(); + + final KafkaSpoutConfig<K, V> kafkaSpoutConfig = kafkaManager.getKafkaSpoutConfig(); + this.pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs(); + this.firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy(); + LOG.debug("Created {}", this.toString()); + } + + /** + * Creates instance of this class with default 500 millisecond refresh subscription timer + */ + public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext) { + this(kafkaManager, topologyContext, new Timer(500, + kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS)); + } + + @Override + public Map<String, Object> emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, + 
KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> lastBatch) { + + LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]", + tx, currBatchPartition, lastBatch, collector); + + final TopicPartition currBatchTp = currBatchPartition.getTopicPartition(); + final Set<TopicPartition> assignments = kafkaConsumer.assignment(); + KafkaTridentSpoutBatchMetadata lastBatchMeta = lastBatch == null ? null : KafkaTridentSpoutBatchMetadata.fromMap(lastBatch); + KafkaTridentSpoutBatchMetadata currentBatch = lastBatchMeta; + Collection<TopicPartition> pausedTopicPartitions = Collections.emptySet(); + + if (assignments == null || !assignments.contains(currBatchPartition.getTopicPartition())) { + LOG.warn("SKIPPING processing batch [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " + + "[collector = {}] because it is not part of the assignments {} of consumer instance [{}] " + + "of consumer group [{}]", tx, currBatchPartition, lastBatch, collector, assignments, + kafkaConsumer, kafkaManager.getKafkaSpoutConfig().getConsumerGroupId()); + } else { + try { + // pause other topic-partitions to only poll from current topic-partition + pausedTopicPartitions = pauseTopicPartitions(currBatchTp); + + seek(currBatchTp, lastBatchMeta, tx.getTransactionId()); + + // poll + if (refreshSubscriptionTimer.isExpiredResetOnTrue()) { + kafkaManager.getKafkaSpoutConfig().getSubscription().refreshAssignment(); + } + + final ConsumerRecords<K, V> records = kafkaConsumer.poll(pollTimeoutMs); + LOG.debug("Polled [{}] records from Kafka.", records.count()); + + if (!records.isEmpty()) { + emitTuples(collector, records); + // build new metadata + currentBatch = new KafkaTridentSpoutBatchMetadata(currBatchTp, records); + } + } finally { + kafkaConsumer.resume(pausedTopicPartitions); + LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions); + } + LOG.debug("Emitted batch: [transaction = {}], 
[currBatchPartition = {}], [lastBatchMetadata = {}], " + + "[currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector); + } + + return currentBatch == null ? null : currentBatch.toMap(); + } + + private void emitTuples(TridentCollector collector, ConsumerRecords<K, V> records) { + for (ConsumerRecord<K, V> record : records) { + final List<Object> tuple = translator.apply(record); + collector.emit(tuple); + LOG.debug("Emitted tuple {} for record [{}]", tuple, record); + } + } + + /** + * Determines the offset of the next fetch. Will use the firstPollOffsetStrategy if this is the first poll for the topic partition. + * Otherwise the next offset will be one past the last batch, based on lastBatchMeta. + * + * <p>lastBatchMeta should only be null when the previous txid was not emitted (e.g. new topic), + * it is the first poll for the spout instance, or it is a replay of the first txid this spout emitted on this partition. + * In the second case, there are either no previous transactions, or the MBC is still committing them + * and they will fail because this spout did not emit the corresponding batches. If it had emitted them, the meta could not be null. + * In any case, the lastBatchMeta should never be null if this is not the first poll for this spout instance. 
+ * + * @return the offset of the next fetch + */ + private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta, long transactionId) { + if (isFirstPoll(tp, transactionId)) { + if (firstPollOffsetStrategy == EARLIEST) { + LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp); + kafkaConsumer.seekToBeginning(Collections.singleton(tp)); + } else if (firstPollOffsetStrategy == LATEST) { + LOG.debug("First poll for topic partition [{}], seeking to partition end", tp); + kafkaConsumer.seekToEnd(Collections.singleton(tp)); + } else if (lastBatchMeta != null) { + LOG.debug("First poll for topic partition [{}], using last batch metadata", tp); + kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1); // seek next offset after last offset from previous batch + } else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST) { + LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition beginning", tp); + kafkaConsumer.seekToBeginning(Collections.singleton(tp)); + } else if (firstPollOffsetStrategy == UNCOMMITTED_LATEST) { + LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition end", tp); + kafkaConsumer.seekToEnd(Collections.singleton(tp)); + } + firstPollTransaction.put(tp, transactionId); + } else { + kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1); // seek next offset after last offset from previous batch + LOG.debug("First poll for topic partition [{}], using last batch metadata", tp); + } + + final long fetchOffset = kafkaConsumer.position(tp); + LOG.debug("Set [fetchOffset = {}]", fetchOffset); + return fetchOffset; + } + + private boolean isFirstPoll(TopicPartition tp, long txid) { + // The first poll is either the "real" first transaction, or a replay of the first transaction + return !firstPollTransaction.containsKey(tp) || firstPollTransaction.get(tp) == txid; + } + + // returns paused topic-partitions. 
+ private Collection<TopicPartition> pauseTopicPartitions(TopicPartition excludedTp) { + final Set<TopicPartition> pausedTopicPartitions = new HashSet<>(kafkaConsumer.assignment()); + LOG.debug("Currently assigned topic-partitions {}", pausedTopicPartitions); + pausedTopicPartitions.remove(excludedTp); + kafkaConsumer.pause(pausedTopicPartitions); + LOG.debug("Paused topic-partitions {}", pausedTopicPartitions); + return pausedTopicPartitions; + } + + @Override + public void refreshPartitions(List<KafkaTridentSpoutTopicPartition> partitionResponsibilities) { + LOG.trace("Refreshing of topic-partitions handled by Kafka. " + + "No action taken by this method for topic partitions {}", partitionResponsibilities); + } + + /** + * Computes ordered list of topic-partitions for this task taking into consideration that topic-partitions + * for this task must be assigned to the Kafka consumer running on this task. + * + * @param allPartitionInfo list of all partitions as returned by {@link KafkaTridentSpoutOpaqueCoordinator} + * @return ordered list of topic partitions for this task + */ + @Override + public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(final List<Map<String, Object>> allPartitionInfo) { + List<TopicPartition> allTopicPartitions = new ArrayList<>(); + for(Map<String, Object> map : allPartitionInfo) { + allTopicPartitions.add(tpSerializer.fromMap(map)); + } + final List<KafkaTridentSpoutTopicPartition> allPartitions = newKafkaTridentSpoutTopicPartitions(allTopicPartitions); + LOG.debug("Returning all topic-partitions {} across all tasks. Current task index [{}]. 
Total tasks [{}] ", + allPartitions, topologyContext.getThisTaskIndex(), getNumTasks()); + return allPartitions; + } + + @Override + public List<KafkaTridentSpoutTopicPartition> getPartitionsForTask(int taskId, int numTasks, + List<Map<String, Object>> allPartitionInfo) { + final Set<TopicPartition> assignedTps = kafkaConsumer.assignment(); + LOG.debug("Consumer [{}], running on task with index [{}], has assigned topic-partitions {}", kafkaConsumer, taskId, assignedTps); + final List<KafkaTridentSpoutTopicPartition> taskTps = newKafkaTridentSpoutTopicPartitions(assignedTps); + LOG.debug("Returning topic-partitions {} for task with index [{}]", taskTps, taskId); + return taskTps; + } + + private List<KafkaTridentSpoutTopicPartition> newKafkaTridentSpoutTopicPartitions(Collection<TopicPartition> tps) { + final List<KafkaTridentSpoutTopicPartition> kttp = new ArrayList<>(tps == null ? 0 : tps.size()); + if (tps != null) { + for (TopicPartition tp : tps) { + LOG.trace("Added topic-partition [{}]", tp); + kttp.add(new KafkaTridentSpoutTopicPartition(tp)); + } + } + return kttp; + } + + private int getNumTasks() { + return topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size(); + } + + @Override + public void close() { + kafkaConsumer.close(); + LOG.debug("Closed"); + } + + @Override + public final String toString() { + return super.toString() + + "{kafkaManager=" + kafkaManager + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java new file mode 100644 index 0000000..26db5c9 --- /dev/null +++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.trident; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.KafkaSpoutConfig; +import org.apache.storm.kafka.spout.RecordTranslator; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Fields; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Set; + +public class KafkaTridentSpoutManager<K, V> implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutManager.class); + + // Kafka + private transient KafkaConsumer<K, V> kafkaConsumer; + + // Bookkeeping + private final KafkaSpoutConfig<K, V> kafkaSpoutConfig; + // Declare some KafkaSpoutConfig references for convenience + private Fields fields; + + /** + * Create a KafkaConsumer manager for the trident spout. 
+ * @param kafkaSpoutConfig The consumer config + */ + public KafkaTridentSpoutManager(KafkaSpoutConfig<K, V> kafkaSpoutConfig) { + this.kafkaSpoutConfig = kafkaSpoutConfig; + this.fields = getFields(); + LOG.debug("Created {}", this.toString()); + } + + KafkaConsumer<K,V> createAndSubscribeKafkaConsumer(TopologyContext context) { + kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps()); + + kafkaSpoutConfig.getSubscription().subscribe(kafkaConsumer, new KafkaSpoutConsumerRebalanceListener(), context); + return kafkaConsumer; + } + + KafkaConsumer<K, V> getKafkaConsumer() { + return kafkaConsumer; + } + + Set<TopicPartition> getTopicPartitions() { + return KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.getTopicPartitions(); + } + + final Fields getFields() { + if (fields == null) { + RecordTranslator<K, V> translator = kafkaSpoutConfig.getTranslator(); + Fields fs = null; + for (String stream : translator.streams()) { + if (fs == null) { + fs = translator.getFieldsFor(stream); + } else { + if (!fs.equals(translator.getFieldsFor(stream))) { + throw new IllegalArgumentException("Trident Spouts do not support multiple output Fields"); + } + } + } + fields = fs; + } + LOG.debug("OutputFields = {}", fields); + return fields; + } + + KafkaSpoutConfig<K, V> getKafkaSpoutConfig() { + return kafkaSpoutConfig; + } + + @Override + public final String toString() { + return super.toString() + + "{kafkaConsumer=" + kafkaConsumer + + ", kafkaSpoutConfig=" + kafkaSpoutConfig + + '}'; + } + + private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener { + @Override + public void onPartitionsRevoked(Collection<TopicPartition> partitions) { + LOG.info("Partitions revoked. 
[consumer-group={}, consumer={}, topic-partitions={}]", + kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); + KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.removeAll(partitions); + } + + @Override + public void onPartitionsAssigned(Collection<TopicPartition> partitions) { + KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.addAll(partitions); + LOG.info("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]", + kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java new file mode 100644 index 0000000..ecc9219 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Opaque partitioned Trident spout that reads from Kafka through a {@link KafkaTridentSpoutManager}.
+ *
+ * <p>Its coordinator ({@link KafkaTridentSpoutOpaqueCoordinator}) describes the partitions of each batch as a
+ * {@code List<Map<String, Object>>}; the partition type is {@link KafkaTridentSpoutTopicPartition}.
+ */
+public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSpout<List<Map<String, Object>>,
+        KafkaTridentSpoutTopicPartition, Map<String, Object>> {
+    private static final long serialVersionUID = -8003272486566259640L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaque.class);
+
+    private final KafkaTridentSpoutManager<K, V> kafkaManager;
+
+    /**
+     * Creates the spout with a new {@link KafkaTridentSpoutManager} built from {@code conf}.
+     *
+     * @param conf the Kafka spout configuration
+     */
+    public KafkaTridentSpoutOpaque(KafkaSpoutConfig<K, V> conf) {
+        this(new KafkaTridentSpoutManager<>(conf));
+    }
+
+    /**
+     * Creates the spout around an existing manager; the same manager is handed to every emitter and
+     * coordinator this spout creates.
+     *
+     * @param kafkaManager the manager providing the Kafka consumer and the output fields
+     */
+    public KafkaTridentSpoutOpaque(KafkaTridentSpoutManager<K, V> kafkaManager) {
+        this.kafkaManager = kafkaManager;
+        LOG.debug("Created {}", this.toString());
+    }
+
+    /**
+     * @return the output fields reported by the manager
+     */
+    @Override
+    public Fields getOutputFields() {
+        final Fields fields = kafkaManager.getFields();
+        LOG.debug("OutputFields = {}", fields);
+        return fields;
+    }
+
+    /**
+     * @return {@code null}: this spout adds no component-specific configuration
+     */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+    @Override
+    public Coordinator<List<Map<String, Object>>> getCoordinator(Map conf, TopologyContext context) {
+        return new KafkaTridentSpoutOpaqueCoordinator<>(kafkaManager);
+    }
+
+    @Override
+    public Emitter<List<Map<String, Object>>, KafkaTridentSpoutTopicPartition, Map<String, Object>> getEmitter(
+            Map conf, TopologyContext context) {
+        return new KafkaTridentSpoutEmitter<>(kafkaManager, context);
+    }
+
+    @Override
+    public final String toString() {
+        return new StringBuilder(super.toString())
+                .append("{kafkaManager=").append(kafkaManager)
+                .append('}')
+                .toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java new file mode 100644 index 0000000..449e24b --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Coordinator of the opaque Trident Kafka spout. For every batch it reports the topic partitions currently known
+ * to the {@link KafkaTridentSpoutManager}, each converted to a map by {@link TopicPartitionSerializer}.
+ */
+public class KafkaTridentSpoutOpaqueCoordinator<K,V> implements IOpaquePartitionedTridentSpout.Coordinator<List<Map<String, Object>>>,
+        Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaqueCoordinator.class);
+
+    // TopicPartitionSerializer is stateless and does not implement Serializable. Kept in an instance field, it would
+    // make Java serialization of this Serializable class fail with NotSerializableException. Static fields are not
+    // serialized, and one shared instance is sufficient.
+    private static final TopicPartitionSerializer TP_SERIALIZER = new TopicPartitionSerializer();
+
+    private final KafkaTridentSpoutManager<K,V> kafkaManager;
+
+    public KafkaTridentSpoutOpaqueCoordinator(KafkaTridentSpoutManager<K, V> kafkaManager) {
+        this.kafkaManager = kafkaManager;
+        LOG.debug("Created {}", this.toString());
+    }
+
+    @Override
+    public boolean isReady(long txid) {
+        LOG.debug("isReady = true");
+        return true; // the "old" trident kafka spout always returns true, like this
+    }
+
+    /**
+     * @return one map per partition reported by the manager, in the manager's iteration order
+     */
+    @Override
+    public List<Map<String, Object>> getPartitionsForBatch() {
+        // Copy first so the loop below works on a stable snapshot of the manager's partitions.
+        final List<TopicPartition> topicPartitions = new ArrayList<>(kafkaManager.getTopicPartitions());
+        LOG.debug("TopicPartitions for batch {}", topicPartitions);
+        final List<Map<String, Object>> tps = new ArrayList<>(topicPartitions.size());
+        for (TopicPartition tp : topicPartitions) {
+            tps.add(TP_SERIALIZER.toMap(tp));
+        }
+        return tps;
+    }
+
+    @Override
+    public void close() {
+        LOG.debug("Closed"); // the "old" trident kafka spout is no op like this
+    }
+
+    @Override
+    public final String toString() {
+        return super.toString() +
+                "{kafkaManager=" + kafkaManager +
+                '}';
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java
----------------------------------------------------------------------
diff --git
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java new file mode 100644 index 0000000..b020bea --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.trident.spout.ISpoutPartition;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * {@link ISpoutPartition} that wraps {@link TopicPartition} information
+ *
+ * <p>Immutable: the wrapped partition is fixed at construction.
+ */
+public class KafkaTridentSpoutTopicPartition implements ISpoutPartition, Serializable {
+    private final TopicPartition topicPartition;
+
+    public KafkaTridentSpoutTopicPartition(String topic, int partition) {
+        this(new TopicPartition(topic, partition));
+    }
+
+    public KafkaTridentSpoutTopicPartition(TopicPartition topicPartition) {
+        this.topicPartition = topicPartition;
+    }
+
+    public TopicPartition getTopicPartition() {
+        return topicPartition;
+    }
+
+    /**
+     * @return {@code "<topic>@<partition>"}; throws NullPointerException if the wrapped partition is {@code null}
+     */
+    @Override
+    public String getId() {
+        return topicPartition.topic() + "@" + topicPartition.partition();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        KafkaTridentSpoutTopicPartition that = (KafkaTridentSpoutTopicPartition) o;
+        return Objects.equals(topicPartition, that.topicPartition);
+    }
+
+    @Override
+    public int hashCode() {
+        // 0 for a null partition, matching the null-tolerant equals above.
+        return Objects.hashCode(topicPartition);
+    }
+
+    @Override
+    public String toString() {
+        return topicPartition.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java
new file mode 100644
index 0000000..2d50ca7
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * Registry of Kafka topic partitions, shared by everything in the JVM (one enum constant per class loader).
+ * The Trident Kafka spout's consumer rebalance listener adds and removes partitions here, and its coordinator
+ * reads them for every batch.
+ */
+public enum KafkaTridentSpoutTopicPartitionRegistry {
+    INSTANCE;
+
+    // Writers (partition assignment/revocation) and readers (per-batch listing) may run on different threads.
+    // An unsynchronized LinkedHashSet could be corrupted, or throw ConcurrentModificationException while being
+    // copied. CopyOnWriteArraySet keeps insertion order and gives iterators a stable snapshot. Copying on every
+    // write is cheap because writes only happen on rebalance.
+    private final Set<TopicPartition> topicPartitions = new CopyOnWriteArraySet<>();
+
+    /**
+     * @return a read-only live view of the registered partitions, in insertion order; iterating it never throws
+     *     ConcurrentModificationException
+     */
+    public Set<TopicPartition> getTopicPartitions() {
+        return Collections.unmodifiableSet(topicPartitions);
+    }
+
+    /**
+     * Registers the given partitions; partitions already registered keep their position.
+     *
+     * @param partitions the partitions to add
+     */
+    public void addAll(Collection<? extends TopicPartition> partitions) {
+        topicPartitions.addAll(partitions);
+    }
+
+    /**
+     * Unregisters the given partitions; partitions that are not registered are ignored.
+     *
+     * @param partitions the partitions to remove
+     */
+    public void removeAll(Collection<? extends TopicPartition> partitions) {
+        topicPartitions.removeAll(partitions);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java
new file mode 100644
index 0000000..6e1c587
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.trident; + +import org.apache.storm.task.TopologyContext; +import org.apache.storm.trident.spout.IPartitionedTridentSpout; +import org.apache.storm.tuple.Fields; + +import java.util.Map; + +// TODO +public class KafkaTridentSpoutTransactional implements IPartitionedTridentSpout { + @Override + public Coordinator getCoordinator(Map conf, TopologyContext context) { + return null; + } + + @Override + public Emitter getEmitter(Map conf, TopologyContext context) { + return null; + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return null; + } + + @Override + public Fields getOutputFields() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/TopicPartitionSerializer.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/TopicPartitionSerializer.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/TopicPartitionSerializer.java new file mode 100644 index 0000000..50e78f0 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/TopicPartitionSerializer.java @@ -0,0 +1,47 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Converts {@link TopicPartition}s to and from plain maps keyed by {@value #TOPIC_PARTITION_TOPIC_KEY} and
+ * {@value #TOPIC_PARTITION_PARTITION_KEY}.
+ */
+public class TopicPartitionSerializer {
+
+    public static final String TOPIC_PARTITION_TOPIC_KEY = "topic";
+    public static final String TOPIC_PARTITION_PARTITION_KEY = "partition";
+
+    /**
+     * Serializes the given TopicPartition to Map so Trident can serialize it to JSON.
+     *
+     * @param topicPartition the partition to convert
+     * @return a new mutable map holding the topic name (String) and the partition number (Integer)
+     */
+    public Map<String, Object> toMap(TopicPartition topicPartition) {
+        final String topic = topicPartition.topic();
+        final Integer partition = topicPartition.partition();
+        final Map<String, Object> serialized = new HashMap<>();
+        serialized.put(TOPIC_PARTITION_TOPIC_KEY, topic);
+        serialized.put(TOPIC_PARTITION_PARTITION_KEY, partition);
+        return serialized;
+    }
+
+    /**
+     * Deserializes the given map into a TopicPartition. The map keys are expected to be those produced by
+     * {@link #toMap(org.apache.kafka.common.TopicPartition)}.
+     *
+     * <p>The partition value is accepted as any {@link Number}, not only Integer (presumably because a JSON
+     * round trip may widen it, e.g. to Long).
+     *
+     * @param map the map to read
+     * @return the reconstructed partition
+     * @throws NullPointerException if the partition entry is missing
+     * @throws ClassCastException if an entry has an unexpected type
+     */
+    public TopicPartition fromMap(Map<String, Object> map) {
+        final String topic = (String) map.get(TOPIC_PARTITION_TOPIC_KEY);
+        final Number partition = (Number) map.get(TOPIC_PARTITION_PARTITION_KEY);
+        return new TopicPartition(topic, partition.intValue());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
new file mode 100644
index 0000000..2e2c13b
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.FailedException;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Trident {@link State} that writes tuples to Kafka through a {@link KafkaProducer}.
+ *
+ * <p>Set the mapper and topic selector with the {@code with...} methods, then call {@link #prepare} (which
+ * creates the producer) before {@link #updateState}. {@link #beginCommit} and {@link #commit} are no-ops.
+ */
+public class TridentKafkaState<K, V> implements State {
+    private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaState.class);
+
+    private KafkaProducer<K, V> producer;
+
+    private TridentTupleToKafkaMapper<K, V> mapper;
+    private KafkaTopicSelector topicSelector;
+
+    public TridentKafkaState<K, V> withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper<K, V> mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+
+    public TridentKafkaState<K, V> withKafkaTopicSelector(KafkaTopicSelector selector) {
+        this.topicSelector = selector;
+        return this;
+    }
+
+    @Override
+    public void beginCommit(Long txid) {
+        LOG.debug("beginCommit is Noop.");
+    }
+
+    @Override
+    public void commit(Long txid) {
+        LOG.debug("commit is Noop.");
+    }
+
+    /**
+     * Prepare this State.
+     * @param options The KafkaProducer config.
+     */
+    public void prepare(Properties options) {
+        Objects.requireNonNull(mapper, "mapper can not be null");
+        Objects.requireNonNull(topicSelector, "topicSelector can not be null");
+        producer = new KafkaProducer<>(options);
+    }
+
+    /**
+     * Write the given tuples to Kafka.
+     *
+     * <p>All records are sent first, and then every send is awaited. Tuples whose selected topic or mapped message
+     * is {@code null} are skipped with a warning.
+     *
+     * @param tuples The tuples to write.
+     * @param collector The Trident collector.
+     * @throws FailedException if any send fails, if waiting for a send is interrupted, or if selecting, mapping
+     *     or sending a tuple throws
+     */
+    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
+        // Holds the topic selected for the most recently processed tuple; used in log and error messages.
+        String topic = null;
+        try {
+            long startTime = System.currentTimeMillis();
+            int numberOfRecords = tuples.size();
+            List<Future<RecordMetadata>> futures = new ArrayList<>(numberOfRecords);
+            for (TridentTuple tuple : tuples) {
+                topic = topicSelector.getTopic(tuple);
+                V messageFromTuple = mapper.getMessageFromTuple(tuple);
+                K keyFromTuple = mapper.getKeyFromTuple(tuple);
+
+                if (topic != null) {
+                    if (messageFromTuple != null) {
+                        Future<RecordMetadata> result = producer.send(new ProducerRecord<>(topic, keyFromTuple, messageFromTuple));
+                        futures.add(result);
+                    } else {
+                        LOG.warn("skipping Message with Key {} as message was null", keyFromTuple);
+                    }
+
+                } else {
+                    LOG.warn("skipping key = {}, topic selector returned null.", keyFromTuple);
+                }
+            }
+
+            int emittedRecords = futures.size();
+            List<ExecutionException> exceptions = new ArrayList<>(emittedRecords);
+            for (Future<RecordMetadata> future : futures) {
+                try {
+                    future.get();
+                } catch (ExecutionException e) {
+                    exceptions.add(e);
+                }
+            }
+
+            if (!exceptions.isEmpty()) {
+                StringBuilder errorMsg = new StringBuilder("Could not retrieve result for messages ");
+                errorMsg.append(tuples).append(" from topic = ").append(topic)
+                        .append(" because of the following exceptions:").append(System.lineSeparator());
+
+                for (ExecutionException exception : exceptions) {
+                    errorMsg.append(exception.getMessage()).append(System.lineSeparator());
+                }
+                String message = errorMsg.toString();
+                LOG.error(message);
+                // Keep the first failure as the cause so its stack trace is not lost.
+                throw new FailedException(message, exceptions.get(0));
+            }
+            long latestTime = System.currentTimeMillis();
+            LOG.info("Emitted record {} sucessfully in {} ms to topic {} ", emittedRecords, latestTime - startTime, topic);
+
+        } catch (FailedException ex) {
+            // Either the descriptive failure raised above (already logged) or one raised deliberately by a mapper or
+            // selector: propagate it unchanged instead of logging it again and re-wrapping it in a vaguer one.
+            throw ex;
+        } catch (Exception ex) {
+            if (ex instanceof InterruptedException) {
+                // Future.get() was interrupted: restore the flag so the calling thread can still observe it.
+                Thread.currentThread().interrupt();
+            }
+            String errorMsg = "Could not send messages " + tuples + " to topic = " + topic;
+            LOG.warn(errorMsg, ex);
+            throw new FailedException(errorMsg, ex);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
new file mode 100644
index 0000000..35620de
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.task.IMetricsContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * {@link StateFactory} for {@link TridentKafkaState}. Configure the mapper, the topic selector and the KafkaProducer
+ * properties with the {@code with...} methods. Every {@link #makeState} call builds and prepares a new state.
+ */
+public class TridentKafkaStateFactory<K, V> implements StateFactory {
+
+    private static final long serialVersionUID = -3613240970062343385L;
+    private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaStateFactory.class);
+
+    private TridentTupleToKafkaMapper<K, V> mapper;
+    private KafkaTopicSelector topicSelector;
+    private Properties producerProperties = new Properties();
+
+    public TridentKafkaStateFactory<K, V> withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper<K, V> mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+
+    public TridentKafkaStateFactory<K, V> withKafkaTopicSelector(KafkaTopicSelector selector) {
+        this.topicSelector = selector;
+        return this;
+    }
+
+    /**
+     * @param props the KafkaProducer configuration passed to {@link TridentKafkaState#prepare(Properties)}
+     */
+    public TridentKafkaStateFactory<K, V> withProducerProperties(Properties props) {
+        this.producerProperties = props;
+        return this;
+    }
+
+    /**
+     * Creates a {@link TridentKafkaState} with this factory's mapper and topic selector and prepares it with the
+     * producer properties.
+     */
+    @Override
+    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+        LOG.info("makeState(partitionIndex={}, numPartitions={})", partitionIndex, numPartitions);
+        TridentKafkaState<K, V> state = new TridentKafkaState<>();
+        state.withKafkaTopicSelector(this.topicSelector)
+            .withTridentTupleToKafkaMapper(this.mapper);
+        state.prepare(producerProperties);
+        return state;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateUpdater.java
----------------------------------------------------------------------
diff --git
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateUpdater.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateUpdater.java new file mode 100644 index 0000000..19e3d33 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateUpdater.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.storm.kafka.trident;
+
+import java.util.List;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.BaseStateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+/**
+ * State updater that hands each batch of tuples to
+ * {@link TridentKafkaState#updateState(List, TridentCollector)}, which writes them to Kafka.
+ */
+public class TridentKafkaStateUpdater<K, V> extends BaseStateUpdater<TridentKafkaState<K, V>> {
+
+    private static final long serialVersionUID = 3352659585225274402L;
+
+    @Override
+    public void updateState(final TridentKafkaState<K, V> kafkaState, final List<TridentTuple> batch,
+            final TridentCollector tridentCollector) {
+        kafkaState.updateState(batch, tridentCollector);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
new file mode 100644
index 0000000..2d04971
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.mapper;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+/**
+ * {@link TridentTupleToKafkaMapper} that reads the record key and message from two named tuple fields.
+ *
+ * @param <K> key type; must match the runtime type of the value in the key field
+ * @param <V> message type; must match the runtime type of the value in the message field
+ */
+public class FieldNameBasedTupleToKafkaMapper<K, V> implements TridentTupleToKafkaMapper<K, V> {
+
+    public final String keyFieldName;
+    public final String msgFieldName;
+
+    /**
+     * @param keyFieldName name of the tuple field holding the record key
+     * @param msgFieldName name of the tuple field holding the record message
+     */
+    public FieldNameBasedTupleToKafkaMapper(String keyFieldName, String msgFieldName) {
+        this.keyFieldName = keyFieldName;
+        this.msgFieldName = msgFieldName;
+    }
+
+    // Unchecked: tuple values are untyped; a mismatch with K only fails where the caller uses the value as a K.
+    @SuppressWarnings("unchecked")
+    @Override
+    public K getKeyFromTuple(TridentTuple tuple) {
+        return (K) tuple.getValueByField(keyFieldName);
+    }
+
+    // Unchecked: tuple values are untyped; a mismatch with V only fails where the caller uses the value as a V.
+    @SuppressWarnings("unchecked")
+    @Override
+    public V getMessageFromTuple(TridentTuple tuple) {
+        return (V) tuple.getValueByField(msgFieldName);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
new file mode 100644
index 0000000..28c6c89
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.trident.mapper; + +import org.apache.storm.tuple.Tuple; +import org.apache.storm.trident.tuple.TridentTuple; + +import java.io.Serializable; + +public interface TridentTupleToKafkaMapper<K,V> extends Serializable { + K getKeyFromTuple(TridentTuple tuple); + V getMessageFromTuple(TridentTuple tuple); +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java new file mode 100644 index 0000000..607c996 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.trident.selector; + +import org.apache.storm.trident.tuple.TridentTuple; + +public class DefaultTopicSelector implements KafkaTopicSelector { + private static final long serialVersionUID = -1172454882072591493L; + private final String topicName; + + public DefaultTopicSelector(final String topicName) { + this.topicName = topicName; + } + + @Override + public String getTopic(TridentTuple tuple) { + return topicName; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java new file mode 100644 index 0000000..012a6c7 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.trident.selector; + +import org.apache.storm.trident.tuple.TridentTuple; + +import java.io.Serializable; + +public interface KafkaTopicSelector extends Serializable { + String getTopic(TridentTuple tuple); +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java new file mode 100644 index 0000000..93b1040 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.MockTime; +import kafka.utils.TestUtils; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; +import kafka.zk.EmbeddedZookeeper; +import org.I0Itec.zkclient.ZkClient; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.Serializer; + +import java.io.IOException; +import java.nio.file.Files; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; + +public class KafkaUnit { + private KafkaServer kafkaServer; + private EmbeddedZookeeper zkServer; + private ZkUtils zkUtils; + private KafkaProducer<String, String> producer; + private static final String ZK_HOST = "127.0.0.1"; + private static final String KAFKA_HOST = "127.0.0.1"; + private static final int KAFKA_PORT = 9092; + + public KafkaUnit() { + } + + public void setUp() throws IOException { + // setup ZK + zkServer = new EmbeddedZookeeper(); + String zkConnect = ZK_HOST + ":" + zkServer.port(); + ZkClient zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); + zkUtils = ZkUtils.apply(zkClient, false); + + // setup Broker + Properties brokerProps = new Properties(); + brokerProps.setProperty("zookeeper.connect", zkConnect); + brokerProps.setProperty("broker.id", "0"); + 
brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); + brokerProps.setProperty("listeners", String.format("PLAINTEXT://%s:%d", KAFKA_HOST, KAFKA_PORT)); + KafkaConfig config = new KafkaConfig(brokerProps); + MockTime mock = new MockTime(); + kafkaServer = TestUtils.createServer(config, mock); + + // setup default Producer + createProducer(); + } + + public void tearDown() { + closeProducer(); + kafkaServer.shutdown(); + zkUtils.close(); + zkServer.shutdown(); + } + + public void createTopic(String topicName) { + AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + } + + public int getKafkaPort() { + return KAFKA_PORT; + } + + private void createProducer() { + Properties producerProps = new Properties(); + producerProps.setProperty(BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST + ":" + KAFKA_PORT); + producerProps.setProperty(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + producerProps.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + producer = new KafkaProducer<>(producerProps); + } + + public void createProducer(Serializer keySerializer, Serializer valueSerializer) { + Properties producerProps = new Properties(); + producerProps.setProperty(BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST + ":" + KAFKA_PORT); + producer = new KafkaProducer<>(producerProps, keySerializer, valueSerializer); + } + + public void sendMessage(ProducerRecord producerRecord) throws InterruptedException, ExecutionException, TimeoutException { + producer.send(producerRecord).get(10, TimeUnit.SECONDS); + } + + private void closeProducer() { + producer.close(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java ---------------------------------------------------------------------- diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java new file mode 100644 index 0000000..6e90c9d --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka; + +import org.junit.rules.ExternalResource; + +import java.io.IOException; + + +public class KafkaUnitRule extends ExternalResource { + + private final KafkaUnit kafkaUnit; + + public KafkaUnitRule() { + this.kafkaUnit = new KafkaUnit(); + } + + @Override + public void before() throws IOException { + kafkaUnit.setUp(); + } + + @Override + public void after() { + kafkaUnit.tearDown(); + } + + public KafkaUnit getKafkaUnit() { + return this.kafkaUnit; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java new file mode 100644 index 0000000..8c8a945 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka.bolt; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.storm.Testing; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.testing.MkTupleParam; +import org.apache.storm.tuple.Tuple; +import org.junit.Test; +import org.mockito.ArgumentMatcher; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaBoltTest { + private static final Logger LOG = LoggerFactory.getLogger(KafkaBoltTest.class); + + @SuppressWarnings({ "unchecked", "serial" }) + @Test + public void testSimple() { + final KafkaProducer<String, String> producer = mock(KafkaProducer.class); + when(producer.send((ProducerRecord<String,String>)any(), (Callback)any())).thenAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + Callback c = (Callback)invocation.getArguments()[1]; + c.onCompletion(null, null); + return null; + } + }); + KafkaBolt<String, String> bolt = new KafkaBolt<String, String>() { + @Override + protected KafkaProducer<String, String> mkProducer(Properties props) { + return producer; + } + }; + bolt.withTopicSelector("MY_TOPIC"); + + OutputCollector collector = mock(OutputCollector.class); + TopologyContext context = mock(TopologyContext.class); + Map<String, Object> conf = new HashMap<>(); + bolt.prepare(conf, context, collector); + MkTupleParam param = new 
MkTupleParam(); + param.setFields("key", "message"); + Tuple testTuple = Testing.testTuple(Arrays.asList("KEY", "VALUE"), param); + bolt.execute(testTuple); + verify(producer).send(argThat(new ArgumentMatcher<ProducerRecord<String, String>>() { + @Override + public boolean matches(Object argument) { + LOG.info("GOT {} ->", argument); + ProducerRecord<String, String> arg = (ProducerRecord<String, String>) argument; + LOG.info(" {} {} {}", arg.topic(), arg.key(), arg.value()); + return "MY_TOPIC".equals(arg.topic()) && + "KEY".equals(arg.key()) && + "VALUE".equals(arg.value()); + } + }), any(Callback.class)); + verify(collector).ack(testTuple); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java new file mode 100644 index 0000000..abc58f0 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.junit.Test; + +public class ByTopicRecordTranslatorTest { + public static Func<ConsumerRecord<String, String>, List<Object>> JUST_KEY_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() { + @Override + public List<Object> apply(ConsumerRecord<String, String> record) { + return new Values(record.key()); + } + }; + + public static Func<ConsumerRecord<String, String>, List<Object>> JUST_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() { + @Override + public List<Object> apply(ConsumerRecord<String, String> record) { + return new Values(record.value()); + } + }; + + public static Func<ConsumerRecord<String, String>, List<Object>> KEY_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() { + @Override + public List<Object> apply(ConsumerRecord<String, String> record) { + return new Values(record.key(), record.value()); + } + }; + + @Test + public void testBasic() { + ByTopicRecordTranslator<String, String> trans = + new ByTopicRecordTranslator<>(JUST_KEY_FUNC, new Fields("key")); + trans.forTopic("TOPIC 1", JUST_VALUE_FUNC, new Fields("value"), "value-stream"); + trans.forTopic("TOPIC 2", KEY_VALUE_FUNC, new Fields("key", "value"), "key-value-stream"); + HashSet<String> expectedStreams = new 
HashSet<>(); + expectedStreams.add("default"); + expectedStreams.add("value-stream"); + expectedStreams.add("key-value-stream"); + assertEquals(expectedStreams, new HashSet<>(trans.streams())); + + ConsumerRecord<String, String> cr1 = new ConsumerRecord<>("TOPIC OTHER", 100, 100, "THE KEY", "THE VALUE"); + assertEquals(new Fields("key"), trans.getFieldsFor("default")); + assertEquals(Arrays.asList("THE KEY"), trans.apply(cr1)); + + ConsumerRecord<String, String> cr2 = new ConsumerRecord<>("TOPIC 1", 100, 100, "THE KEY", "THE VALUE"); + assertEquals(new Fields("value"), trans.getFieldsFor("value-stream")); + assertEquals(Arrays.asList("THE VALUE"), trans.apply(cr2)); + + ConsumerRecord<String, String> cr3 = new ConsumerRecord<>("TOPIC 2", 100, 100, "THE KEY", "THE VALUE"); + assertEquals(new Fields("key", "value"), trans.getFieldsFor("key-value-stream")); + assertEquals(Arrays.asList("THE KEY", "THE VALUE"), trans.apply(cr3)); + } + + @Test(expected = IllegalArgumentException.class) + public void testFieldCollision() { + ByTopicRecordTranslator<String, String> trans = + new ByTopicRecordTranslator<>(JUST_KEY_FUNC, new Fields("key")); + trans.forTopic("foo", JUST_VALUE_FUNC, new Fields("value")); + } + + @Test(expected = IllegalStateException.class) + public void testTopicCollision() { + ByTopicRecordTranslator<String, String> trans = + new ByTopicRecordTranslator<>(JUST_KEY_FUNC, new Fields("key")); + trans.forTopic("foo", JUST_VALUE_FUNC, new Fields("value"), "foo1"); + trans.forTopic("foo", KEY_VALUE_FUNC, new Fields("key", "value"), "foo2"); + } + +}