Repository: kafka Updated Branches: refs/heads/trunk 3410f02fe -> 674407908
KAFKA-5698: Sort processor nodes based on their sub-tree sizes 1. Sort processor nodes within a sub-topology by their sub-tree sizes: nodes with the largest sizes are source nodes and are hence printed first. 2. Sort sub-topologies by id; sort global stores by the names of their source nodes. 3. Open for discussion: print the predecessor and successor lists on new lines. 4. Minor: add a space between each processor node name and its stores/topics; keep the `[]` brackets around topic names. Author: Guozhang Wang <wangg...@gmail.com> Reviewers: Damian Guy <damian....@gmail.com>, Matthias J. Sax <matth...@confluent.io>, Ted Yu <yuzhih...@gmail.com> Closes #3618 from guozhangwang/K5698-topology-description-sorting Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/67440790 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/67440790 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/67440790 Branch: refs/heads/trunk Commit: 674407908b898dd18b447447ad3346b5268b1dfc Parents: 3410f02 Author: Guozhang Wang <wangg...@gmail.com> Authored: Thu Sep 7 14:36:49 2017 -0700 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Thu Sep 7 14:36:49 2017 -0700 ---------------------------------------------------------------------- .../internals/InternalTopologyBuilder.java | 112 ++++++++++++++----- .../internals/InternalTopologyBuilderTest.java | 46 ++++++++ 2 files changed, 132 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/67440790/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index da5fe38..437e9e5 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -35,10 +35,12 @@ import org.apache.kafka.streams.state.internals.WindowStoreSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -47,6 +49,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.TreeSet; import java.util.regex.Pattern; @@ -59,7 +62,7 @@ public class InternalTopologyBuilder { private static final String[] NO_PREDECESSORS = {}; // node factories in a topological order - private final LinkedHashMap<String, NodeFactory> nodeFactories = new LinkedHashMap<>(); + private final Map<String, NodeFactory> nodeFactories = new LinkedHashMap<>(); // state factories private final Map<String, StateStoreFactory> stateFactories = new HashMap<>(); @@ -338,7 +341,6 @@ public class InternalTopologyBuilder { if (pattern == null) { sourceTopics = topics.toString(); - sourceTopics = sourceTopics.substring(1, sourceTopics.length() - 1); // trim first and last, ie. 
[] } else { sourceTopics = pattern.toString(); } @@ -1263,6 +1265,34 @@ public class InternalTopologyBuilder { return false; } + private static class NodeComparator implements Comparator<TopologyDescription.Node>, Serializable { + + @Override + public int compare(final TopologyDescription.Node node1, + final TopologyDescription.Node node2) { + final int size1 = ((AbstractNode) node1).size; + final int size2 = ((AbstractNode) node2).size; + + // it is possible that two nodes have the same sub-tree size (think two nodes connected via state stores) + // in this case default to processor name string + if (size1 != size2) { + return size2 - size1; + } else { + return node1.name().compareTo(node2.name()); + } + } + } + + private final static NodeComparator NODE_COMPARATOR = new NodeComparator(); + + private static void updateSize(final AbstractNode node, final int delta) { + node.size += delta; + + for (final TopologyDescription.Node predecessor : node.predecessors()) { + updateSize((AbstractNode) predecessor, delta); + } + } + private void describeSubtopology(final TopologyDescription description, final Integer subtopologyId, final Set<String> nodeNames) { @@ -1280,12 +1310,13 @@ public class InternalTopologyBuilder { final AbstractNode predecessor = nodesByName.get(predecessorName); node.addPredecessor(predecessor); predecessor.addSuccessor(node); + updateSize(predecessor, node.size); } } description.addSubtopology(new Subtopology( - subtopologyId, - new HashSet<TopologyDescription.Node>(nodesByName.values()))); + subtopologyId, + new HashSet<TopologyDescription.Node>(nodesByName.values()))); } private void describeGlobalStores(final TopologyDescription description) { @@ -1339,8 +1370,8 @@ public class InternalTopologyBuilder { @Override public String toString() { - return "GlobalStore: " + source.name + "(topic: " + source.topics + ") -> " - + processor.name + "(store: " + processor.stores.iterator().next() + ")\n"; + return "GlobalStore: " + source.name + " (topic: " 
+ source.topics + ")\n --> " + + processor.name + " (store: " + processor.stores.iterator().next() + ")\n"; } @Override @@ -1365,11 +1396,15 @@ public class InternalTopologyBuilder { public abstract static class AbstractNode implements TopologyDescription.Node { final String name; - final Set<TopologyDescription.Node> predecessors = new HashSet<>(); - final Set<TopologyDescription.Node> successors = new HashSet<>(); + final Set<TopologyDescription.Node> predecessors = new TreeSet<>(NODE_COMPARATOR); + final Set<TopologyDescription.Node> successors = new TreeSet<>(NODE_COMPARATOR); + + // size of the sub-topology rooted at this node, including the node itself + int size; AbstractNode(final String name) { this.name = name; + this.size = 1; } @Override @@ -1417,7 +1452,7 @@ public class InternalTopologyBuilder { @Override public String toString() { - return "Source: " + name + "(topics: " + topics + ") --> " + nodeNames(successors); + return "Source: " + name + " (topics: " + topics + ")\n --> " + nodeNames(successors); } @Override @@ -1458,7 +1493,7 @@ public class InternalTopologyBuilder { @Override public String toString() { - return "Processor: " + name + "(stores: " + stores + ") --> " + nodeNames(successors) + " <-- " + nodeNames(predecessors); + return "Processor: " + name + " (stores: " + stores + ")\n --> " + nodeNames(successors) + "\n <-- " + nodeNames(predecessors); } @Override @@ -1505,7 +1540,7 @@ public class InternalTopologyBuilder { @Override public String toString() { - return "Sink: " + name + "(topic: " + topic + ") <-- " + nodeNames(predecessors); + return "Sink: " + name + " (topic: " + topic + ")\n <-- " + nodeNames(predecessors); } @Override @@ -1532,12 +1567,12 @@ public class InternalTopologyBuilder { public final static class Subtopology implements org.apache.kafka.streams.TopologyDescription.Subtopology { private final int id; - private final Set<org.apache.kafka.streams.TopologyDescription.Node> nodes; + private final 
Set<TopologyDescription.Node> nodes; - public Subtopology(final int id, - final Set<org.apache.kafka.streams.TopologyDescription.Node> nodes) { + public Subtopology(final int id, final Set<TopologyDescription.Node> nodes) { this.id = id; - this.nodes = nodes; + this.nodes = new TreeSet<>(NODE_COMPARATOR); + this.nodes.addAll(nodes); } @Override @@ -1546,10 +1581,15 @@ public class InternalTopologyBuilder { } @Override - public Set<org.apache.kafka.streams.TopologyDescription.Node> nodes() { + public Set<TopologyDescription.Node> nodes() { return Collections.unmodifiableSet(nodes); } + // only for testing + public Iterator<TopologyDescription.Node> nodesInOrder() { + return nodes.iterator(); + } + @Override public String toString() { return "Sub-topology: " + id + "\n" + nodesAsString(); @@ -1557,7 +1597,7 @@ public class InternalTopologyBuilder { private String nodesAsString() { final StringBuilder sb = new StringBuilder(); - for (final org.apache.kafka.streams.TopologyDescription.Node node : nodes) { + for (final TopologyDescription.Node node : nodes) { sb.append(" "); sb.append(node); sb.append('\n'); @@ -1628,31 +1668,51 @@ public class InternalTopologyBuilder { } } + private static class GlobalStoreComparator implements Comparator<TopologyDescription.GlobalStore>, Serializable { + @Override + public int compare(final TopologyDescription.GlobalStore globalStore1, + final TopologyDescription.GlobalStore globalStore2) { + return globalStore1.source().name().compareTo(globalStore2.source().name()); + } + } + + private final static GlobalStoreComparator GLOBALSTORE_COMPARATOR = new GlobalStoreComparator(); + + private static class SubtopologyComparator implements Comparator<TopologyDescription.Subtopology>, Serializable { + @Override + public int compare(final TopologyDescription.Subtopology subtopology1, + final TopologyDescription.Subtopology subtopology2) { + return subtopology1.id() - subtopology2.id(); + } + } + + private final static SubtopologyComparator 
SUBTOPOLOGY_COMPARATOR = new SubtopologyComparator(); + public final static class TopologyDescription implements org.apache.kafka.streams.TopologyDescription { - private final Set<org.apache.kafka.streams.TopologyDescription.Subtopology> subtopologies = new HashSet<>(); - private final Set<org.apache.kafka.streams.TopologyDescription.GlobalStore> globalStores = new HashSet<>(); + private final Set<TopologyDescription.Subtopology> subtopologies = new TreeSet<>(SUBTOPOLOGY_COMPARATOR); + private final Set<TopologyDescription.GlobalStore> globalStores = new TreeSet<>(GLOBALSTORE_COMPARATOR); - public void addSubtopology(final org.apache.kafka.streams.TopologyDescription.Subtopology subtopology) { + public void addSubtopology(final TopologyDescription.Subtopology subtopology) { subtopologies.add(subtopology); } - public void addGlobalStore(final org.apache.kafka.streams.TopologyDescription.GlobalStore globalStore) { + public void addGlobalStore(final TopologyDescription.GlobalStore globalStore) { globalStores.add(globalStore); } @Override - public Set<org.apache.kafka.streams.TopologyDescription.Subtopology> subtopologies() { + public Set<TopologyDescription.Subtopology> subtopologies() { return Collections.unmodifiableSet(subtopologies); } @Override - public Set<org.apache.kafka.streams.TopologyDescription.GlobalStore> globalStores() { + public Set<TopologyDescription.GlobalStore> globalStores() { return Collections.unmodifiableSet(globalStores); } @Override public String toString() { - return subtopologiesAsString() + globalStoresAsString(); + return subtopologiesAsString() + "\n" + globalStoresAsString(); } private String subtopologiesAsString() { @@ -1661,7 +1721,7 @@ public class InternalTopologyBuilder { if (subtopologies.isEmpty()) { sb.append(" none\n"); } else { - for (final org.apache.kafka.streams.TopologyDescription.Subtopology st : subtopologies) { + for (final TopologyDescription.Subtopology st : subtopologies) { sb.append(" "); sb.append(st); } @@ 
-1675,7 +1735,7 @@ public class InternalTopologyBuilder { if (globalStores.isEmpty()) { sb.append(" none\n"); } else { - for (final org.apache.kafka.streams.TopologyDescription.GlobalStore gs : globalStores) { + for (final TopologyDescription.GlobalStore gs : globalStores) { sb.append(" "); sb.append(gs); } http://git-wip-us.apache.org/repos/asf/kafka/blob/67440790/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index 91edac5..95636ec 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyDescription; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.errors.TopologyException; @@ -43,6 +44,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; @@ -665,6 +667,50 @@ public class InternalTopologyBuilderTest { } @Test + public void shouldSortProcessorNodesCorrectly() throws Exception { + builder.addSource(null, "source1", null, null, null, "topic1"); + builder.addSource(null, "source2", null, null, null, "topic2"); + builder.addProcessor("processor1", new 
MockProcessorSupplier(), "source1"); + builder.addProcessor("processor2", new MockProcessorSupplier(), "source1", "source2"); + builder.addProcessor("processor3", new MockProcessorSupplier(), "processor2"); + builder.addSink("sink1", "topic2", null, null, null, "processor1", "processor3"); + + assertEquals(1, builder.describe().subtopologies().size()); + + final Iterator<TopologyDescription.Node> iterator = ((InternalTopologyBuilder.Subtopology) builder.describe().subtopologies().iterator().next()).nodesInOrder(); + + assertTrue(iterator.hasNext()); + InternalTopologyBuilder.AbstractNode node = (InternalTopologyBuilder.AbstractNode) iterator.next(); + assertTrue(node.name.equals("source1")); + assertEquals(6, node.size); + + assertTrue(iterator.hasNext()); + node = (InternalTopologyBuilder.AbstractNode) iterator.next(); + assertTrue(node.name.equals("source2")); + assertEquals(4, node.size); + + assertTrue(iterator.hasNext()); + node = (InternalTopologyBuilder.AbstractNode) iterator.next(); + assertTrue(node.name.equals("processor2")); + assertEquals(3, node.size); + + assertTrue(iterator.hasNext()); + node = (InternalTopologyBuilder.AbstractNode) iterator.next(); + assertTrue(node.name.equals("processor1")); + assertEquals(2, node.size); + + assertTrue(iterator.hasNext()); + node = (InternalTopologyBuilder.AbstractNode) iterator.next(); + assertTrue(node.name.equals("processor3")); + assertEquals(2, node.size); + + assertTrue(iterator.hasNext()); + node = (InternalTopologyBuilder.AbstractNode) iterator.next(); + assertTrue(node.name.equals("sink1")); + assertEquals(1, node.size); + } + + @Test public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception { builder.addSource(null, "ingest", null, null, null, Pattern.compile("topic-\\d+")); builder.addProcessor("my-processor", new MockProcessorSupplier(), "ingest");