Repository: kafka
Updated Branches:
  refs/heads/trunk 3410f02fe -> 674407908


KAFKA-5698: Sort processor nodes based on its sub-tree size

1. Sort processor nodes within a sub-topology by its sub-tree size: nodes with 
largest sizes are source nodes and hence printed earlier.

2. Sort sub-topologies by ids; sort global stores by the source topic names.

3. Open for discussion: start newlines for predecessor and successor.

4. Minor: space between processor nodes and stores / topics; maintain `[]` for 
the topic names.

Author: Guozhang Wang <wangg...@gmail.com>

Reviewers: Damian Guy <damian....@gmail.com>, Matthias J. Sax 
<matth...@confluent.io>, Ted Yu <yuzhih...@gmail.com>

Closes #3618 from guozhangwang/K5698-topology-description-sorting


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/67440790
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/67440790
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/67440790

Branch: refs/heads/trunk
Commit: 674407908b898dd18b447447ad3346b5268b1dfc
Parents: 3410f02
Author: Guozhang Wang <wangg...@gmail.com>
Authored: Thu Sep 7 14:36:49 2017 -0700
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Thu Sep 7 14:36:49 2017 -0700

----------------------------------------------------------------------
 .../internals/InternalTopologyBuilder.java      | 112 ++++++++++++++-----
 .../internals/InternalTopologyBuilderTest.java  |  46 ++++++++
 2 files changed, 132 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/67440790/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index da5fe38..437e9e5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -35,10 +35,12 @@ import 
org.apache.kafka.streams.state.internals.WindowStoreSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -47,6 +49,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.regex.Pattern;
 
 
@@ -59,7 +62,7 @@ public class InternalTopologyBuilder {
     private static final String[] NO_PREDECESSORS = {};
 
     // node factories in a topological order
-    private final LinkedHashMap<String, NodeFactory> nodeFactories = new 
LinkedHashMap<>();
+    private final Map<String, NodeFactory> nodeFactories = new 
LinkedHashMap<>();
 
     // state factories
     private final Map<String, StateStoreFactory> stateFactories = new 
HashMap<>();
@@ -338,7 +341,6 @@ public class InternalTopologyBuilder {
 
             if (pattern == null) {
                 sourceTopics = topics.toString();
-                sourceTopics = sourceTopics.substring(1, sourceTopics.length() 
- 1); // trim first and last, ie. []
             } else {
                 sourceTopics = pattern.toString();
             }
@@ -1263,6 +1265,34 @@ public class InternalTopologyBuilder {
         return false;
     }
 
+    private static class NodeComparator implements 
Comparator<TopologyDescription.Node>, Serializable {
+
+        @Override
+        public int compare(final TopologyDescription.Node node1,
+                           final TopologyDescription.Node node2) {
+            final int size1 = ((AbstractNode) node1).size;
+            final int size2 = ((AbstractNode) node2).size;
+
+            // it is possible that two nodes have the same sub-tree size 
(think two nodes connected via state stores)
+            // in this case default to processor name string
+            if (size1 != size2) {
+                return size2 - size1;
+            } else {
+                return node1.name().compareTo(node2.name());
+            }
+        }
+    }
+
+    private final static NodeComparator NODE_COMPARATOR = new NodeComparator();
+
+    private static void updateSize(final AbstractNode node, final int delta) {
+        node.size += delta;
+
+        for (final TopologyDescription.Node predecessor : node.predecessors()) 
{
+            updateSize((AbstractNode) predecessor, delta);
+        }
+    }
+
     private void describeSubtopology(final TopologyDescription description,
                                      final Integer subtopologyId,
                                      final Set<String> nodeNames) {
@@ -1280,12 +1310,13 @@ public class InternalTopologyBuilder {
                 final AbstractNode predecessor = 
nodesByName.get(predecessorName);
                 node.addPredecessor(predecessor);
                 predecessor.addSuccessor(node);
+                updateSize(predecessor, node.size);
             }
         }
 
         description.addSubtopology(new Subtopology(
-            subtopologyId,
-            new HashSet<TopologyDescription.Node>(nodesByName.values())));
+                subtopologyId,
+                new HashSet<TopologyDescription.Node>(nodesByName.values())));
     }
 
     private void describeGlobalStores(final TopologyDescription description) {
@@ -1339,8 +1370,8 @@ public class InternalTopologyBuilder {
 
         @Override
         public String toString() {
-            return "GlobalStore: " + source.name + "(topic: " + source.topics 
+ ") -> "
-                + processor.name + "(store: " + 
processor.stores.iterator().next() + ")\n";
+            return "GlobalStore: " + source.name + " (topic: " + source.topics 
+ ")\n      --> "
+                + processor.name + " (store: " + 
processor.stores.iterator().next() + ")\n";
         }
 
         @Override
@@ -1365,11 +1396,15 @@ public class InternalTopologyBuilder {
 
     public abstract static class AbstractNode implements 
TopologyDescription.Node {
         final String name;
-        final Set<TopologyDescription.Node> predecessors = new HashSet<>();
-        final Set<TopologyDescription.Node> successors = new HashSet<>();
+        final Set<TopologyDescription.Node> predecessors = new 
TreeSet<>(NODE_COMPARATOR);
+        final Set<TopologyDescription.Node> successors = new 
TreeSet<>(NODE_COMPARATOR);
+
+        // size of the sub-topology rooted at this node, including the node 
itself
+        int size;
 
         AbstractNode(final String name) {
             this.name = name;
+            this.size = 1;
         }
 
         @Override
@@ -1417,7 +1452,7 @@ public class InternalTopologyBuilder {
 
         @Override
         public String toString() {
-            return "Source: " + name + "(topics: " + topics + ") --> " + 
nodeNames(successors);
+            return "Source: " + name + " (topics: " + topics + ")\n      --> " 
+ nodeNames(successors);
         }
 
         @Override
@@ -1458,7 +1493,7 @@ public class InternalTopologyBuilder {
 
         @Override
         public String toString() {
-            return "Processor: " + name + "(stores: " + stores + ") --> " + 
nodeNames(successors) + " <-- " + nodeNames(predecessors);
+            return "Processor: " + name + " (stores: " + stores + ")\n      
--> " + nodeNames(successors) + "\n      <-- " + nodeNames(predecessors);
         }
 
         @Override
@@ -1505,7 +1540,7 @@ public class InternalTopologyBuilder {
 
         @Override
         public String toString() {
-            return "Sink: " + name + "(topic: " + topic + ") <-- " + 
nodeNames(predecessors);
+            return "Sink: " + name + " (topic: " + topic + ")\n      <-- " + 
nodeNames(predecessors);
         }
 
         @Override
@@ -1532,12 +1567,12 @@ public class InternalTopologyBuilder {
 
     public final static class Subtopology implements 
org.apache.kafka.streams.TopologyDescription.Subtopology {
         private final int id;
-        private final Set<org.apache.kafka.streams.TopologyDescription.Node> 
nodes;
+        private final Set<TopologyDescription.Node> nodes;
 
-        public Subtopology(final int id,
-                    final 
Set<org.apache.kafka.streams.TopologyDescription.Node> nodes) {
+        public Subtopology(final int id, final Set<TopologyDescription.Node> 
nodes) {
             this.id = id;
-            this.nodes = nodes;
+            this.nodes = new TreeSet<>(NODE_COMPARATOR);
+            this.nodes.addAll(nodes);
         }
 
         @Override
@@ -1546,10 +1581,15 @@ public class InternalTopologyBuilder {
         }
 
         @Override
-        public Set<org.apache.kafka.streams.TopologyDescription.Node> nodes() {
+        public Set<TopologyDescription.Node> nodes() {
             return Collections.unmodifiableSet(nodes);
         }
 
+        // only for testing
+        public Iterator<TopologyDescription.Node> nodesInOrder() {
+            return nodes.iterator();
+        }
+
         @Override
         public String toString() {
             return "Sub-topology: " + id + "\n" + nodesAsString();
@@ -1557,7 +1597,7 @@ public class InternalTopologyBuilder {
 
         private String nodesAsString() {
             final StringBuilder sb = new StringBuilder();
-            for (final org.apache.kafka.streams.TopologyDescription.Node node 
: nodes) {
+            for (final TopologyDescription.Node node : nodes) {
                 sb.append("    ");
                 sb.append(node);
                 sb.append('\n');
@@ -1628,31 +1668,51 @@ public class InternalTopologyBuilder {
         }
     }
 
+    private static class GlobalStoreComparator implements 
Comparator<TopologyDescription.GlobalStore>, Serializable {
+        @Override
+        public int compare(final TopologyDescription.GlobalStore globalStore1,
+                           final TopologyDescription.GlobalStore globalStore2) 
{
+            return 
globalStore1.source().name().compareTo(globalStore2.source().name());
+        }
+    }
+
+    private final static GlobalStoreComparator GLOBALSTORE_COMPARATOR = new 
GlobalStoreComparator();
+
+    private static class SubtopologyComparator implements 
Comparator<TopologyDescription.Subtopology>, Serializable {
+        @Override
+        public int compare(final TopologyDescription.Subtopology subtopology1,
+                           final TopologyDescription.Subtopology subtopology2) 
{
+            return subtopology1.id() - subtopology2.id();
+        }
+    }
+
+    private final static SubtopologyComparator SUBTOPOLOGY_COMPARATOR = new 
SubtopologyComparator();
+
     public final static class TopologyDescription implements 
org.apache.kafka.streams.TopologyDescription {
-        private final 
Set<org.apache.kafka.streams.TopologyDescription.Subtopology> subtopologies = 
new HashSet<>();
-        private final 
Set<org.apache.kafka.streams.TopologyDescription.GlobalStore> globalStores = 
new HashSet<>();
+        private final Set<TopologyDescription.Subtopology> subtopologies = new 
TreeSet<>(SUBTOPOLOGY_COMPARATOR);
+        private final Set<TopologyDescription.GlobalStore> globalStores = new 
TreeSet<>(GLOBALSTORE_COMPARATOR);
 
-        public void addSubtopology(final 
org.apache.kafka.streams.TopologyDescription.Subtopology subtopology) {
+        public void addSubtopology(final TopologyDescription.Subtopology 
subtopology) {
             subtopologies.add(subtopology);
         }
 
-        public void addGlobalStore(final 
org.apache.kafka.streams.TopologyDescription.GlobalStore globalStore) {
+        public void addGlobalStore(final TopologyDescription.GlobalStore 
globalStore) {
             globalStores.add(globalStore);
         }
 
         @Override
-        public Set<org.apache.kafka.streams.TopologyDescription.Subtopology> 
subtopologies() {
+        public Set<TopologyDescription.Subtopology> subtopologies() {
             return Collections.unmodifiableSet(subtopologies);
         }
 
         @Override
-        public Set<org.apache.kafka.streams.TopologyDescription.GlobalStore> 
globalStores() {
+        public Set<TopologyDescription.GlobalStore> globalStores() {
             return Collections.unmodifiableSet(globalStores);
         }
 
         @Override
         public String toString() {
-            return subtopologiesAsString() + globalStoresAsString();
+            return subtopologiesAsString() + "\n" + globalStoresAsString();
         }
 
         private String subtopologiesAsString() {
@@ -1661,7 +1721,7 @@ public class InternalTopologyBuilder {
             if (subtopologies.isEmpty()) {
                 sb.append("  none\n");
             } else {
-                for (final 
org.apache.kafka.streams.TopologyDescription.Subtopology st : subtopologies) {
+                for (final TopologyDescription.Subtopology st : subtopologies) 
{
                     sb.append("  ");
                     sb.append(st);
                 }
@@ -1675,7 +1735,7 @@ public class InternalTopologyBuilder {
             if (globalStores.isEmpty()) {
                 sb.append("  none\n");
             } else {
-                for (final 
org.apache.kafka.streams.TopologyDescription.GlobalStore gs : globalStores) {
+                for (final TopologyDescription.GlobalStore gs : globalStores) {
                     sb.append("  ");
                     sb.append(gs);
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/67440790/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 91edac5..95636ec 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyDescription;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.errors.TopologyException;
@@ -43,6 +44,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -665,6 +667,50 @@ public class InternalTopologyBuilderTest {
     }
 
     @Test
+    public void shouldSortProcessorNodesCorrectly() throws Exception {
+        builder.addSource(null, "source1", null, null, null, "topic1");
+        builder.addSource(null, "source2", null, null, null, "topic2");
+        builder.addProcessor("processor1", new MockProcessorSupplier(), 
"source1");
+        builder.addProcessor("processor2", new MockProcessorSupplier(), 
"source1", "source2");
+        builder.addProcessor("processor3", new MockProcessorSupplier(), 
"processor2");
+        builder.addSink("sink1", "topic2", null, null, null, "processor1", 
"processor3");
+
+        assertEquals(1, builder.describe().subtopologies().size());
+
+        final Iterator<TopologyDescription.Node> iterator = 
((InternalTopologyBuilder.Subtopology) 
builder.describe().subtopologies().iterator().next()).nodesInOrder();
+
+        assertTrue(iterator.hasNext());
+        InternalTopologyBuilder.AbstractNode node = 
(InternalTopologyBuilder.AbstractNode) iterator.next();
+        assertTrue(node.name.equals("source1"));
+        assertEquals(6, node.size);
+
+        assertTrue(iterator.hasNext());
+        node = (InternalTopologyBuilder.AbstractNode) iterator.next();
+        assertTrue(node.name.equals("source2"));
+        assertEquals(4, node.size);
+
+        assertTrue(iterator.hasNext());
+        node = (InternalTopologyBuilder.AbstractNode) iterator.next();
+        assertTrue(node.name.equals("processor2"));
+        assertEquals(3, node.size);
+
+        assertTrue(iterator.hasNext());
+        node = (InternalTopologyBuilder.AbstractNode) iterator.next();
+        assertTrue(node.name.equals("processor1"));
+        assertEquals(2, node.size);
+
+        assertTrue(iterator.hasNext());
+        node = (InternalTopologyBuilder.AbstractNode) iterator.next();
+        assertTrue(node.name.equals("processor3"));
+        assertEquals(2, node.size);
+
+        assertTrue(iterator.hasNext());
+        node = (InternalTopologyBuilder.AbstractNode) iterator.next();
+        assertTrue(node.name.equals("sink1"));
+        assertEquals(1, node.size);
+    }
+
+    @Test
     public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception 
{
         builder.addSource(null, "ingest", null, null, null, 
Pattern.compile("topic-\\d+"));
         builder.addProcessor("my-processor", new MockProcessorSupplier(), 
"ingest");

Reply via email to