Repository: storm Updated Branches: refs/heads/1.x-branch 555146fc6 -> d7d7e61a1
Merge branch 'Apache_master_STORM-2097_TridentLogs' of https://github.com/hmcl/storm-apache into STORM-2097 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/91903743 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/91903743 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/91903743 Branch: refs/heads/1.x-branch Commit: 91903743d3c3acb2ac5e4949629f2e1b52cc1ed9 Parents: 555146f Author: Sriharsha Chintalapani <har...@hortonworks.com> Authored: Tue Oct 11 08:45:35 2016 -0700 Committer: Sriharsha Chintalapani <har...@hortonworks.com> Committed: Tue Oct 11 09:06:29 2016 -0700 ---------------------------------------------------------------------- .../starter/spout/RandomSentenceSpout.java | 39 ++++++++++- .../starter/trident/DebugMemoryMapState.java | 73 ++++++++++++++++++++ .../starter/trident/TridentKafkaWordCount.java | 58 +++++++++++----- .../jvm/org/apache/storm/kafka/KafkaUtils.java | 9 ++- .../trident/TransactionalTridentKafkaSpout.java | 8 +-- .../kafka/trident/TridentKafkaEmitter.java | 30 ++++++-- .../storm/trident/operation/builtin/Debug.java | 18 ++++- .../spout/IOpaquePartitionedTridentSpout.java | 11 +-- .../trident/spout/IPartitionedTridentSpout.java | 10 +-- .../OpaquePartitionedTridentSpoutExecutor.java | 62 +++++++++++++---- .../spout/PartitionedTridentSpoutExecutor.java | 24 +++++-- .../trident/state/CombinerValueUpdater.java | 9 +++ .../topology/MasterBatchCoordinator.java | 39 +++++++++-- .../state/RotatingTransactionalState.java | 59 +++++++++++++--- .../topology/state/TransactionalState.java | 44 ++++++++---- 15 files changed, 399 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomSentenceSpout.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomSentenceSpout.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomSentenceSpout.java index 49bec2e..0fe28e2 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomSentenceSpout.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomSentenceSpout.java @@ -24,11 +24,17 @@ import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.text.SimpleDateFormat; +import java.util.Date; import java.util.Map; import java.util.Random; public class RandomSentenceSpout extends BaseRichSpout { + private static final Logger LOG = LoggerFactory.getLogger(RandomSentenceSpout.class); + SpoutOutputCollector _collector; Random _rand; @@ -42,12 +48,19 @@ public class RandomSentenceSpout extends BaseRichSpout { @Override public void nextTuple() { Utils.sleep(100); - String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away", - "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" }; - String sentence = sentences[_rand.nextInt(sentences.length)]; + String[] sentences = new String[]{sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"), + sentence("four score and seven years ago"), sentence("snow white and the seven dwarfs"), sentence("i am at two with nature")}; + final String sentence = sentences[_rand.nextInt(sentences.length)]; + + LOG.debug("Emitting tuple: {}", sentence); + _collector.emit(new Values(sentence)); } + protected String sentence(String input) { + return input; + } + @Override public void ack(Object id) { } @@ -61,4 +74,24 @@ public class RandomSentenceSpout extends BaseRichSpout { declarer.declare(new Fields("word")); } + // Add unique identifier to each tuple, which is helpful for debugging + public static class TimeStamped extends RandomSentenceSpout { + private final String prefix; + + public TimeStamped() { + this(""); + } + + public TimeStamped(String prefix) { + this.prefix = prefix; + } + + protected String sentence(String input) { + return prefix + currentDate() + " " + input; + } + + private String currentDate() { + return new SimpleDateFormat("yyyy.MM.dd_HH:mm:ss.SSSSSSSSS").format(new Date()); + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/DebugMemoryMapState.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/DebugMemoryMapState.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/DebugMemoryMapState.java new file mode 100644 index 0000000..3870afa --- /dev/null +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/DebugMemoryMapState.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.starter.trident; + +import org.apache.storm.task.IMetricsContext; +import org.apache.storm.topology.FailedException; +import org.apache.storm.trident.state.CombinerValueUpdater; +import org.apache.storm.trident.state.State; +import org.apache.storm.trident.state.StateFactory; +import org.apache.storm.trident.state.ValueUpdater; +import org.apache.storm.trident.testing.MemoryMapState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.UUID; + +public class DebugMemoryMapState<T> extends MemoryMapState<T> { + private static final Logger LOG = LoggerFactory.getLogger(DebugMemoryMapState.class); + + private int updateCount = 0; + + public DebugMemoryMapState(String id) { + super(id); + } + + public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) { + print(keys, updaters); + if ((updateCount++ % 5) == 0) { + LOG.error("Throwing FailedException"); + throw new FailedException("Enforced State Update Fail. On retrial should replay the exact same batch."); + } + return super.multiUpdate(keys, updaters); + } + + private void print(List<List<Object>> keys, List<ValueUpdater> updaters) { + for (int i = 0; i < keys.size(); i++) { + ValueUpdater valueUpdater = updaters.get(i); + Object arg = ((CombinerValueUpdater) valueUpdater).getArg(); + LOG.debug("updateCount = {}, keys = {} => updaterArgs = {}", updateCount, keys.get(i), arg); + } + } + + public static class Factory implements StateFactory { + String _id; + + public Factory() { + _id = UUID.randomUUID().toString(); + } + + @Override + public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { + return new DebugMemoryMapState(_id + partitionIndex); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java index 0499041..ee8d001 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java @@ -23,32 +23,37 @@ package org.apache.storm.starter.trident; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.LocalDRPC; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; -import org.apache.storm.spout.SchemeAsMultiScheme; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.tuple.Fields; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.storm.kafka.StringScheme; import org.apache.storm.kafka.ZkHosts; import org.apache.storm.kafka.bolt.KafkaBolt; import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper; import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector; +import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout; import org.apache.storm.kafka.trident.TransactionalTridentKafkaSpout; import org.apache.storm.kafka.trident.TridentKafkaConfig; +import org.apache.storm.spout.SchemeAsMultiScheme; import org.apache.storm.starter.spout.RandomSentenceSpout; +import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.trident.Stream; import org.apache.storm.trident.TridentState; import org.apache.storm.trident.TridentTopology; import org.apache.storm.trident.operation.builtin.Count; +import org.apache.storm.trident.operation.builtin.Debug; import org.apache.storm.trident.operation.builtin.FilterNull; import org.apache.storm.trident.operation.builtin.MapGet; import org.apache.storm.trident.testing.MemoryMapState; import org.apache.storm.trident.testing.Split; +import org.apache.storm.tuple.Fields; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.Serializable; import java.util.Properties; /** @@ -72,7 +77,8 @@ import java.util.Properties; * <a href="https://github.com/apache/storm/tree/master/external/storm-kafka"> Storm Kafka </a>. * </p> */ -public class TridentKafkaWordCount { +public class TridentKafkaWordCount implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaWordCount.class); private String zkUrl; private String brokerUrl; @@ -91,7 +97,7 @@ public class TridentKafkaWordCount { * * @return a transactional trident kafka spout. */ - private TransactionalTridentKafkaSpout createKafkaSpout() { + private TransactionalTridentKafkaSpout createTransactionalKafkaSpout() { ZkHosts hosts = new ZkHosts(zkUrl); TridentKafkaConfig config = new TridentKafkaConfig(hosts, "test"); config.scheme = new SchemeAsMultiScheme(new StringScheme()); @@ -101,6 +107,16 @@ public class TridentKafkaWordCount { return new TransactionalTridentKafkaSpout(config); } + private OpaqueTridentKafkaSpout createOpaqueKafkaSpout() { + ZkHosts hosts = new ZkHosts(zkUrl); + TridentKafkaConfig config = new TridentKafkaConfig(hosts, "test"); + config.scheme = new SchemeAsMultiScheme(new StringScheme()); + + // Consume new data from the topic + config.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); + return new OpaqueTridentKafkaSpout(config); + } + private Stream addDRPCStream(TridentTopology tridentTopology, TridentState state, LocalDRPC drpc) { return tridentTopology.newDRPCStream("words", drpc) @@ -111,12 +127,14 @@ public class TridentKafkaWordCount { .project(new Fields("word", "count")); } - private TridentState addTridentState(TridentTopology tridentTopology) { - return tridentTopology.newStream("spout1", createKafkaSpout()).parallelismHint(1) + protected TridentState addTridentState(TridentTopology tridentTopology) { +// final Stream spoutStream = tridentTopology.newStream("spout1", createTransactionalKafkaSpout()).parallelismHint(1); + final Stream spoutStream = tridentTopology.newStream("spout1", createOpaqueKafkaSpout()).parallelismHint(1); + + return spoutStream.each(spoutStream.getOutputFields(), new Debug(true)) .each(new Fields("str"), new Split(), new Fields("word")) .groupBy(new Fields("word")) - .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")) - .parallelismHint(1); + .persistentAggregate(new DebugMemoryMapState.Factory(), new Count(), new Fields("count")); } /** @@ -140,6 +158,7 @@ public class TridentKafkaWordCount { public Config getConsumerConfig() { Config conf = new Config(); conf.setMaxSpoutPending(20); + conf.setMaxTaskParallelism(1); // conf.setDebug(true); return conf; } @@ -152,7 +171,7 @@ public class TridentKafkaWordCount { */ public StormTopology buildProducerTopology(Properties prop) { TopologyBuilder builder = new TopologyBuilder(); - builder.setSpout("spout", new RandomSentenceSpout(), 2); + builder.setSpout("spout", new RandomSentenceSpout.TimeStamped(""), 2); /** * The output field of the RandomSentenceSpout ("word") is provided as the boltMessageField * so that this gets written out as the message in the kafka topic. @@ -203,7 +222,6 @@ public class TridentKafkaWordCount { * (word counts) by running an external drpc query against the drpc server. */ public static void main(String[] args) throws Exception { - String zkUrl = "localhost:2181"; // the defaults. String brokerUrl = "localhost:9092"; @@ -213,20 +231,22 @@ public class TridentKafkaWordCount { System.exit(1); } else if (args.length == 1) { zkUrl = args[0]; - } else { + } else if (args.length == 2) { zkUrl = args[0]; brokerUrl = args[1]; } System.out.println("Using Kafka zookeeper url: " + zkUrl + " broker url: " + brokerUrl); - TridentKafkaWordCount wordCount = new TridentKafkaWordCount(zkUrl, brokerUrl); + runMain(args, new TridentKafkaWordCount(zkUrl, brokerUrl)); + } + protected static void runMain(String[] args, TridentKafkaWordCount wordCount) throws Exception { if (args.length == 3) { Config conf = new Config(); conf.setMaxSpoutPending(20); conf.setNumWorkers(1); - // submit the consumer topology. + // submit the CONSUMER topology. StormSubmitter.submitTopology(args[2] + "-consumer", conf, wordCount.buildConsumerTopology(null)); // submit the producer topology. StormSubmitter.submitTopology(args[2] + "-producer", conf, wordCount.buildProducerTopology(wordCount.getProducerConfig())); @@ -234,17 +254,19 @@ public class TridentKafkaWordCount { LocalDRPC drpc = new LocalDRPC(); LocalCluster cluster = new LocalCluster(); - // submit the consumer topology. + // submit the CONSUMER topology. cluster.submitTopology("wordCounter", wordCount.getConsumerConfig(), wordCount.buildConsumerTopology(drpc)); + // submit the PRODUCER topology. Config conf = new Config(); conf.setMaxSpoutPending(20); - // submit the producer topology. +// conf.setDebug(true); cluster.submitTopology("kafkaBolt", conf, wordCount.buildProducerTopology(wordCount.getProducerConfig())); // keep querying the word counts for a minute. for (int i = 0; i < 60; i++) { - System.out.println("DRPC RESULT: " + drpc.execute("words", "the and apple snow jumped")); + LOG.info("--- DRPC RESULT: " + drpc.execute("words", "the and apple snow jumped")); + System.out.println(); Thread.sleep(1000); } http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java index 822912b..8c22118 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java @@ -32,7 +32,13 @@ import java.net.ConnectException; import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.nio.channels.UnresolvedAddressException; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; import kafka.api.FetchRequest; import kafka.api.FetchRequestBuilder; @@ -215,6 +221,7 @@ public class KafkaUtils { } else { msgs = fetchResponse.messageSet(topic, partitionId); } + LOG.debug("Messages fetched. [config = {}], [consumer = {}], [partition = {}], [offset = {}], [msgs = {}]", config, consumer, partition, offset, msgs); return msgs; } http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java index ac5b49f..7ce8d52 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java @@ -17,13 +17,12 @@ */ package org.apache.storm.kafka.trident; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.tuple.Fields; import org.apache.storm.kafka.Partition; +import org.apache.storm.task.TopologyContext; import org.apache.storm.trident.spout.IPartitionedTridentSpout; +import org.apache.storm.tuple.Fields; import java.util.Map; -import java.util.UUID; public class TransactionalTridentKafkaSpout implements IPartitionedTridentSpout<GlobalPartitionInformation, Partition, Map> { @@ -42,8 +41,7 @@ public class TransactionalTridentKafkaSpout implements IPartitionedTridentSpout< @Override public IPartitionedTridentSpout.Emitter getEmitter(Map conf, TopologyContext context) { - return new TridentKafkaEmitter(conf, context, _config, context - .getStormId()).asTransactionalEmitter(); + return new TridentKafkaEmitter(conf, context, _config, context.getStormId()).asTransactionalEmitter(); } @Override http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java index 809ed73..136eb0b 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java @@ -20,7 +20,13 @@ package org.apache.storm.kafka.trident; import com.google.common.collect.ImmutableMap; import org.apache.storm.Config; -import org.apache.storm.kafka.*; +import org.apache.storm.kafka.DynamicPartitionConnections; +import org.apache.storm.kafka.FailedFetchException; +import org.apache.storm.kafka.KafkaUtils; +import org.apache.storm.kafka.MessageMetadataSchemeAsMultiScheme; +import org.apache.storm.kafka.Partition; +import org.apache.storm.kafka.PartitionManager; +import org.apache.storm.kafka.TopicOffsetOutOfRangeException; import org.apache.storm.metric.api.CombinedMetric; import org.apache.storm.metric.api.MeanReducer; import org.apache.storm.metric.api.ReducedMetric; @@ -32,7 +38,11 @@ import org.apache.storm.trident.topology.TransactionAttempt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import kafka.javaapi.consumer.SimpleConsumer; import kafka.javaapi.message.ByteBufferMessageSet; @@ -65,7 +75,7 @@ public class TridentKafkaEmitter { private Map failFastEmitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) { SimpleConsumer consumer = _connections.register(partition); - Map ret = doEmitNewPartitionBatch(consumer, partition, collector, lastMeta); + Map ret = doEmitNewPartitionBatch(consumer, partition, collector, lastMeta, attempt); Long offset = (Long) ret.get("offset"); Long endOffset = (Long) ret.get("nextOffset"); _kafkaOffsetMetric.setOffsetData(partition, new PartitionManager.OffsetData(endOffset, offset)); @@ -92,7 +102,8 @@ public class TridentKafkaEmitter { } } - private Map doEmitNewPartitionBatch(SimpleConsumer consumer, Partition partition, TridentCollector collector, Map lastMeta) { + private Map doEmitNewPartitionBatch(SimpleConsumer consumer, Partition partition, TridentCollector collector, Map lastMeta, TransactionAttempt attempt) { + LOG.debug("Emitting new partition batch - [transaction = {}], [lastMeta = {}]", attempt, lastMeta); long offset; if (lastMeta != null) { String lastInstanceId = null; @@ -108,6 +119,7 @@ public class TridentKafkaEmitter { } else { offset = KafkaUtils.getOffset(consumer, partition.topic, partition.partition, _config); } + LOG.debug("[transaction = {}], [OFFSET = {}]", attempt, offset); ByteBufferMessageSet msgs = null; try { @@ -121,7 +133,7 @@ public class TridentKafkaEmitter { long endoffset = offset; for (MessageAndOffset msg : msgs) { - emit(collector, msg.message(), partition, msg.offset()); + emit(collector, msg.message(), partition, msg.offset(), attempt); endoffset = msg.nextOffset(); } Map newMeta = new HashMap(); @@ -132,6 +144,7 @@ public class TridentKafkaEmitter { newMeta.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port)); newMeta.put("topic", partition.topic); newMeta.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId)); + LOG.debug("[transaction = {}], [newMeta = {}]", attempt, newMeta); return newMeta; } @@ -172,14 +185,14 @@ public class TridentKafkaEmitter { if (offset > nextOffset) { throw new RuntimeException("Error when re-emitting batch. overshot the end offset"); } - emit(collector, msg.message(), partition, msg.offset()); + emit(collector, msg.message(), partition, msg.offset(), attempt); offset = msg.nextOffset(); } } } } - private void emit(TridentCollector collector, Message msg, Partition partition, long offset) { + private void emit(TridentCollector collector, Message msg, Partition partition, long offset, TransactionAttempt attempt) { Iterable<List<Object>> values; if (_config.scheme instanceof MessageMetadataSchemeAsMultiScheme) { values = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _config.scheme, msg, partition, offset); @@ -189,8 +202,11 @@ public class TridentKafkaEmitter { if (values != null) { for (List<Object> value : values) { + LOG.debug("Emitting: [Transaction: {}], [Data: {}]", attempt, value); collector.emit(value); } + } else { + LOG.debug("NOT Emitting NULL data. [Transaction: {}]", attempt); } } http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java index 87c4167..a9f30d0 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java +++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java @@ -19,6 +19,8 @@ package org.apache.storm.trident.operation.builtin; import org.apache.storm.trident.operation.BaseFilter; import org.apache.storm.trident.tuple.TridentTuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Date; @@ -26,10 +28,18 @@ import java.util.Date; * Filter for debugging purposes. The `isKeep()` method simply prints the tuple to `System.out` and returns `true`. */ public class Debug extends BaseFilter { + private final Logger LOG = LoggerFactory.getLogger(Debug.class); + private final String name; + private boolean useLogger; public Debug() { - name = "DEBUG: "; + this(false); + } + + public Debug(boolean useLogger) { + this.useLogger = useLogger; + this.name = "DEBUG: "; } /** @@ -42,7 +52,11 @@ public class Debug extends BaseFilter { @Override public boolean isKeep(TridentTuple tuple) { - System.out.println("<"+new Date()+"> "+name + tuple.toString()); + if(useLogger) { + LOG.debug(tuple.toString()); + } else { + System.out.println("<"+new Date()+"> "+name + tuple.toString()); + } return true; } } http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java b/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java index bf4c093..fc4d5ea 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java +++ b/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java @@ -18,12 +18,12 @@ package org.apache.storm.trident.spout; import org.apache.storm.task.TopologyContext; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.topology.TransactionAttempt; import org.apache.storm.tuple.Fields; -import java.io.Serializable; + import java.util.List; import java.util.Map; -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.topology.TransactionAttempt; /** * This defines a transactional spout which does *not* necessarily @@ -32,13 +32,14 @@ import org.apache.storm.trident.topology.TransactionAttempt; */ public interface IOpaquePartitionedTridentSpout<Partitions, Partition extends ISpoutPartition, M> extends ITridentDataSource { - public interface Coordinator<Partitions> { + + interface Coordinator<Partitions> { boolean isReady(long txid); Partitions getPartitionsForBatch(); void close(); } - public interface Emitter<Partitions, Partition extends ISpoutPartition, M> { + interface Emitter<Partitions, Partition extends ISpoutPartition, M> { /** * Emit a batch of tuples for a partition/transaction. * http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/storm-core/src/jvm/org/apache/storm/trident/spout/IPartitionedTridentSpout.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/spout/IPartitionedTridentSpout.java b/storm-core/src/jvm/org/apache/storm/trident/spout/IPartitionedTridentSpout.java index 7450a3c..82c4f74 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/spout/IPartitionedTridentSpout.java +++ b/storm-core/src/jvm/org/apache/storm/trident/spout/IPartitionedTridentSpout.java @@ -18,12 +18,12 @@ package org.apache.storm.trident.spout; import org.apache.storm.task.TopologyContext; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.topology.TransactionAttempt; import org.apache.storm.tuple.Fields; -import java.io.Serializable; + import java.util.List; import java.util.Map; -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.topology.TransactionAttempt; /** * This interface defines a transactional spout that reads its tuples from a partitioned set of @@ -31,7 +31,7 @@ import org.apache.storm.trident.topology.TransactionAttempt; * is always emitted for the same transaction id. The partition metadata is stored in Zookeeper. */ public interface IPartitionedTridentSpout<Partitions, Partition extends ISpoutPartition, T> extends ITridentDataSource { - public interface Coordinator<Partitions> { + interface Coordinator<Partitions> { /** * Return the partitions currently in the source of data. The idea is * is that if a new partition is added and a prior transaction is replayed, it doesn't @@ -45,7 +45,7 @@ public interface IPartitionedTridentSpout<Partitions, Partition extends ISpoutPa void close(); } - public interface Emitter<Partitions, Partition extends ISpoutPartition, X> { + interface Emitter<Partitions, Partition extends ISpoutPartition, X> { List<Partition> getOrderedPartitions(Partitions allPartitionInfo); http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java b/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java index 7c43961..ea66acd 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java +++ b/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java @@ -19,7 +19,14 @@ package org.apache.storm.trident.spout; import org.apache.storm.task.TopologyContext; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.topology.TransactionAttempt; +import org.apache.storm.trident.topology.state.RotatingTransactionalState; +import org.apache.storm.trident.topology.state.TransactionalState; import org.apache.storm.tuple.Fields; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -28,13 +35,11 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.topology.state.RotatingTransactionalState; -import org.apache.storm.trident.topology.state.TransactionalState; -import org.apache.storm.trident.topology.TransactionAttempt; public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentSpout<Object> { + protected final Logger LOG = LoggerFactory.getLogger(OpaquePartitionedTridentSpoutExecutor.class); + IOpaquePartitionedTridentSpout<Object, ISpoutPartition, Object> _spout; public class Coordinator implements ITridentSpout.BatchCoordinator<Object> { @@ -46,21 +51,28 @@ public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentS @Override public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) { + LOG.debug("Initialize Transaction. [txid = {}], [prevMetadata = {}], [currMetadata = {}]", txid, prevMetadata, currMetadata); return _coordinator.getPartitionsForBatch(); } + @Override public void close() { + LOG.debug("Closing"); _coordinator.close(); + LOG.debug("Closed"); } @Override public void success(long txid) { + LOG.debug("Success [txid = {}]", txid); } @Override public boolean isReady(long txid) { - return _coordinator.isReady(txid); + boolean ready = _coordinator.isReady(txid); + LOG.debug("[isReady = {}], [txid = {}]", ready, txid); + return ready; } } @@ -81,19 +93,23 @@ public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentS Map<String, EmitterPartitionState> _partitionStates = new HashMap<>(); int _index; int _numTasks; - + public Emitter(String txStateId, Map conf, TopologyContext context) { _emitter = _spout.getEmitter(conf, context); _index = context.getThisTaskIndex(); _numTasks = context.getComponentTasks(context.getThisComponentId()).size(); - _state = TransactionalState.newUserState(conf, txStateId); + _state = TransactionalState.newUserState(conf, txStateId); + LOG.debug("Created {}", this); } - + Object _savedCoordinatorMeta = null; boolean _changedMeta = false; - + @Override public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentCollector collector) { + LOG.debug("Emitting Batch. [transaction = {}], [coordinatorMeta = {}], [collector = {}], [{}]", + tx, coordinatorMeta, collector, this); + if(_savedCoordinatorMeta==null || !_savedCoordinatorMeta.equals(coordinatorMeta)) { List<ISpoutPartition> partitions = _emitter.getOrderedPartitions(coordinatorMeta); _partitionStates.clear(); @@ -128,17 +144,21 @@ public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentS Object meta = _emitter.emitPartitionBatch(tx, collector, s.partition, lastMeta); metas.put(id, meta); } + LOG.debug("Emitted Batch. [transaction = {}], [coordinatorMeta = {}], [collector = {}], [{}]", + tx, coordinatorMeta, collector, this); } @Override public void success(TransactionAttempt tx) { for(EmitterPartitionState state: _partitionStates.values()) { state.rotatingState.cleanupBefore(tx.getTransactionId()); - } + } + LOG.debug("Success transaction {}. [{}]", tx, this); } @Override public void commit(TransactionAttempt attempt) { + LOG.debug("Committing transaction {}. [{}]", attempt, this); // this code here handles a case where a previous commit failed, and the partitions // changed since the last commit. This clears out any state for the removed partitions // for this txid. @@ -159,21 +179,37 @@ public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentS } } _changedMeta = false; - } + } Long txid = attempt.getTransactionId(); Map<String, Object> metas = _cachedMetas.remove(txid); for(Entry<String, Object> entry: metas.entrySet()) { _partitionStates.get(entry.getKey()).rotatingState.overrideState(txid, entry.getValue()); } + LOG.debug("Exiting commit method for transaction {}. [{}]", attempt, this); } @Override public void close() { + LOG.debug("Closing"); _emitter.close(); + LOG.debug("Closed"); } - } - + + @Override + public String toString() { + return "Emitter{" + + ", _state=" + _state + + ", _cachedMetas=" + _cachedMetas + + ", _partitionStates=" + _partitionStates + + ", _index=" + _index + + ", _numTasks=" + _numTasks + + ", _savedCoordinatorMeta=" + _savedCoordinatorMeta + + ", _changedMeta=" + _changedMeta + + '}'; + } + } + public OpaquePartitionedTridentSpoutExecutor(IOpaquePartitionedTridentSpout<Object, ISpoutPartition, Object> spout) { _spout = spout; } http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/storm-core/src/jvm/org/apache/storm/trident/spout/PartitionedTridentSpoutExecutor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/spout/PartitionedTridentSpoutExecutor.java b/storm-core/src/jvm/org/apache/storm/trident/spout/PartitionedTridentSpoutExecutor.java index dc9b3d9..a96dd1a 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/spout/PartitionedTridentSpoutExecutor.java +++ b/storm-core/src/jvm/org/apache/storm/trident/spout/PartitionedTridentSpoutExecutor.java @@ -23,6 +23,8 @@ import org.apache.storm.trident.topology.TransactionAttempt; import org.apache.storm.trident.topology.state.RotatingTransactionalState; import org.apache.storm.trident.topology.state.TransactionalState; import org.apache.storm.tuple.Fields; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashMap; @@ -31,6 +33,8 @@ import java.util.Map; public class PartitionedTridentSpoutExecutor implements ITridentSpout<Object> { + private static final Logger LOG = LoggerFactory.getLogger(PartitionedTridentSpoutExecutor.class); + IPartitionedTridentSpout<Object, ISpoutPartition, Object> _spout; public PartitionedTridentSpoutExecutor(IPartitionedTridentSpout<Object, ISpoutPartition, Object> spout) { @@ -50,6 +54,8 @@ public class PartitionedTridentSpoutExecutor implements ITridentSpout<Object> { @Override public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) { + LOG.debug("Initialize Transaction. txid = {}, prevMetadata = {}, currMetadata = {}", txid, prevMetadata, currMetadata); + if(currMetadata!=null) { return currMetadata; } else { @@ -60,16 +66,21 @@ public class PartitionedTridentSpoutExecutor implements ITridentSpout<Object> { @Override public void close() { + LOG.debug("Closing"); _coordinator.close(); + LOG.debug("Closed"); } @Override public void success(long txid) { + LOG.debug("Success transaction id " + txid); } @Override public boolean isReady(long txid) { - return _coordinator.isReady(txid); + boolean ready = _coordinator.isReady(txid); + LOG.debug("isReady = {} ", ready); + return ready; } } @@ -101,8 +112,9 @@ public class PartitionedTridentSpoutExecutor implements ITridentSpout<Object> { @Override - public void emitBatch(final TransactionAttempt tx, final Object coordinatorMeta, - final TridentCollector collector) { + public void emitBatch(final TransactionAttempt tx, final Object coordinatorMeta, final TridentCollector collector) { + LOG.debug("Emitting Batch. [transaction = {}], [coordinatorMeta = {}], [collector = {}]", tx, coordinatorMeta, collector); + if(_savedCoordinatorMeta == null || !_savedCoordinatorMeta.equals(coordinatorMeta)) { List<ISpoutPartition> partitions = _emitter.getOrderedPartitions(coordinatorMeta); _partitionStates.clear(); @@ -133,11 +145,13 @@ public class PartitionedTridentSpoutExecutor implements ITridentSpout<Object> { if(meta!=null) { _emitter.emitPartitionBatch(tx, collector, partition, meta); } - } + } + LOG.debug("Emitted Batch. [tx = {}], [coordinatorMeta = {}], [collector = {}]", tx, coordinatorMeta, collector); } @Override public void success(TransactionAttempt tx) { + LOG.debug("Success transaction " + tx); for(EmitterPartitionState state: _partitionStates.values()) { state.rotatingState.cleanupBefore(tx.getTransactionId()); } @@ -145,8 +159,10 @@ public class PartitionedTridentSpoutExecutor implements ITridentSpout<Object> { @Override public void close() { + LOG.debug("Closing"); _state.close(); _emitter.close(); + LOG.debug("Closed"); } } http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/storm-core/src/jvm/org/apache/storm/trident/state/CombinerValueUpdater.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/state/CombinerValueUpdater.java b/storm-core/src/jvm/org/apache/storm/trident/state/CombinerValueUpdater.java index 82b7360..f43071f 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/state/CombinerValueUpdater.java +++ b/storm-core/src/jvm/org/apache/storm/trident/state/CombinerValueUpdater.java @@ -33,4 +33,13 @@ public class CombinerValueUpdater implements ValueUpdater<Object> { if(stored==null) return arg; else return agg.combine(stored, arg); } + + // helpful for debugging tests + public Object getArg() { + return arg; + } + + public CombinerAggregator getAgg() { + return agg; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/storm-core/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java b/storm-core/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java index 6f59409..c10af43 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java +++ b/storm-core/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java @@ -22,20 +22,20 @@ import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.trident.spout.ITridentSpout; +import org.apache.storm.trident.topology.state.TransactionalState; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import org.apache.storm.utils.WindowedTimeThrottler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.TreeMap; -import java.util.Random; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.storm.trident.spout.ITridentSpout; -import org.apache.storm.trident.topology.state.TransactionalState; public class MasterBatchCoordinator extends BaseRichSpout { public static final Logger LOG = LoggerFactory.getLogger(MasterBatchCoordinator.class); @@ -74,6 +74,7 @@ public class MasterBatchCoordinator extends BaseRichSpout { } _managedSpoutIds = spoutIds; _spouts = spouts; + LOG.debug("Created {}", this); } public List<String> getManagedSpoutIds(){ @@ -112,6 +113,7 @@ public class MasterBatchCoordinator extends BaseRichSpout { String txId = _managedSpoutIds.get(i); _coordinators.add(_spouts.get(i).getCoordinator(txId, conf, context)); } + LOG.debug("Opened {}", this); } @Override @@ -119,6 +121,7 @@ public class MasterBatchCoordinator extends BaseRichSpout { for(TransactionalState state: _states) { state.close(); } + LOG.debug("Closed {}", this); } @Override @@ -130,9 +133,11 @@ public class MasterBatchCoordinator extends BaseRichSpout { public void ack(Object msgId) { TransactionAttempt tx = (TransactionAttempt) msgId; TransactionStatus status = _activeTx.get(tx.getTransactionId()); + LOG.debug("Ack. [tx_attempt = {}], [tx_status = {}], [{}]", tx, status, this); if(status!=null && tx.equals(status.attempt)) { if(status.status==AttemptStatus.PROCESSING) { status.status = AttemptStatus.PROCESSED; + LOG.debug("Changed status. [tx_attempt = {}] [tx_status = {}]", tx, status); } else if(status.status==AttemptStatus.COMMITTING) { _activeTx.remove(tx.getTransactionId()); _attemptIds.remove(tx.getTransactionId()); @@ -141,6 +146,7 @@ public class MasterBatchCoordinator extends BaseRichSpout { for(TransactionalState state: _states) { state.setData(CURRENT_TX, _currTransaction); } + LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", SUCCESS_STREAM_ID, tx, status, this); } sync(); } @@ -150,6 +156,7 @@ public class MasterBatchCoordinator extends BaseRichSpout { public void fail(Object msgId) { TransactionAttempt tx = (TransactionAttempt) msgId; TransactionStatus stored = _activeTx.remove(tx.getTransactionId()); + LOG.debug("Fail. [tx_attempt = {}], [tx_status = {}], [{}]", tx, stored, this); if(stored!=null && tx.equals(stored.attempt)) { _activeTx.tailMap(tx.getTransactionId()).clear(); sync(); @@ -174,6 +181,7 @@ public class MasterBatchCoordinator extends BaseRichSpout { if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) { maybeCommit.status = AttemptStatus.COMMITTING; _collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt); + LOG.debug("Emitted on [stream = {}], [tx_status = {}], [{}]", COMMIT_STREAM_ID, maybeCommit, this); } if(_active) { @@ -196,8 +204,10 @@ public class MasterBatchCoordinator extends BaseRichSpout { } TransactionAttempt attempt = new TransactionAttempt(curr, attemptId); - _activeTx.put(curr, new TransactionStatus(attempt)); + final TransactionStatus newTransactionStatus = new TransactionStatus(attempt); + _activeTx.put(curr, newTransactionStatus); _collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt); + LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", BATCH_STREAM_ID, attempt, newTransactionStatus, this); _throttler.markEvent(); } curr = nextTransactionId(curr); @@ -286,4 +296,21 @@ public class MasterBatchCoordinator extends BaseRichSpout { ret.tailMap(currTransaction + maxBatches - 1).clear(); return ret; } + + @Override + public String toString() { + return "MasterBatchCoordinator{" + + "_states=" + _states + + ", _activeTx=" + _activeTx + + ", _attemptIds=" + _attemptIds + + ", _collector=" + _collector + + ", _currTransaction=" + _currTransaction + + ", _maxTransactionActive=" + _maxTransactionActive + + ", _coordinators=" + _coordinators + + ", _managedSpoutIds=" + _managedSpoutIds + + ", _spouts=" + _spouts + + ", _throttler=" + _throttler + + ", _active=" + _active + + "}"; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/storm-core/src/jvm/org/apache/storm/trident/topology/state/RotatingTransactionalState.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/topology/state/RotatingTransactionalState.java b/storm-core/src/jvm/org/apache/storm/trident/topology/state/RotatingTransactionalState.java index ebd73b2..6078f51 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/topology/state/RotatingTransactionalState.java +++ b/storm-core/src/jvm/org/apache/storm/trident/topology/state/RotatingTransactionalState.java @@ -19,6 +19,8 @@ package org.apache.storm.trident.topology.state; import org.apache.storm.utils.Utils; import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.HashSet; import java.util.List; @@ -26,6 +28,8 @@ import java.util.SortedMap; import java.util.TreeMap; public class RotatingTransactionalState { + private static final Logger LOG = LoggerFactory.getLogger(RotatingTransactionalState.class); + public static interface StateInitializer { Object init(long txid, Object lastState); } @@ -40,6 +44,7 @@ public class RotatingTransactionalState { _subdir = subdir; state.mkdir(subdir); sync(); + LOG.debug("Created {}", this); } @@ -49,19 +54,30 @@ public class RotatingTransactionalState { } public void overrideState(long txid, Object state) { + LOG.debug("Overriding state. [txid = {}], [state = {}]", txid, state); + LOG.trace("[{}]", this); + _state.setData(txPath(txid), state); _curr.put(txid, state); + + LOG.trace("Overriding state complete. [{}]", this); } public void removeState(long txid) { + Object state = null; if(_curr.containsKey(txid)) { - _curr.remove(txid); + state = _curr.remove(txid); _state.delete(txPath(txid)); } + LOG.debug("Removed [state = {}], [txid = {}]", state, txid); + LOG.trace("[{}]", this); } public Object getState(long txid) { - return _curr.get(txid); + final Object state = _curr.get(txid); + LOG.debug("Getting state. [txid = {}] => [state = {}]", txid, state); + LOG.trace("Internal state [{}]", this); + return state; } public Object getState(long txid, StateInitializer init) { @@ -87,13 +103,26 @@ public class RotatingTransactionalState { _curr.put(txid, data); _state.setData(txPath(txid), data); } - return _curr.get(txid); + Object state = _curr.get(txid); + LOG.debug("Getting or initializing state. [txid = {}] => [state = {}]", txid, state); + LOG.trace("[{}]", this); + return state; } public Object getPreviousState(long txid) { - SortedMap<Long, Object> prevMap = _curr.headMap(txid); - if(prevMap.isEmpty()) return null; - else return prevMap.get(prevMap.lastKey()); + final SortedMap<Long, Object> prevMap = _curr.headMap(txid); + Object state; + + if(prevMap.isEmpty()) { + state = null; + } + else { + state = prevMap.get(prevMap.lastKey()); + } + + LOG.debug("Getting previous [state = {}], [txid = {}]", state, txid); + LOG.trace("[{}]", this); + return state; } public boolean hasCache(long txid) { @@ -104,12 +133,14 @@ public class RotatingTransactionalState { * Returns null if it was created, the value otherwise. */ public Object getStateOrCreate(long txid, StateInitializer init) { + Object state; if(_curr.containsKey(txid)) { - return _curr.get(txid); + state = _curr.get(txid); } else { getState(txid, init); - return null; + state = null; } + return state; } public void cleanupBefore(long txid) { @@ -142,6 +173,14 @@ public class RotatingTransactionalState { private String txPath(String tx) { return _subdir + "/" + tx; - } - + } + + @Override + public String toString() { + return "RotatingTransactionalState{" + + "_state=" + _state + + ", _subdir='" + _subdir + '\'' + + ", _curr=" + _curr + + '}'; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java index 3bf2794..af1f297 100644 --- a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java +++ b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java @@ -18,27 +18,28 @@ package org.apache.storm.trident.topology.state; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.PathAndBytesable; +import org.apache.curator.framework.api.ProtectACLCreateModePathAndBytesable; import org.apache.storm.Config; import org.apache.storm.utils.Utils; import org.apache.storm.utils.ZookeeperAuthInfo; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.api.ProtectACLCreateModePathAndBytesable; -import org.apache.curator.framework.api.PathAndBytesable; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.ACL; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Id; -import org.json.simple.JSONValue; public class TransactionalState { + private static final Logger LOG = LoggerFactory.getLogger(TransactionalState.class); + CuratorFramework _curator; List<ACL> _zkAcls = null; @@ -87,6 +88,7 @@ public class TransactionalState { byte[] data, List<ACL> acls, CreateMode mode) throws Exception { ProtectACLCreateModePathAndBytesable<String> builder = curator.create().creatingParentsIfNeeded(); + LOG.debug("Creating node [path = {}], [data = {}], [acls = {}], [mode = {}]", path, asString(data), acls, mode); if (acls == null) { if (mode == null ) { @@ -100,6 +102,10 @@ public class TransactionalState { TransactionalState.forPath(builder.withACL(acls), path, data); } + private static String asString(byte[] data) { + return data == null ? "null" : new String(data); + } + public void setData(String path, Object obj) { path = "/" + path; byte[] ser; @@ -115,6 +121,7 @@ public class TransactionalState { TransactionalState.createNode(_curator, path, ser, _zkAcls, CreateMode.PERSISTENT); } + LOG.debug("Set [path = {}] => [data = {}]", path, asString(ser)); } catch(Exception e) { throw new RuntimeException(e); } @@ -127,19 +134,23 @@ public class TransactionalState { } catch (Exception e) { throw new RuntimeException(e); } + LOG.debug("Deleted [path = {}]", path); } public List<String> list(String path) { path = "/" + path; try { + List<String> children; if(_curator.checkExists().forPath(path)==null) { - return new ArrayList<String>(); + children = new ArrayList<>(); } else { - return _curator.getChildren().forPath(path); + children = _curator.getChildren().forPath(path); } + LOG.debug("List [path = {}], [children = {}]", path, children); + return children; } catch(Exception e) { throw new RuntimeException(e); - } + } } public void mkdir(String path) { @@ -149,11 +160,14 @@ public class TransactionalState { public Object getData(String path) { path = "/" + path; try { + Object data; if(_curator.checkExists().forPath(path)!=null) { - return JSONValue.parse(new String(_curator.getData().forPath(path), "UTF-8")); + data = JSONValue.parse(new String(_curator.getData().forPath(path), "UTF-8")); } else { - return null; + data = null; } + LOG.debug("Get. [path = {}] => [data = {}]", path, data); + return data; } catch(Exception e) { throw new RuntimeException(e); }