Repository: storm
Updated Branches:
  refs/heads/1.x-branch 555146fc6 -> d7d7e61a1


Merge branch 'Apache_master_STORM-2097_TridentLogs' of https://github.com/hmcl/storm-apache into STORM-2097


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/91903743
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/91903743
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/91903743

Branch: refs/heads/1.x-branch
Commit: 91903743d3c3acb2ac5e4949629f2e1b52cc1ed9
Parents: 555146f
Author: Sriharsha Chintalapani <har...@hortonworks.com>
Authored: Tue Oct 11 08:45:35 2016 -0700
Committer: Sriharsha Chintalapani <har...@hortonworks.com>
Committed: Tue Oct 11 09:06:29 2016 -0700

----------------------------------------------------------------------
 .../starter/spout/RandomSentenceSpout.java      | 39 ++++++++++-
 .../starter/trident/DebugMemoryMapState.java    | 73 ++++++++++++++++++++
 .../starter/trident/TridentKafkaWordCount.java  | 58 +++++++++++-----
 .../jvm/org/apache/storm/kafka/KafkaUtils.java  |  9 ++-
 .../trident/TransactionalTridentKafkaSpout.java |  8 +--
 .../kafka/trident/TridentKafkaEmitter.java      | 30 ++++++--
 .../storm/trident/operation/builtin/Debug.java  | 18 ++++-
 .../spout/IOpaquePartitionedTridentSpout.java   | 11 +--
 .../trident/spout/IPartitionedTridentSpout.java | 10 +--
 .../OpaquePartitionedTridentSpoutExecutor.java  | 62 +++++++++++++----
 .../spout/PartitionedTridentSpoutExecutor.java  | 24 +++++--
 .../trident/state/CombinerValueUpdater.java     |  9 +++
 .../topology/MasterBatchCoordinator.java        | 39 +++++++++--
 .../state/RotatingTransactionalState.java       | 59 +++++++++++++---
 .../topology/state/TransactionalState.java      | 44 ++++++++----
 15 files changed, 399 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomSentenceSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomSentenceSpout.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomSentenceSpout.java
index 49bec2e..0fe28e2 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomSentenceSpout.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomSentenceSpout.java
@@ -24,11 +24,17 @@ import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.Map;
 import java.util.Random;
 
 public class RandomSentenceSpout extends BaseRichSpout {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RandomSentenceSpout.class);
+
   SpoutOutputCollector _collector;
   Random _rand;
 
@@ -42,12 +48,19 @@ public class RandomSentenceSpout extends BaseRichSpout {
   @Override
   public void nextTuple() {
     Utils.sleep(100);
-    String[] sentences = new String[]{ "the cow jumped over the moon", "an 
apple a day keeps the doctor away",
-        "four score and seven years ago", "snow white and the seven dwarfs", 
"i am at two with nature" };
-    String sentence = sentences[_rand.nextInt(sentences.length)];
+    String[] sentences = new String[]{sentence("the cow jumped over the 
moon"), sentence("an apple a day keeps the doctor away"),
+            sentence("four score and seven years ago"), sentence("snow white 
and the seven dwarfs"), sentence("i am at two with nature")};
+    final String sentence = sentences[_rand.nextInt(sentences.length)];
+
+    LOG.debug("Emitting tuple: {}", sentence);
+
     _collector.emit(new Values(sentence));
   }
 
+  protected String sentence(String input) {
+    return input;
+  }
+
   @Override
   public void ack(Object id) {
   }
@@ -61,4 +74,24 @@ public class RandomSentenceSpout extends BaseRichSpout {
     declarer.declare(new Fields("word"));
   }
 
+  // Add unique identifier to each tuple, which is helpful for debugging
+  public static class TimeStamped extends RandomSentenceSpout {
+    private final String prefix;
+
+    public TimeStamped() {
+      this("");
+    }
+
+    public TimeStamped(String prefix) {
+      this.prefix = prefix;
+    }
+
+    protected String sentence(String input) {
+      return prefix + currentDate() + " " + input;
+    }
+
+    private String currentDate() {
+      return new SimpleDateFormat("yyyy.MM.dd_HH:mm:ss.SSSSSSSSS").format(new 
Date());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/DebugMemoryMapState.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/DebugMemoryMapState.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/DebugMemoryMapState.java
new file mode 100644
index 0000000..3870afa
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/DebugMemoryMapState.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.task.IMetricsContext;
+import org.apache.storm.topology.FailedException;
+import org.apache.storm.trident.state.CombinerValueUpdater;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+import org.apache.storm.trident.state.ValueUpdater;
+import org.apache.storm.trident.testing.MemoryMapState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class DebugMemoryMapState<T> extends MemoryMapState<T> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DebugMemoryMapState.class);
+
+    private int updateCount = 0;
+
+    public DebugMemoryMapState(String id) {
+        super(id);
+    }
+
+    public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> 
updaters) {
+        print(keys, updaters);
+        if ((updateCount++ % 5) == 0) {
+            LOG.error("Throwing FailedException");
+            throw new FailedException("Enforced State Update Fail. On retrial 
should replay the exact same batch.");
+        }
+        return super.multiUpdate(keys, updaters);
+    }
+
+    private void print(List<List<Object>> keys, List<ValueUpdater> updaters) {
+        for (int i = 0; i < keys.size(); i++) {
+            ValueUpdater valueUpdater = updaters.get(i);
+            Object arg = ((CombinerValueUpdater) valueUpdater).getArg();
+            LOG.debug("updateCount = {}, keys = {} => updaterArgs = {}", 
updateCount, keys.get(i), arg);
+        }
+    }
+
+    public static class Factory implements StateFactory {
+        String _id;
+
+        public Factory() {
+            _id = UUID.randomUUID().toString();
+        }
+
+        @Override
+        public State makeState(Map conf, IMetricsContext metrics, int 
partitionIndex, int numPartitions) {
+            return new DebugMemoryMapState(_id + partitionIndex);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java
index 0499041..ee8d001 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentKafkaWordCount.java
@@ -23,32 +23,37 @@
 package org.apache.storm.starter.trident;
 
 
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.LocalDRPC;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
-import org.apache.storm.spout.SchemeAsMultiScheme;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.storm.kafka.StringScheme;
 import org.apache.storm.kafka.ZkHosts;
 import org.apache.storm.kafka.bolt.KafkaBolt;
 import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
 import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
 import org.apache.storm.kafka.trident.TransactionalTridentKafkaSpout;
 import org.apache.storm.kafka.trident.TridentKafkaConfig;
+import org.apache.storm.spout.SchemeAsMultiScheme;
 import org.apache.storm.starter.spout.RandomSentenceSpout;
+import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentState;
 import org.apache.storm.trident.TridentTopology;
 import org.apache.storm.trident.operation.builtin.Count;
+import org.apache.storm.trident.operation.builtin.Debug;
 import org.apache.storm.trident.operation.builtin.FilterNull;
 import org.apache.storm.trident.operation.builtin.MapGet;
 import org.apache.storm.trident.testing.MemoryMapState;
 import org.apache.storm.trident.testing.Split;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
 import java.util.Properties;
 
 /**
@@ -72,7 +77,8 @@ import java.util.Properties;
  *     <a href="https://github.com/apache/storm/tree/master/external/storm-kafka"> Storm Kafka </a>.
  * </p>
  */
-public class TridentKafkaWordCount {
+public class TridentKafkaWordCount implements Serializable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TridentKafkaWordCount.class);
 
     private String zkUrl;
     private String brokerUrl;
@@ -91,7 +97,7 @@ public class TridentKafkaWordCount {
      *
      * @return a transactional trident kafka spout.
      */
-    private TransactionalTridentKafkaSpout createKafkaSpout() {
+    private TransactionalTridentKafkaSpout createTransactionalKafkaSpout() {
         ZkHosts hosts = new ZkHosts(zkUrl);
         TridentKafkaConfig config = new TridentKafkaConfig(hosts, "test");
         config.scheme = new SchemeAsMultiScheme(new StringScheme());
@@ -101,6 +107,16 @@ public class TridentKafkaWordCount {
         return new TransactionalTridentKafkaSpout(config);
     }
 
+    private OpaqueTridentKafkaSpout createOpaqueKafkaSpout() {
+        ZkHosts hosts = new ZkHosts(zkUrl);
+        TridentKafkaConfig config = new TridentKafkaConfig(hosts, "test");
+        config.scheme = new SchemeAsMultiScheme(new StringScheme());
+
+        // Consume new data from the topic
+        config.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
+        return new OpaqueTridentKafkaSpout(config);
+    }
+
 
     private Stream addDRPCStream(TridentTopology tridentTopology, TridentState 
state, LocalDRPC drpc) {
         return tridentTopology.newDRPCStream("words", drpc)
@@ -111,12 +127,14 @@ public class TridentKafkaWordCount {
                 .project(new Fields("word", "count"));
     }
 
-    private TridentState addTridentState(TridentTopology tridentTopology) {
-        return tridentTopology.newStream("spout1", 
createKafkaSpout()).parallelismHint(1)
+    protected TridentState addTridentState(TridentTopology tridentTopology) {
+//        final Stream spoutStream = tridentTopology.newStream("spout1", 
createTransactionalKafkaSpout()).parallelismHint(1);
+        final Stream spoutStream = tridentTopology.newStream("spout1", 
createOpaqueKafkaSpout()).parallelismHint(1);
+
+        return spoutStream.each(spoutStream.getOutputFields(), new Debug(true))
                 .each(new Fields("str"), new Split(), new Fields("word"))
                 .groupBy(new Fields("word"))
-                .persistentAggregate(new MemoryMapState.Factory(), new 
Count(), new Fields("count"))
-                .parallelismHint(1);
+                .persistentAggregate(new DebugMemoryMapState.Factory(), new 
Count(), new Fields("count"));
     }
 
     /**
@@ -140,6 +158,7 @@ public class TridentKafkaWordCount {
     public Config getConsumerConfig() {
         Config conf = new Config();
         conf.setMaxSpoutPending(20);
+        conf.setMaxTaskParallelism(1);
         //  conf.setDebug(true);
         return conf;
     }
@@ -152,7 +171,7 @@ public class TridentKafkaWordCount {
      */
     public StormTopology buildProducerTopology(Properties prop) {
         TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout("spout", new RandomSentenceSpout(), 2);
+        builder.setSpout("spout", new RandomSentenceSpout.TimeStamped(""), 2);
         /**
          * The output field of the RandomSentenceSpout ("word") is provided as 
the boltMessageField
          * so that this gets written out as the message in the kafka topic.
@@ -203,7 +222,6 @@ public class TridentKafkaWordCount {
      * (word counts) by running an external drpc query against the drpc server.
      */
     public static void main(String[] args) throws Exception {
-
         String zkUrl = "localhost:2181";        // the defaults.
         String brokerUrl = "localhost:9092";
 
@@ -213,20 +231,22 @@ public class TridentKafkaWordCount {
             System.exit(1);
         } else if (args.length == 1) {
             zkUrl = args[0];
-        } else {
+        } else if (args.length == 2) {
             zkUrl = args[0];
             brokerUrl = args[1];
         }
 
         System.out.println("Using Kafka zookeeper url: " + zkUrl + " broker 
url: " + brokerUrl);
 
-        TridentKafkaWordCount wordCount = new TridentKafkaWordCount(zkUrl, 
brokerUrl);
+        runMain(args, new TridentKafkaWordCount(zkUrl, brokerUrl));
+    }
 
+    protected static void runMain(String[] args, TridentKafkaWordCount 
wordCount) throws Exception {
         if (args.length == 3)  {
             Config conf = new Config();
             conf.setMaxSpoutPending(20);
             conf.setNumWorkers(1);
-            // submit the consumer topology.
+            // submit the CONSUMER topology.
             StormSubmitter.submitTopology(args[2] + "-consumer", conf, 
wordCount.buildConsumerTopology(null));
             // submit the producer topology.
             StormSubmitter.submitTopology(args[2] + "-producer", conf, 
wordCount.buildProducerTopology(wordCount.getProducerConfig()));
@@ -234,17 +254,19 @@ public class TridentKafkaWordCount {
             LocalDRPC drpc = new LocalDRPC();
             LocalCluster cluster = new LocalCluster();
 
-            // submit the consumer topology.
+            // submit the CONSUMER topology.
             cluster.submitTopology("wordCounter", 
wordCount.getConsumerConfig(), wordCount.buildConsumerTopology(drpc));
 
+            // submit the PRODUCER topology.
             Config conf = new Config();
             conf.setMaxSpoutPending(20);
-            // submit the producer topology.
+//            conf.setDebug(true);
             cluster.submitTopology("kafkaBolt", conf, 
wordCount.buildProducerTopology(wordCount.getProducerConfig()));
 
             // keep querying the word counts for a minute.
             for (int i = 0; i < 60; i++) {
-                System.out.println("DRPC RESULT: " + drpc.execute("words", 
"the and apple snow jumped"));
+                LOG.info("--- DRPC RESULT: " + drpc.execute("words", "the and 
apple snow jumped"));
+                System.out.println();
                 Thread.sleep(1000);
             }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
index 822912b..8c22118 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
@@ -32,7 +32,13 @@ import java.net.ConnectException;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.nio.channels.UnresolvedAddressException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
 
 import kafka.api.FetchRequest;
 import kafka.api.FetchRequestBuilder;
@@ -215,6 +221,7 @@ public class KafkaUtils {
         } else {
             msgs = fetchResponse.messageSet(topic, partitionId);
         }
+        LOG.debug("Messages fetched. [config = {}], [consumer = {}], 
[partition = {}], [offset = {}], [msgs = {}]", config, consumer, partition, 
offset, msgs);
         return msgs;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java
index ac5b49f..7ce8d52 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java
@@ -17,13 +17,12 @@
  */
 package org.apache.storm.kafka.trident;
 
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Fields;
 import org.apache.storm.kafka.Partition;
+import org.apache.storm.task.TopologyContext;
 import org.apache.storm.trident.spout.IPartitionedTridentSpout;
+import org.apache.storm.tuple.Fields;
 
 import java.util.Map;
-import java.util.UUID;
 
 
 public class TransactionalTridentKafkaSpout implements 
IPartitionedTridentSpout<GlobalPartitionInformation, Partition, Map> {
@@ -42,8 +41,7 @@ public class TransactionalTridentKafkaSpout implements IPartitionedTridentSpout<
 
     @Override
     public IPartitionedTridentSpout.Emitter getEmitter(Map conf, 
TopologyContext context) {
-        return new TridentKafkaEmitter(conf, context, _config, context
-                .getStormId()).asTransactionalEmitter();
+        return new TridentKafkaEmitter(conf, context, _config, 
context.getStormId()).asTransactionalEmitter();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
index 809ed73..136eb0b 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
@@ -20,7 +20,13 @@ package org.apache.storm.kafka.trident;
 import com.google.common.collect.ImmutableMap;
 
 import org.apache.storm.Config;
-import org.apache.storm.kafka.*;
+import org.apache.storm.kafka.DynamicPartitionConnections;
+import org.apache.storm.kafka.FailedFetchException;
+import org.apache.storm.kafka.KafkaUtils;
+import org.apache.storm.kafka.MessageMetadataSchemeAsMultiScheme;
+import org.apache.storm.kafka.Partition;
+import org.apache.storm.kafka.PartitionManager;
+import org.apache.storm.kafka.TopicOffsetOutOfRangeException;
 import org.apache.storm.metric.api.CombinedMetric;
 import org.apache.storm.metric.api.MeanReducer;
 import org.apache.storm.metric.api.ReducedMetric;
@@ -32,7 +38,11 @@ import org.apache.storm.trident.topology.TransactionAttempt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
@@ -65,7 +75,7 @@ public class TridentKafkaEmitter {
 
     private Map failFastEmitNewPartitionBatch(TransactionAttempt attempt, 
TridentCollector collector, Partition partition, Map lastMeta) {
         SimpleConsumer consumer = _connections.register(partition);
-        Map ret = doEmitNewPartitionBatch(consumer, partition, collector, 
lastMeta);
+        Map ret = doEmitNewPartitionBatch(consumer, partition, collector, 
lastMeta, attempt);
         Long offset = (Long) ret.get("offset");
         Long endOffset = (Long) ret.get("nextOffset");
         _kafkaOffsetMetric.setOffsetData(partition, new 
PartitionManager.OffsetData(endOffset, offset));
@@ -92,7 +102,8 @@ public class TridentKafkaEmitter {
         }
     }
 
-    private Map doEmitNewPartitionBatch(SimpleConsumer consumer, Partition 
partition, TridentCollector collector, Map lastMeta) {
+    private Map doEmitNewPartitionBatch(SimpleConsumer consumer, Partition 
partition, TridentCollector collector, Map lastMeta, TransactionAttempt 
attempt) {
+        LOG.debug("Emitting new partition batch - [transaction = {}], 
[lastMeta = {}]", attempt, lastMeta);
         long offset;
         if (lastMeta != null) {
             String lastInstanceId = null;
@@ -108,6 +119,7 @@ public class TridentKafkaEmitter {
         } else {
             offset = KafkaUtils.getOffset(consumer, partition.topic, 
partition.partition, _config);
         }
+        LOG.debug("[transaction = {}], [OFFSET = {}]", attempt, offset);
 
         ByteBufferMessageSet msgs = null;
         try {
@@ -121,7 +133,7 @@ public class TridentKafkaEmitter {
 
         long endoffset = offset;
         for (MessageAndOffset msg : msgs) {
-            emit(collector, msg.message(), partition, msg.offset());
+            emit(collector, msg.message(), partition, msg.offset(), attempt);
             endoffset = msg.nextOffset();
         }
         Map newMeta = new HashMap();
@@ -132,6 +144,7 @@ public class TridentKafkaEmitter {
         newMeta.put("broker", ImmutableMap.of("host", partition.host.host, 
"port", partition.host.port));
         newMeta.put("topic", partition.topic);
         newMeta.put("topology", ImmutableMap.of("name", _topologyName, "id", 
_topologyInstanceId));
+        LOG.debug("[transaction = {}], [newMeta = {}]", attempt, newMeta);
         return newMeta;
     }
 
@@ -172,14 +185,14 @@ public class TridentKafkaEmitter {
                     if (offset > nextOffset) {
                         throw new RuntimeException("Error when re-emitting 
batch. overshot the end offset");
                     }
-                    emit(collector, msg.message(), partition, msg.offset());
+                    emit(collector, msg.message(), partition, msg.offset(), 
attempt);
                     offset = msg.nextOffset();
                 }
             }
         }
     }
 
-    private void emit(TridentCollector collector, Message msg, Partition 
partition, long offset) {
+    private void emit(TridentCollector collector, Message msg, Partition 
partition, long offset, TransactionAttempt attempt) {
         Iterable<List<Object>> values;
         if (_config.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
             values = 
KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _config.scheme, 
msg, partition, offset);
@@ -189,8 +202,11 @@ public class TridentKafkaEmitter {
 
         if (values != null) {
             for (List<Object> value : values) {
+                LOG.debug("Emitting: [Transaction: {}], [Data: {}]", attempt, 
value);
                 collector.emit(value);
             }
+        } else {
+            LOG.debug("NOT Emitting NULL data. [Transaction: {}]", attempt);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java
index 87c4167..a9f30d0 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Debug.java
@@ -19,6 +19,8 @@ package org.apache.storm.trident.operation.builtin;
 
 import org.apache.storm.trident.operation.BaseFilter;
 import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Date;
 
@@ -26,10 +28,18 @@ import java.util.Date;
  * Filter for debugging purposes. The `isKeep()` method simply prints the 
tuple to `System.out` and returns `true`.
  */
 public class Debug extends BaseFilter {
+    private final Logger LOG = LoggerFactory.getLogger(Debug.class);
+
     private final String name;
+    private boolean useLogger;
 
     public Debug() {
-        name = "DEBUG: ";
+        this(false);
+    }
+
+    public Debug(boolean useLogger) {
+        this.useLogger = useLogger;
+        this.name = "DEBUG: ";
     }
 
     /**
@@ -42,7 +52,11 @@ public class Debug extends BaseFilter {
 
     @Override
     public boolean isKeep(TridentTuple tuple) {
-        System.out.println("<"+new Date()+"> "+name + tuple.toString());
+        if(useLogger) {
+            LOG.debug(tuple.toString());
+        } else {
+            System.out.println("<"+new Date()+"> "+name + tuple.toString());
+        }
         return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java b/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
index bf4c093..fc4d5ea 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
@@ -18,12 +18,12 @@
 package org.apache.storm.trident.spout;
 
 import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.topology.TransactionAttempt;
 import org.apache.storm.tuple.Fields;
-import java.io.Serializable;
+
 import java.util.List;
 import java.util.Map;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.topology.TransactionAttempt;
 
 /**
  * This defines a transactional spout which does *not* necessarily
@@ -32,13 +32,14 @@ import org.apache.storm.trident.topology.TransactionAttempt;
  */
 public interface IOpaquePartitionedTridentSpout<Partitions, Partition extends 
ISpoutPartition, M>
     extends ITridentDataSource {
-    public interface Coordinator<Partitions> {
+
+    interface Coordinator<Partitions> {
         boolean isReady(long txid);
         Partitions getPartitionsForBatch();
         void close();
     }
     
-    public interface Emitter<Partitions, Partition extends ISpoutPartition, M> 
{
+    interface Emitter<Partitions, Partition extends ISpoutPartition, M> {
         /**
          * Emit a batch of tuples for a partition/transaction. 
          * 

http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/storm-core/src/jvm/org/apache/storm/trident/spout/IPartitionedTridentSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/spout/IPartitionedTridentSpout.java b/storm-core/src/jvm/org/apache/storm/trident/spout/IPartitionedTridentSpout.java
index 7450a3c..82c4f74 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/spout/IPartitionedTridentSpout.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/spout/IPartitionedTridentSpout.java
@@ -18,12 +18,12 @@
 package org.apache.storm.trident.spout;
 
 import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.topology.TransactionAttempt;
 import org.apache.storm.tuple.Fields;
-import java.io.Serializable;
+
 import java.util.List;
 import java.util.Map;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.topology.TransactionAttempt;
 
 /**
  * This interface defines a transactional spout that reads its tuples from a 
partitioned set of 
@@ -31,7 +31,7 @@ import org.apache.storm.trident.topology.TransactionAttempt;
  * is always emitted for the same transaction id. The partition metadata is 
stored in Zookeeper.
  */
 public interface IPartitionedTridentSpout<Partitions, Partition extends 
ISpoutPartition, T> extends ITridentDataSource {
-    public interface Coordinator<Partitions> {
+    interface Coordinator<Partitions> {
         /**
          * Return the partitions currently in the source of data. The idea is
          * is that if a new partition is added and a prior transaction is 
replayed, it doesn't
@@ -45,7 +45,7 @@ public interface IPartitionedTridentSpout<Partitions, Partition extends ISpoutPa
         void close();
     }
     
-    public interface Emitter<Partitions, Partition extends ISpoutPartition, X> 
{
+    interface Emitter<Partitions, Partition extends ISpoutPartition, X> {
         
         List<Partition> getOrderedPartitions(Partitions allPartitionInfo);
         

http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java b/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
index 7c43961..ea66acd 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
@@ -19,7 +19,14 @@ package org.apache.storm.trident.spout;
 
 
 import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.topology.TransactionAttempt;
+import org.apache.storm.trident.topology.state.RotatingTransactionalState;
+import org.apache.storm.trident.topology.state.TransactionalState;
 import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -28,13 +35,11 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.topology.state.RotatingTransactionalState;
-import org.apache.storm.trident.topology.state.TransactionalState;
-import org.apache.storm.trident.topology.TransactionAttempt;
 
 
 public class OpaquePartitionedTridentSpoutExecutor implements 
ICommitterTridentSpout<Object> {
+    protected final Logger LOG = 
LoggerFactory.getLogger(OpaquePartitionedTridentSpoutExecutor.class);
+
     IOpaquePartitionedTridentSpout<Object, ISpoutPartition, Object> _spout;
     
     public class Coordinator implements ITridentSpout.BatchCoordinator<Object> 
{
@@ -46,21 +51,28 @@ public class OpaquePartitionedTridentSpoutExecutor 
implements ICommitterTridentS
         
         @Override
         public Object initializeTransaction(long txid, Object prevMetadata, 
Object currMetadata) {
+            LOG.debug("Initialize Transaction. [txid = {}], [prevMetadata = 
{}], [currMetadata = {}]", txid, prevMetadata, currMetadata);
             return _coordinator.getPartitionsForBatch();
         }
 
+
         @Override
         public void close() {
+            LOG.debug("Closing");
             _coordinator.close();
+            LOG.debug("Closed");
         }
 
         @Override
         public void success(long txid) {
+            LOG.debug("Success [txid = {}]", txid);
         }
 
         @Override
         public boolean isReady(long txid) {
-            return _coordinator.isReady(txid);
+            boolean ready = _coordinator.isReady(txid);
+            LOG.debug("[isReady = {}], [txid = {}]", ready, txid);
+            return ready;
         }
     }
     
@@ -81,19 +93,23 @@ public class OpaquePartitionedTridentSpoutExecutor 
implements ICommitterTridentS
         Map<String, EmitterPartitionState> _partitionStates = new HashMap<>();
         int _index;
         int _numTasks;
-        
+
         public Emitter(String txStateId, Map conf, TopologyContext context) {
             _emitter = _spout.getEmitter(conf, context);
             _index = context.getThisTaskIndex();
             _numTasks = 
context.getComponentTasks(context.getThisComponentId()).size();
-            _state = TransactionalState.newUserState(conf, txStateId);         
    
+            _state = TransactionalState.newUserState(conf, txStateId);
+            LOG.debug("Created {}", this);
         }
-        
+
         Object _savedCoordinatorMeta = null;
         boolean _changedMeta = false;
-        
+
         @Override
         public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, 
TridentCollector collector) {
+            LOG.debug("Emitting Batch. [transaction = {}], [coordinatorMeta = 
{}], [collector = {}], [{}]",
+                    tx, coordinatorMeta, collector, this);
+
             if(_savedCoordinatorMeta==null || 
!_savedCoordinatorMeta.equals(coordinatorMeta)) {
                 List<ISpoutPartition> partitions = 
_emitter.getOrderedPartitions(coordinatorMeta);
                 _partitionStates.clear();
@@ -128,17 +144,21 @@ public class OpaquePartitionedTridentSpoutExecutor 
implements ICommitterTridentS
                 Object meta = _emitter.emitPartitionBatch(tx, collector, 
s.partition, lastMeta);
                 metas.put(id, meta);
             }
+            LOG.debug("Emitted Batch. [transaction = {}], [coordinatorMeta = 
{}], [collector = {}], [{}]",
+                    tx, coordinatorMeta, collector, this);
         }
 
         @Override
         public void success(TransactionAttempt tx) {
             for(EmitterPartitionState state: _partitionStates.values()) {
                 state.rotatingState.cleanupBefore(tx.getTransactionId());
-            }            
+            }
+            LOG.debug("Success transaction {}. [{}]", tx, this);
         }
 
         @Override
         public void commit(TransactionAttempt attempt) {
+            LOG.debug("Committing transaction {}. [{}]", attempt, this);
             // this code here handles a case where a previous commit failed, 
and the partitions
             // changed since the last commit. This clears out any state for 
the removed partitions
             // for this txid.
@@ -159,21 +179,37 @@ public class OpaquePartitionedTridentSpoutExecutor 
implements ICommitterTridentS
                     }
                 }
                 _changedMeta = false;
-            }            
+            }
             
             Long txid = attempt.getTransactionId();
             Map<String, Object> metas = _cachedMetas.remove(txid);
             for(Entry<String, Object> entry: metas.entrySet()) {
                 
_partitionStates.get(entry.getKey()).rotatingState.overrideState(txid, 
entry.getValue());
             }
+            LOG.debug("Exiting commit method for transaction {}. [{}]", 
attempt, this);
         }
 
         @Override
         public void close() {
+            LOG.debug("Closing");
             _emitter.close();
+            LOG.debug("Closed");
         }
-    } 
-    
+
+        @Override
+        public String toString() {
+            return "Emitter{" +
+                    ", _state=" + _state +
+                    ", _cachedMetas=" + _cachedMetas +
+                    ", _partitionStates=" + _partitionStates +
+                    ", _index=" + _index +
+                    ", _numTasks=" + _numTasks +
+                    ", _savedCoordinatorMeta=" + _savedCoordinatorMeta +
+                    ", _changedMeta=" + _changedMeta +
+                    '}';
+        }
+    }
+
     public 
OpaquePartitionedTridentSpoutExecutor(IOpaquePartitionedTridentSpout<Object, 
ISpoutPartition, Object> spout) {
         _spout = spout;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/storm-core/src/jvm/org/apache/storm/trident/spout/PartitionedTridentSpoutExecutor.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/spout/PartitionedTridentSpoutExecutor.java
 
b/storm-core/src/jvm/org/apache/storm/trident/spout/PartitionedTridentSpoutExecutor.java
index dc9b3d9..a96dd1a 100644
--- 
a/storm-core/src/jvm/org/apache/storm/trident/spout/PartitionedTridentSpoutExecutor.java
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/spout/PartitionedTridentSpoutExecutor.java
@@ -23,6 +23,8 @@ import org.apache.storm.trident.topology.TransactionAttempt;
 import org.apache.storm.trident.topology.state.RotatingTransactionalState;
 import org.apache.storm.trident.topology.state.TransactionalState;
 import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -31,6 +33,8 @@ import java.util.Map;
 
 
 public class PartitionedTridentSpoutExecutor implements ITridentSpout<Object> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PartitionedTridentSpoutExecutor.class);
+
     IPartitionedTridentSpout<Object, ISpoutPartition, Object> _spout;
     
     public PartitionedTridentSpoutExecutor(IPartitionedTridentSpout<Object, 
ISpoutPartition, Object> spout) {
@@ -50,6 +54,8 @@ public class PartitionedTridentSpoutExecutor implements 
ITridentSpout<Object> {
         
         @Override
         public Object initializeTransaction(long txid, Object prevMetadata, 
Object currMetadata) {
+            LOG.debug("Initialize Transaction. txid = {}, prevMetadata = {}, 
currMetadata = {}", txid, prevMetadata, currMetadata);
+
             if(currMetadata!=null) {
                 return currMetadata;
             } else {
@@ -60,16 +66,21 @@ public class PartitionedTridentSpoutExecutor implements 
ITridentSpout<Object> {
 
         @Override
         public void close() {
+            LOG.debug("Closing");
             _coordinator.close();
+            LOG.debug("Closed");
         }
 
         @Override
         public void success(long txid) {
+            LOG.debug("Success transaction id " + txid);
         }
 
         @Override
         public boolean isReady(long txid) {
-            return _coordinator.isReady(txid);
+            boolean ready = _coordinator.isReady(txid);
+            LOG.debug("isReady = {} ", ready);
+            return ready;
         }
     }
     
@@ -101,8 +112,9 @@ public class PartitionedTridentSpoutExecutor implements 
ITridentSpout<Object> {
 
         
         @Override
-        public void emitBatch(final TransactionAttempt tx, final Object 
coordinatorMeta,
-                final TridentCollector collector) {
+        public void emitBatch(final TransactionAttempt tx, final Object 
coordinatorMeta, final TridentCollector collector) {
+            LOG.debug("Emitting Batch. [transaction = {}], [coordinatorMeta = 
{}], [collector = {}]", tx, coordinatorMeta, collector);
+
             if(_savedCoordinatorMeta == null || 
!_savedCoordinatorMeta.equals(coordinatorMeta)) {
                 List<ISpoutPartition> partitions = 
_emitter.getOrderedPartitions(coordinatorMeta);
                 _partitionStates.clear();
@@ -133,11 +145,13 @@ public class PartitionedTridentSpoutExecutor implements 
ITridentSpout<Object> {
                 if(meta!=null) {
                     _emitter.emitPartitionBatch(tx, collector, partition, 
meta);
                 }
-            }            
+            }
+            LOG.debug("Emitted Batch. [tx = {}], [coordinatorMeta = {}], 
[collector = {}]", tx, coordinatorMeta, collector);
         }
 
         @Override
         public void success(TransactionAttempt tx) {
+            LOG.debug("Success transaction " + tx);
             for(EmitterPartitionState state: _partitionStates.values()) {
                 state.rotatingState.cleanupBefore(tx.getTransactionId());
             }
@@ -145,8 +159,10 @@ public class PartitionedTridentSpoutExecutor implements 
ITridentSpout<Object> {
 
         @Override
         public void close() {
+            LOG.debug("Closing");
             _state.close();
             _emitter.close();
+            LOG.debug("Closed");
         }
     }    
 

http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/storm-core/src/jvm/org/apache/storm/trident/state/CombinerValueUpdater.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/state/CombinerValueUpdater.java 
b/storm-core/src/jvm/org/apache/storm/trident/state/CombinerValueUpdater.java
index 82b7360..f43071f 100644
--- 
a/storm-core/src/jvm/org/apache/storm/trident/state/CombinerValueUpdater.java
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/state/CombinerValueUpdater.java
@@ -33,4 +33,13 @@ public class CombinerValueUpdater implements 
ValueUpdater<Object> {
         if(stored==null) return arg;
         else return agg.combine(stored, arg);
     }
+
+    // helpful for debugging tests
+    public Object getArg() {
+        return arg;
+    }
+
+    public CombinerAggregator getAgg() {
+        return agg;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/storm-core/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java
 
b/storm-core/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java
index 6f59409..c10af43 100644
--- 
a/storm-core/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/topology/MasterBatchCoordinator.java
@@ -22,20 +22,20 @@ import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.trident.spout.ITridentSpout;
+import org.apache.storm.trident.topology.state.TransactionalState;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.WindowedTimeThrottler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
-import java.util.Random;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.storm.trident.spout.ITridentSpout;
-import org.apache.storm.trident.topology.state.TransactionalState;
 
 public class MasterBatchCoordinator extends BaseRichSpout { 
     public static final Logger LOG = 
LoggerFactory.getLogger(MasterBatchCoordinator.class);
@@ -74,6 +74,7 @@ public class MasterBatchCoordinator extends BaseRichSpout {
         }
         _managedSpoutIds = spoutIds;
         _spouts = spouts;
+        LOG.debug("Created {}", this);
     }
 
     public List<String> getManagedSpoutIds(){
@@ -112,6 +113,7 @@ public class MasterBatchCoordinator extends BaseRichSpout {
             String txId = _managedSpoutIds.get(i);
             _coordinators.add(_spouts.get(i).getCoordinator(txId, conf, 
context));
         }
+        LOG.debug("Opened {}", this);
     }
 
     @Override
@@ -119,6 +121,7 @@ public class MasterBatchCoordinator extends BaseRichSpout {
         for(TransactionalState state: _states) {
             state.close();
         }
+        LOG.debug("Closed {}", this);
     }
 
     @Override
@@ -130,9 +133,11 @@ public class MasterBatchCoordinator extends BaseRichSpout {
     public void ack(Object msgId) {
         TransactionAttempt tx = (TransactionAttempt) msgId;
         TransactionStatus status = _activeTx.get(tx.getTransactionId());
+        LOG.debug("Ack. [tx_attempt = {}], [tx_status = {}], [{}]", tx, 
status, this);
         if(status!=null && tx.equals(status.attempt)) {
             if(status.status==AttemptStatus.PROCESSING) {
                 status.status = AttemptStatus.PROCESSED;
+                LOG.debug("Changed status. [tx_attempt = {}] [tx_status = 
{}]", tx, status);
             } else if(status.status==AttemptStatus.COMMITTING) {
                 _activeTx.remove(tx.getTransactionId());
                 _attemptIds.remove(tx.getTransactionId());
@@ -141,6 +146,7 @@ public class MasterBatchCoordinator extends BaseRichSpout {
                 for(TransactionalState state: _states) {
                     state.setData(CURRENT_TX, _currTransaction);               
     
                 }
+                LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], 
[tx_status = {}], [{}]", SUCCESS_STREAM_ID, tx, status, this);
             }
             sync();
         }
@@ -150,6 +156,7 @@ public class MasterBatchCoordinator extends BaseRichSpout {
     public void fail(Object msgId) {
         TransactionAttempt tx = (TransactionAttempt) msgId;
         TransactionStatus stored = _activeTx.remove(tx.getTransactionId());
+        LOG.debug("Fail. [tx_attempt = {}], [tx_status = {}], [{}]", tx, 
stored, this);
         if(stored!=null && tx.equals(stored.attempt)) {
             _activeTx.tailMap(tx.getTransactionId()).clear();
             sync();
@@ -174,6 +181,7 @@ public class MasterBatchCoordinator extends BaseRichSpout {
         if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) 
{
             maybeCommit.status = AttemptStatus.COMMITTING;
             _collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), 
maybeCommit.attempt);
+            LOG.debug("Emitted on [stream = {}], [tx_status = {}], [{}]", 
COMMIT_STREAM_ID, maybeCommit, this);
         }
         
         if(_active) {
@@ -196,8 +204,10 @@ public class MasterBatchCoordinator extends BaseRichSpout {
                         }
                         
                         TransactionAttempt attempt = new 
TransactionAttempt(curr, attemptId);
-                        _activeTx.put(curr, new TransactionStatus(attempt));
+                        final TransactionStatus newTransactionStatus = new 
TransactionStatus(attempt);
+                        _activeTx.put(curr, newTransactionStatus);
                         _collector.emit(BATCH_STREAM_ID, new Values(attempt), 
attempt);
+                        LOG.debug("Emitted on [stream = {}], [tx_attempt = 
{}], [tx_status = {}], [{}]", BATCH_STREAM_ID, attempt, newTransactionStatus, 
this);
                         _throttler.markEvent();
                     }
                     curr = nextTransactionId(curr);
@@ -286,4 +296,21 @@ public class MasterBatchCoordinator extends BaseRichSpout {
         ret.tailMap(currTransaction + maxBatches - 1).clear();
         return ret;
     }
+
+    @Override
+    public String toString() {
+        return "MasterBatchCoordinator{" +
+                "_states=" + _states +
+                ", _activeTx=" + _activeTx +
+                ", _attemptIds=" + _attemptIds +
+                ", _collector=" + _collector +
+                ", _currTransaction=" + _currTransaction +
+                ", _maxTransactionActive=" + _maxTransactionActive +
+                ", _coordinators=" + _coordinators +
+                ", _managedSpoutIds=" + _managedSpoutIds +
+                ", _spouts=" + _spouts +
+                ", _throttler=" + _throttler +
+                ", _active=" + _active +
+                "}";
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/storm-core/src/jvm/org/apache/storm/trident/topology/state/RotatingTransactionalState.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/topology/state/RotatingTransactionalState.java
 
b/storm-core/src/jvm/org/apache/storm/trident/topology/state/RotatingTransactionalState.java
index ebd73b2..6078f51 100644
--- 
a/storm-core/src/jvm/org/apache/storm/trident/topology/state/RotatingTransactionalState.java
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/topology/state/RotatingTransactionalState.java
@@ -19,6 +19,8 @@ package org.apache.storm.trident.topology.state;
 
 import org.apache.storm.utils.Utils;
 import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.HashSet;
 import java.util.List;
@@ -26,6 +28,8 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 
 public class RotatingTransactionalState {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RotatingTransactionalState.class);
+
     public static interface StateInitializer {
         Object init(long txid, Object lastState);
     }    
@@ -40,6 +44,7 @@ public class RotatingTransactionalState {
         _subdir = subdir;
         state.mkdir(subdir);
         sync();
+        LOG.debug("Created {}", this);
     }
 
 
@@ -49,19 +54,30 @@ public class RotatingTransactionalState {
     }
     
     public void overrideState(long txid, Object state) {
+        LOG.debug("Overriding state. [txid = {}],  [state = {}]", txid, state);
+        LOG.trace("[{}]", this);
+
         _state.setData(txPath(txid), state);
         _curr.put(txid, state);
+
+        LOG.trace("Overriding state complete.  [{}]", this);
     }
 
     public void removeState(long txid) {
+        Object state = null;
         if(_curr.containsKey(txid)) {
-            _curr.remove(txid);
+            state = _curr.remove(txid);
             _state.delete(txPath(txid));
         }
+        LOG.debug("Removed [state = {}], [txid = {}]", state, txid);
+        LOG.trace("[{}]", this);
     }
     
     public Object getState(long txid) {
-        return _curr.get(txid);
+        final Object state = _curr.get(txid);
+        LOG.debug("Getting state. [txid = {}] => [state = {}]", txid, state);
+        LOG.trace("Internal state [{}]", this);
+        return state;
     }
     
     public Object getState(long txid, StateInitializer init) {
@@ -87,13 +103,26 @@ public class RotatingTransactionalState {
             _curr.put(txid, data);
             _state.setData(txPath(txid), data);
         }
-        return _curr.get(txid);
+        Object state = _curr.get(txid);
+        LOG.debug("Getting or initializing state. [txid = {}] => [state = 
{}]", txid, state);
+        LOG.trace("[{}]", this);
+        return state;
     }
     
     public Object getPreviousState(long txid) {
-        SortedMap<Long, Object> prevMap = _curr.headMap(txid);
-        if(prevMap.isEmpty()) return null;
-        else return prevMap.get(prevMap.lastKey());
+        final SortedMap<Long, Object> prevMap = _curr.headMap(txid);
+        Object state;
+
+        if(prevMap.isEmpty()) {
+            state = null;
+        }
+        else {
+            state = prevMap.get(prevMap.lastKey());
+        }
+
+        LOG.debug("Getting previous [state = {}], [txid = {}]", state, txid);
+        LOG.trace("[{}]", this);
+        return state;
     }
     
     public boolean hasCache(long txid) {
@@ -104,12 +133,14 @@ public class RotatingTransactionalState {
      * Returns null if it was created, the value otherwise.
      */
     public Object getStateOrCreate(long txid, StateInitializer init) {
+        Object state;
         if(_curr.containsKey(txid)) {
-            return _curr.get(txid);
+            state = _curr.get(txid);
         } else {
             getState(txid, init);
-            return null;
+            state = null;
         }
+        return state;
     }
     
     public void cleanupBefore(long txid) {
@@ -142,6 +173,14 @@ public class RotatingTransactionalState {
 
     private String txPath(String tx) {
         return _subdir + "/" + tx;
-    }    
-    
+    }
+
+    @Override
+    public String toString() {
+        return "RotatingTransactionalState{" +
+                "_state=" + _state +
+                ", _subdir='" + _subdir + '\'' +
+                ", _curr=" + _curr +
+                '}';
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/91903743/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
 
b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
index 3bf2794..af1f297 100644
--- 
a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
+++ 
b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
@@ -18,27 +18,28 @@
 package org.apache.storm.trident.topology.state;
 
 
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.PathAndBytesable;
+import org.apache.curator.framework.api.ProtectACLCreateModePathAndBytesable;
 import org.apache.storm.Config;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.ZookeeperAuthInfo;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.ProtectACLCreateModePathAndBytesable;
-import org.apache.curator.framework.api.PathAndBytesable;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.ACL;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Id;
-import org.json.simple.JSONValue;
 
 public class TransactionalState {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TransactionalState.class);
+
     CuratorFramework _curator;
     List<ACL> _zkAcls = null;
     
@@ -87,6 +88,7 @@ public class TransactionalState {
             byte[] data, List<ACL> acls, CreateMode mode) throws Exception {
         ProtectACLCreateModePathAndBytesable<String> builder =
             curator.create().creatingParentsIfNeeded();
+        LOG.debug("Creating node  [path = {}],  [data = {}],  [acls = {}],  
[mode = {}]", path, asString(data), acls, mode);
     
         if (acls == null) {
             if (mode == null ) {
@@ -100,6 +102,10 @@ public class TransactionalState {
         TransactionalState.forPath(builder.withACL(acls), path, data);
     }
 
+    private static String asString(byte[] data) {
+        return data == null ? "null" : new String(data);
+    }
+
     public void setData(String path, Object obj) {
         path = "/" + path;
         byte[] ser;
@@ -115,6 +121,7 @@ public class TransactionalState {
                 TransactionalState.createNode(_curator, path, ser, _zkAcls,
                         CreateMode.PERSISTENT);
             }
+            LOG.debug("Set [path = {}] => [data = {}]", path, asString(ser));
         } catch(Exception e) {
             throw new RuntimeException(e);
         }        
@@ -127,19 +134,23 @@ public class TransactionalState {
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
+        LOG.debug("Deleted [path = {}]", path);
     }
     
     public List<String> list(String path) {
         path = "/" + path;
         try {
+            List<String> children;
             if(_curator.checkExists().forPath(path)==null) {
-                return new ArrayList<String>();
+                children = new ArrayList<>();
             } else {
-                return _curator.getChildren().forPath(path);
+                children = _curator.getChildren().forPath(path);
             }
+            LOG.debug("List [path = {}], [children = {}]", path, children);
+            return children;
         } catch(Exception e) {
             throw new RuntimeException(e);
-        }   
+        }
     }
     
     public void mkdir(String path) {
@@ -149,11 +160,14 @@ public class TransactionalState {
     public Object getData(String path) {
         path = "/" + path;
         try {
+            Object data;
             if(_curator.checkExists().forPath(path)!=null) {
-                return JSONValue.parse(new 
String(_curator.getData().forPath(path), "UTF-8"));
+                data = JSONValue.parse(new 
String(_curator.getData().forPath(path), "UTF-8"));
             } else {
-                return null;
+                data = null;
             }
+            LOG.debug("Get. [path = {}] => [data = {}]", path, data);
+            return data;
         } catch(Exception e) {
             throw new RuntimeException(e);
         }

Reply via email to