[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies
ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r679589984

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ##
@@ -16,10 +16,304 @@
  */
 package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+// 1) synchronize on these methods instead of individual InternalTopologyBuilder methods, where applicable
 public class TopologyMetadata {
-//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR #10683)
+    private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+    // the "__" (double underscore) string is not allowed for topology names, so it's safe to use to indicate
+    // that it's not a named topology
+    private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+    private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
+
+    private final StreamsConfig config;
+    private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability

Review comment:
Yeah, I think it would be fine (better, even) to swap in the topologies rather than the topology builders. The only reason for using the builders now is that a huge amount of topology-related functionality currently resides in the InternalTopologyBuilder, including pretty much all the metadata. I would 100% [support cleaning this up](https://github.com/apache/kafka/pull/10683#discussion_r679581108) and separating things out from this class, and I made sure it would be easy to do so here. The builders are really only kept around after the topology is built because they still contain most of the metadata we need.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
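The `builders` map is keyed by topology name in a `TreeMap`, with a reserved sentinel key for the unnamed case. Making the container generic in the wrapped type illustrates why swapping builders for topologies should be a contained change. The sketch below is only that, a sketch: `TopologyRegistry` is a hypothetical name, and the null-means-unnamed convention and duplicate-name check are assumptions, not part of the PR.

```java
import java.util.Collections;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical sketch (not Kafka Streams code): a name-keyed registry that is generic in the
// wrapped type T, so holding builders today or built topologies later only changes T.
final class TopologyRegistry<T> {
    // Same sentinel as in the diff; assumes user topology names may never contain "__".
    static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";

    // TreeMap keeps the entries sorted by topology name.
    private final SortedMap<String, T> byName = new TreeMap<>();

    /** Registers a topology; a null name means "not a named topology" (assumed convention). */
    void register(final String topologyName, final T topology) {
        final String key;
        if (topologyName == null) {
            key = UNNAMED_TOPOLOGY;
        } else if (topologyName.contains("__")) {
            throw new IllegalArgumentException("Topology name must not contain \"__\": " + topologyName);
        } else {
            key = topologyName;
        }
        if (byName.putIfAbsent(key, topology) != null) {
            throw new IllegalArgumentException("A topology is already registered under " + key);
        }
    }

    /** Read-only view, iterated in sorted name order. */
    SortedMap<String, T> view() {
        return Collections.unmodifiableSortedMap(byName);
    }
}
```

Because `'_'` sorts before lowercase letters, the sentinel key would iterate ahead of typical names such as `topology-1`.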
ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r679581108

## File path: streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java ##
@@ -16,17 +16,240 @@
  */
 package org.apache.kafka.streams.integration;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaClientSupplier;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
+import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
+import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyStreamsBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+
 public class NamedTopologyIntegrationTest {
-//TODO KAFKA-12648
-/**
- * Things to test in Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies:
- * 1. Verify changelog & repartition topics decorated with named topology
- * 2. Make sure app run and works with
- * -multiple subtopologies
- * -persistent state
- * -multi-partition input & output topics
- * -standbys
- * -piped input and verified output records
- * 3. Is the task assignment balanced? Does KIP-441/warmup replica placement work as intended?
- */
+
+    private static final int NUM_BROKERS = 1;
+
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Rule
+    public final TestName testName = new TestName();
+    private String appId;
+
+    private String inputStream1;
+    private String inputStream2;
+    private String inputStream3;
+    private String outputStream1;
+    private String outputStream2;
+    private String outputStream3;
+    private String storeChangelog1;
+    private String storeChangelog2;
+    private String storeChangelog3;
+
+    final List<KeyValue<String, Long>> standardInputData = asList(KeyValue.pair("A", 100L), KeyValue.pair("B", 200L), KeyValue.pair("A", 300L), KeyValue.pair("C", 400L));
+    final List<KeyValue<String, Long>> standardOutputData = asList(KeyValue.pair("B", 1L), KeyValue.pair("A", 2L), KeyValue.pair("C", 1L)); // output of basic count topology with caching
+
+    final KafkaClientSupplier clientSupplier = new DefaultKafkaClientSupplier();
+    final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, LongSerializer.class);
+    final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, LongDeserializer.class);
+
+    final NamedTopologyStreamsBuilder builder1 = new NamedTopologyStreamsBuilder("topology-1");
+    final NamedTopologyStreamsBuilder builder2 = new NamedTopologyStreamsBuilder("topology-2");
+    final NamedTopologyStreamsBuilder builder3 = new NamedTopologyStreamsBuilder("topology-3");
+
+    Properties props;
+    KafkaStreamsNamedTopologyWrapper streams;
+
+    private Properties configProps() {
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(Stre
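The expected `standardOutputData` follows from counting the keys of `standardInputData` (A, B, A, C) and emitting each key once, in flush order. Below is a rough model of why the result is `B=1, A=2, C=1` rather than one record per input. It rests on a simplifying assumption, not on Kafka's actual `ThreadCache` code: the cache keeps only the latest count per key, and it flushes the least-recently-updated key first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model (an assumption, not Kafka's ThreadCache): one cached entry per key,
// ordered by last update, flushed in that order at commit time.
final class CachedCountModel {
    static List<Map.Entry<String, Long>> countWithCaching(final List<String> keys) {
        final LinkedHashMap<String, Long> cache = new LinkedHashMap<>();
        for (final String key : keys) {
            final long previous = cache.getOrDefault(key, 0L);
            cache.remove(key);             // re-inserting moves the key to the most-recently-updated end
            cache.put(key, previous + 1);
        }
        // Flush: each key is emitted once with its final count, least-recently-updated first.
        return new ArrayList<>(cache.entrySet());
    }

    public static void main(final String[] args) {
        System.out.println(countWithCaching(Arrays.asList("A", "B", "A", "C"))); // [B=1, A=2, C=1]
    }
}
```

Without caching, each update would be forwarded, so the output would contain `A=1` as well as `A=2`.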
ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r679572946

## File path: streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java ##
ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r679572637

## File path: streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java ##
ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r679571374

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ##
@@ -16,10 +16,304 @@
  */
 package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO KAFKA-12648:
+// 1) synchronize on these methods instead of individual InternalTopologyBuilder methods, where applicable
 public class TopologyMetadata {
-//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR #10683)
+    private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class);
+
+    // the "__" (double underscore) string is not allowed for topology names, so it's safe to use to indicate
+    // that it's not a named topology
+    private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+    private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
+
+    private final StreamsConfig config;
+    private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
+
+    private ProcessorTopology globalTopology;
+    private Map<String, StateStore> globalStateStores = new HashMap<>();
+    final Set<String> allInputTopics = new HashSet<>();
+
+    public TopologyMetadata(final InternalTopologyBuilder builder, final StreamsConfig config) {
+        this.config = config;
+        builders = new TreeMap<>();
+        if (builder.hasNamedTopology()) {
+            builders.put(builder.topologyName(), builder);
+        } else {
+            builders.put(UNNAMED_TOPOLOGY, builder);
+        }
+    }
+
+    public TopologyMetadata(final SortedMap<String, InternalTopologyBuilder> builders, final StreamsConfig config) {
+        this.config = config;
+        this.builders = builders;
+        if (builders.isEmpty()) {
+            log.debug("Building KafkaStreams app with no empty topology");
+        }
+    }
+
+    public int getNumStreamThreads(final StreamsConfig config) {
+        final int configuredNumStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+
+        // If the application uses named topologies, it's possible to start up with no topologies at all and only add them later
+        if (builders.isEmpty()) {
+            if (configuredNumStreamThreads != 0) {
+                log.info("Overriding number of StreamThreads to zero for empty topology");
+            }
+            return 0;
+        }
+
+        // If there are named topologies but some are empty, this indicates a bug in user code
+        if (hasNamedTopologies()) {
+            if (hasNoNonGlobalTopology() && !hasGlobalTopology()) {

Review comment:
Ahh yeah, I was even confusing myself with this after a while. "Local topology" sounds much better -- will do.
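The thread-count rule in `getNumStreamThreads` can be written as a pure function, using the "local topology" wording agreed on above (local meaning non-global). This is a minimal sketch under stated assumptions. `StreamThreadCountSketch` is a hypothetical name, and its boolean parameters stand in for `builders.isEmpty()`, `hasNoNonGlobalTopology()` and `hasGlobalTopology()`. The quoted hunk is cut off, so everything after the "no topologies" branch is assumed: throwing when nothing can run, and overriding to zero for a global-only topology.

```java
// Hypothetical sketch of the decision in getNumStreamThreads. The boolean parameters stand in
// for TopologyMetadata queries; the branches after the "no topologies" case are assumptions,
// because the quoted hunk is cut off there.
final class StreamThreadCountSketch {
    static int numStreamThreads(final int configuredThreads,
                                final boolean noTopologies,       // builders.isEmpty()
                                final boolean hasLocalTopology,   // !hasNoNonGlobalTopology()
                                final boolean hasGlobalTopology) {
        // With named topologies the app may start empty and add topologies later: no threads yet.
        if (noTopologies) {
            return 0;
        }
        // Nothing to process at all: treated here as a user error (assumption).
        if (!hasLocalTopology && !hasGlobalTopology) {
            throw new IllegalStateException(
                "Topology must subscribe to at least one source topic, pattern, or global table");
        }
        // Global-only: the global thread does the work, so no StreamThreads are needed (assumption).
        if (!hasLocalTopology) {
            return 0;
        }
        return configuredThreads;
    }
}
```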
ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669252416

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ##
@@ -1216,39 +1245,26 @@ private void setRegexMatchedTopicToStateStore() {
         }
     }
-    public synchronized Pattern earliestResetTopicsPattern() {
-        return resetTopicsPattern(earliestResetTopics, earliestResetPatterns);
+    public boolean hasOffsetResetOverrides() {
+        return !(earliestResetTopics.isEmpty() && earliestResetPatterns.isEmpty()
+            && latestResetTopics.isEmpty() && latestResetPatterns.isEmpty());
     }
-    public synchronized Pattern latestResetTopicsPattern() {
-        return resetTopicsPattern(latestResetTopics, latestResetPatterns);
-    }
-
-    private Pattern resetTopicsPattern(final Set<String> resetTopics,
-                                       final Set<Pattern> resetPatterns) {
-        final List<String> topics = maybeDecorateInternalSourceTopics(resetTopics);
-
-        return buildPattern(topics, resetPatterns);
-    }
-
-    private static Pattern buildPattern(final Collection<String> sourceTopics,
-                                        final Collection<Pattern> sourcePatterns) {
-        final StringBuilder builder = new StringBuilder();
-
-        for (final String topic : sourceTopics) {
-            builder.append(topic).append("|");
-        }
-
-        for (final Pattern sourcePattern : sourcePatterns) {
-            builder.append(sourcePattern.pattern()).append("|");
-        }
-
-        if (builder.length() > 0) {
-            builder.setLength(builder.length() - 1);
-            return Pattern.compile(builder.toString());
+    public OffsetResetStrategy offsetResetStrategy(final String topic) {
+        if (maybeDecorateInternalSourceTopics(earliestResetTopics).contains(topic) ||
+            earliestResetPatterns.stream().anyMatch(p -> p.matcher(topic).matches())) {
+            return EARLIEST;
+        } else if (maybeDecorateInternalSourceTopics(latestResetTopics).contains(topic) ||
+            latestResetPatterns.stream().anyMatch(p -> p.matcher(topic).matches())) {
+            return LATEST;
+        } else if (maybeDecorateInternalSourceTopics(sourceTopicNames).contains(topic)

Review comment:
The `NONE` case means we do have this topic in this InternalTopologyBuilder (as opposed to that of a different NamedTopology), but it hasn't set the offset reset strategy to EARLIEST or LATEST. If we fail the first two `if` conditions above, then all that's left is to verify whether we have this topic at all, which is going to be true if we find it in either the source topic set or the source patterns. Maybe you were wondering about the `|| !hasNamedTopology()` part? Basically, if we don't have any NamedTopologies then there is only one InternalTopologyBuilder, so all topics should belong to it.
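The lookup order described above can be sketched standalone: an explicit EARLIEST first, then an explicit LATEST, then "does this builder own the topic at all". The sketch simplifies in three ways. It drops `maybeDecorateInternalSourceTopics`. It uses a local `Reset` enum in place of `OffsetResetStrategy`. And because the hunk is cut off, it assumes that returning `null` signals "this topic belongs to a different NamedTopology". `OffsetResetLookupSketch` is a hypothetical name.

```java
import java.util.Set;
import java.util.regex.Pattern;

// Simplified sketch of the lookup order described above, not the PR's code: internal-topic
// decoration is omitted, and Reset stands in for OffsetResetStrategy.
final class OffsetResetLookupSketch {
    enum Reset { EARLIEST, LATEST, NONE }

    private final Set<String> earliestTopics;
    private final Set<Pattern> earliestPatterns;
    private final Set<String> latestTopics;
    private final Set<Pattern> latestPatterns;
    private final Set<String> sourceTopics;
    private final Set<Pattern> sourcePatterns;
    private final boolean hasNamedTopology;

    OffsetResetLookupSketch(final Set<String> earliestTopics, final Set<Pattern> earliestPatterns,
                            final Set<String> latestTopics, final Set<Pattern> latestPatterns,
                            final Set<String> sourceTopics, final Set<Pattern> sourcePatterns,
                            final boolean hasNamedTopology) {
        this.earliestTopics = earliestTopics;
        this.earliestPatterns = earliestPatterns;
        this.latestTopics = latestTopics;
        this.latestPatterns = latestPatterns;
        this.sourceTopics = sourceTopics;
        this.sourcePatterns = sourcePatterns;
        this.hasNamedTopology = hasNamedTopology;
    }

    private static boolean anyMatch(final Set<Pattern> patterns, final String topic) {
        return patterns.stream().anyMatch(p -> p.matcher(topic).matches());
    }

    /** Returns null when the topic is assumed to belong to a different named topology. */
    Reset offsetResetStrategy(final String topic) {
        if (earliestTopics.contains(topic) || anyMatch(earliestPatterns, topic)) {
            return Reset.EARLIEST;
        } else if (latestTopics.contains(topic) || anyMatch(latestPatterns, topic)) {
            return Reset.LATEST;
        } else if (sourceTopics.contains(topic) || anyMatch(sourcePatterns, topic) || !hasNamedTopology) {
            // This builder owns the topic (or is the only builder) but sets no per-topic override.
            return Reset.NONE;
        }
        return null;
    }
}
```

The `!hasNamedTopology` clause is what makes the single-builder case claim every topic, matching the explanation in the comment.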
ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669250045

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ##
@@ -1065,14 +1086,22 @@ private void buildProcessorNode(final Map> pro
         return Collections.unmodifiableMap(globalStateStores);
     }
-    public Set<String> allStateStoreName() {
+    public Set<String> allStateStoreNames() {
         Objects.requireNonNull(applicationId, "topology has not completed optimization");
         final Set<String> allNames = new HashSet<>(stateFactories.keySet());
         allNames.addAll(globalStateStores.keySet());
         return Collections.unmodifiableSet(allNames);
     }
+    public boolean hasStore(final String name) {
+        return stateFactories.containsKey(name) || globalStateStores.containsKey(name);
+    }
+
+    public boolean hasPersistentStores() {

Review comment:
Previously we would get a handle on the actual topology and then iterate through all the stores to check each one for persistence. But while you can now add and remove individual named topologies, you still can't change a topology or the stores in it while the app is running. So we may as well just keep track of whether we found any persistent stores as we go along, rather than iterate over all of them later. Also, this way we can keep and access this metadata easily through the TopologyMetadata/InternalTopologyBuilder, rather than ever having to access the ProcessorTopology directly at all.

That said, I'm not _too_ attached to this way of doing things, so if you have concerns I can go back to something like how it was before. Just lmk what you think.
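Because a topology's stores are fixed once the app is running, persistence can be recorded when each store is registered, so there is no need to scan the built topology later. A minimal sketch of that bookkeeping follows. `StoreRegistrySketch` and its `addStore(name, persistent)` signature are hypothetical simplifications of the real store factories.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: record persistence while stores are added, rather than scanning every
// store of the built ProcessorTopology later.
final class StoreRegistrySketch {
    private final Set<String> storeNames = new HashSet<>();
    private boolean hasPersistentStores = false;

    void addStore(final String name, final boolean persistent) {
        if (!storeNames.add(name)) {
            throw new IllegalArgumentException("Store " + name + " has already been added");
        }
        // Sticky flag: once any persistent store is seen it stays true. This is only safe
        // because stores are never removed from a topology while the app is running.
        hasPersistentStores |= persistent;
    }

    boolean hasStore(final String name) {
        return storeNames.contains(name);
    }

    boolean hasPersistentStores() {
        return hasPersistentStores;
    }
}
```

The trade-off is an O(1) query in exchange for relying on the invariant that a topology's stores cannot change after it is built.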
ableegoldman commented on a change in pull request #10683:
URL: https://github.com/apache/kafka/pull/10683#discussion_r669233192

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ##
@@ -345,8 +343,17 @@ private SinkNodeFactory(final String name,
         }
     }
+    public void setTopologyName(final String namedTopology) {

Review comment:
I tried to, but just couldn't make it work. It comes down to Java subclassing quirks, such as the parent being constructed before the child. It seems to be pretty much impossible to set things up so that everything is `final`: if we set the `topologyName` in the NamedTopology constructor, then it's not accessible (i.e. always null) when we call the `InternalTopologyBuilder`'s constructor, since that happens during the parent `Topology`'s construction. It's definitely annoying, but at least we should be able to clean things up once we go through a KIP and don't need to subclass like this.
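The ordering problem can be reproduced in plain Java. A superclass constructor that calls an overridable method sees the subclass's fields before the subclass constructor has assigned them, so even a `final` field reads as `null`. The classes below are hypothetical stand-ins: `Parent` plays `Topology`, `Child` plays `NamedTopology`, and `Builder` plays `InternalTopologyBuilder`. They are not the real classes.

```java
// Hypothetical stand-ins for Topology (Parent), NamedTopology (Child) and
// InternalTopologyBuilder (Builder); only the construction order is modeled.
final class ConstructionOrderDemo {
    static class Builder {
        final String nameSeenAtConstruction;   // what the parent was able to pass in
        private String topologyName;           // non-final so it can be fixed up afterwards

        Builder(final String name) {
            nameSeenAtConstruction = name;
            topologyName = name;
        }

        void setTopologyName(final String name) {
            topologyName = name;
        }

        String topologyName() {
            return topologyName;
        }
    }

    static class Parent {
        final Builder builder;

        Parent() {
            // Virtual call during construction: runs Child.topologyName() before Child's fields are set.
            builder = new Builder(topologyName());
        }

        String topologyName() {
            return null;
        }
    }

    static class Child extends Parent {
        private final String name;

        Child(final String name) {
            super();                          // the builder is created here, while this.name is still null
            this.name = name;
            builder.setTopologyName(name);    // workaround: a setter called after the parent is built
        }

        @Override
        String topologyName() {
            return name;                      // returns null when called from Parent()
        }
    }

    public static void main(final String[] args) {
        final Child child = new Child("topology-1");
        System.out.println(child.builder.nameSeenAtConstruction); // null
        System.out.println(child.builder.topologyName());         // topology-1
    }
}
```

This is why `setTopologyName` is a non-final setter in the diff, rather than a constructor argument.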
ableegoldman commented on a change in pull request #10683: URL: https://github.com/apache/kafka/pull/10683#discussion_r669211161 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ## @@ -16,10 +16,294 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.TopologyException; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// TODO KAFKA-12648: +// 1) synchronize on these methods instead of individual InternalTopologyBuilder methods, where applicable public class TopologyMetadata { -//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 
2 (PR #10683) +private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class); + +// the '_' character is not allowed for topology names, thus it's safe to use to indicate that it's not a named topology +private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__"; +private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile(""); + +private final StreamsConfig config; +private final SortedMap builders; // Keep sorted by topology name for readability + +private ProcessorTopology globalTopology; +private Map globalStateStores = new HashMap<>(); +final Set allInputTopics = new HashSet<>(); + +public TopologyMetadata(final InternalTopologyBuilder builder, final StreamsConfig config) { +this.config = config; +builders = new TreeMap<>(); +if (builder.hasNamedTopology()) { +builders.put(builder.namedTopology(), builder); +} else { +builders.put(UNNAMED_TOPOLOGY, builder); +} +} + +public TopologyMetadata(final SortedMap builders, final StreamsConfig config) { +this.config = config; +this.builders = builders; +if (builders.isEmpty()) { +log.debug("Building KafkaStreams app with no empty topology"); +} +} + +public int getNumStreamThreads(final StreamsConfig config) { +final int configuredNumStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG); + +// If the application uses named topologies, it's possible to start up with no topologies at all and only add them later +if (builders.isEmpty()) { +if (configuredNumStreamThreads != 0) { +log.info("Overriding number of StreamThreads to zero for empty topology"); +} +return 0; +} + +// If there are topologies but they are all empty, this indicates a bug in user code +if (hasNoNonGlobalTopology() && !hasGlobalTopology()) { Review comment: @guozhangwang WDYT? 
If the user has started up Streams with several named topologies but a subset of them are completely empty, should this be considered a user error that causes Streams to shut down, or should we just roll with it as long as at least one topology is non-empty? Take a look at the current state and lmk what you think.
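The empty-topology question depends on how `TopologyMetadata` tracks its builders. This is a minimal sketch assuming a hypothetical `BuilderRegistry` class, with `String` values standing in for `InternalTopologyBuilder` instances. It mirrors the diff in three ways:

- Builders live in a `TreeMap` keyed by topology name.
- A plain (unnamed) topology is stored under the reserved `"__UNNAMED_TOPOLOGY__"` key.
- An app using named topologies may start with an empty map.

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical simplification of the builder bookkeeping in TopologyMetadata;
// String values stand in for InternalTopologyBuilder instances.
class BuilderRegistry {
    // Per the comment in the diff, underscores are not allowed in topology
    // names, so this key cannot collide with a real named topology.
    static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";

    // Kept sorted by topology name for readability, as in the diff.
    private final SortedMap<String, String> builders = new TreeMap<>();

    // A plain (unnamed) topology is stored under the sentinel key.
    static BuilderRegistry unnamed(final String builder) {
        final BuilderRegistry registry = new BuilderRegistry();
        registry.builders.put(UNNAMED_TOPOLOGY, builder);
        return registry;
    }

    // Named topologies are stored under their own names; the map may be empty.
    static BuilderRegistry named(final SortedMap<String, String> namedBuilders) {
        final BuilderRegistry registry = new BuilderRegistry();
        registry.builders.putAll(namedBuilders);
        return registry;
    }

    boolean hasNamedTopologies() {
        // Also true when starting up with no named topologies at all.
        return !builders.containsKey(UNNAMED_TOPOLOGY);
    }

    boolean isEmpty() {
        return builders.isEmpty();
    }

    // Only valid on a non-empty registry (TreeMap.firstKey throws otherwise).
    String firstTopologyName() {
        return builders.firstKey();
    }
}
```

`hasNamedTopologies()` only checks for the absence of the sentinel key, so an empty map counts as a named-topology app. In the diff, `getNumStreamThreads` checks `builders.isEmpty()` separately and returns zero threads in that case.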
[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies
ableegoldman commented on a change in pull request #10683: URL: https://github.com/apache/kafka/pull/10683#discussion_r669210507 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ## @@ -16,10 +16,294 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.TopologyException; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// TODO KAFKA-12648: +// 1) synchronize on these methods instead of individual InternalTopologyBuilder methods, where applicable public class TopologyMetadata { -//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 
2 (PR #10683) +private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class); + +// the '_' character is not allowed for topology names, thus it's safe to use to indicate that it's not a named topology +private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__"; +private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile(""); + +private final StreamsConfig config; +private final SortedMap builders; // Keep sorted by topology name for readability + +private ProcessorTopology globalTopology; +private Map globalStateStores = new HashMap<>(); +final Set allInputTopics = new HashSet<>(); + +public TopologyMetadata(final InternalTopologyBuilder builder, final StreamsConfig config) { +this.config = config; +builders = new TreeMap<>(); +if (builder.hasNamedTopology()) { +builders.put(builder.namedTopology(), builder); +} else { +builders.put(UNNAMED_TOPOLOGY, builder); +} +} + +public TopologyMetadata(final SortedMap builders, final StreamsConfig config) { +this.config = config; +this.builders = builders; +if (builders.isEmpty()) { +log.debug("Building KafkaStreams app with no empty topology"); +} +} + +public int getNumStreamThreads(final StreamsConfig config) { +final int configuredNumStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG); + +// If the application uses named topologies, it's possible to start up with no topologies at all and only add them later +if (builders.isEmpty()) { +if (configuredNumStreamThreads != 0) { +log.info("Overriding number of StreamThreads to zero for empty topology"); +} +return 0; +} + +// If there are topologies but they are all empty, this indicates a bug in user code +if (hasNoNonGlobalTopology() && !hasGlobalTopology()) { Review comment: Sure, but if `builders.isEmpty` then we would enter the `if` block above and return before reaching this section of the code. 
But maybe you meant that `hasNoNonGlobalTopology` should return true only if _all_ builders have no non-global topology, rather than if that's true for _any one_ of them? There's an argument to be made for how to handle the case where some named topologies are legit while others are empty, but I would still advocate for throwing an exception when _any_ topology is empty, since that is not a valid configuration. In that case the current code is correct, but the comment is not. I'll fix the misleading comment.
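The two readings of `hasNoNonGlobalTopology` differ only in using `anyMatch` versus `allMatch` over the builders. This is a minimal sketch assuming a hypothetical `TopologyStub` in place of `InternalTopologyBuilder`. `Stream.anyMatch` approximates the diff's `evaluateConditionIsTrueForAnyBuilders` helper, whose body is not shown:

```java
import java.util.List;

// Hypothetical stand-in for an InternalTopologyBuilder, reduced to the one
// property this check cares about.
final class TopologyStub {
    private final boolean hasNonGlobalNodes;

    TopologyStub(final boolean hasNonGlobalNodes) {
        this.hasNonGlobalNodes = hasNonGlobalNodes;
    }

    boolean hasNoNonGlobalTopology() {
        return !hasNonGlobalNodes;
    }
}

final class EmptyTopologyCheck {
    // "any" semantics (the current code): one empty topology is enough.
    static boolean anyHasNoNonGlobalTopology(final List<TopologyStub> builders) {
        return builders.stream().anyMatch(TopologyStub::hasNoNonGlobalTopology);
    }

    // "all" semantics: true only if every topology lacks non-global nodes.
    static boolean allHaveNoNonGlobalTopology(final List<TopologyStub> builders) {
        return builders.stream().allMatch(TopologyStub::hasNoNonGlobalTopology);
    }
}
```

Take one legit and one empty named topology, with no global stores. The "any" check fires and the app fails fast, which is the behavior argued for here. The "all" check would let the app start.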
[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies
ableegoldman commented on a change in pull request #10683: URL: https://github.com/apache/kafka/pull/10683#discussion_r669197330 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ## @@ -16,10 +16,294 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.TopologyException; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// TODO KAFKA-12648: +// 1) synchronize on these methods instead of individual InternalTopologyBuilder methods, where applicable public class TopologyMetadata { -//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 
2 (PR #10683) +private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class); + +// the '_' character is not allowed for topology names, thus it's safe to use to indicate that it's not a named topology +private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__"; +private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile(""); + +private final StreamsConfig config; +private final SortedMap builders; // Keep sorted by topology name for readability + +private ProcessorTopology globalTopology; +private Map globalStateStores = new HashMap<>(); +final Set allInputTopics = new HashSet<>(); + +public TopologyMetadata(final InternalTopologyBuilder builder, final StreamsConfig config) { +this.config = config; +builders = new TreeMap<>(); +if (builder.hasNamedTopology()) { +builders.put(builder.namedTopology(), builder); +} else { +builders.put(UNNAMED_TOPOLOGY, builder); +} +} + +public TopologyMetadata(final SortedMap builders, final StreamsConfig config) { +this.config = config; +this.builders = builders; +if (builders.isEmpty()) { +log.debug("Building KafkaStreams app with no empty topology"); +} +} + +public int getNumStreamThreads(final StreamsConfig config) { +final int configuredNumStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG); + +// If the application uses named topologies, it's possible to start up with no topologies at all and only add them later +if (builders.isEmpty()) { +if (configuredNumStreamThreads != 0) { Review comment: Also, if a user has a global-only topology, it's absolutely valid for them to explicitly configure the app to have no StreamThreads. In fact, if we detect that case, we override the configured `num.stream.threads` to 0 for them anyway.
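The thread-count decision in `getNumStreamThreads` can be restated as a pure function. This is a minimal sketch with assumptions:

- Hypothetical boolean parameters stand in for the `TopologyMetadata` queries (`builders.isEmpty()`, `hasNoNonGlobalTopology()`, `hasGlobalTopology()`).
- `IllegalStateException` stands in for Kafka's `TopologyException`, so the block needs no Kafka dependency.
- Logging is omitted.

```java
// Hypothetical, dependency-free restatement of the branches in
// TopologyMetadata#getNumStreamThreads as shown in the diff.
final class StreamThreadCount {
    static int resolve(final int configuredNumStreamThreads,
                       final boolean noTopologies,
                       final boolean hasNoNonGlobalTopology,
                       final boolean hasGlobalTopology) {
        // Named-topology apps may start with no topologies and add them later.
        if (noTopologies) {
            return 0;
        }
        // Topologies exist, but none has stream-thread work or global stores.
        if (hasNoNonGlobalTopology && !hasGlobalTopology) {
            // Stands in for the TopologyException thrown in the diff.
            throw new IllegalStateException(
                "Topology has no stream threads and no global threads");
        }
        // Global-only topology: zero stream threads. The diff only logs an
        // override when the configured value is non-zero; an explicit 0 is
        // valid and yields the same result.
        if (hasNoNonGlobalTopology) {
            return 0;
        }
        return configuredNumStreamThreads;
    }
}
```

Folding the `configuredNumStreamThreads != 0` guard into the global-only branch keeps the same return value. In the original, that guard decides whether an override message is logged.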
[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies
ableegoldman commented on a change in pull request #10683: URL: https://github.com/apache/kafka/pull/10683#discussion_r669196718 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ## @@ -16,10 +16,294 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.TopologyException; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// TODO KAFKA-12648: +// 1) synchronize on these methods instead of individual InternalTopologyBuilder methods, where applicable public class TopologyMetadata { -//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 
2 (PR #10683) +private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class); + +// the '_' character is not allowed for topology names, thus it's safe to use to indicate that it's not a named topology +private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__"; +private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile(""); + +private final StreamsConfig config; +private final SortedMap builders; // Keep sorted by topology name for readability + +private ProcessorTopology globalTopology; +private Map globalStateStores = new HashMap<>(); +final Set allInputTopics = new HashSet<>(); + +public TopologyMetadata(final InternalTopologyBuilder builder, final StreamsConfig config) { +this.config = config; +builders = new TreeMap<>(); +if (builder.hasNamedTopology()) { +builders.put(builder.namedTopology(), builder); +} else { +builders.put(UNNAMED_TOPOLOGY, builder); +} +} + +public TopologyMetadata(final SortedMap builders, final StreamsConfig config) { +this.config = config; +this.builders = builders; +if (builders.isEmpty()) { +log.debug("Building KafkaStreams app with no empty topology"); +} +} + +public int getNumStreamThreads(final StreamsConfig config) { +final int configuredNumStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG); + +// If the application uses named topologies, it's possible to start up with no topologies at all and only add them later +if (builders.isEmpty()) { +if (configuredNumStreamThreads != 0) { +log.info("Overriding number of StreamThreads to zero for empty topology"); +} +return 0; +} + +// If there are topologies but they are all empty, this indicates a bug in user code +if (hasNoNonGlobalTopology() && !hasGlobalTopology()) { +log.error("Topology with no input topics will create no stream threads and no global thread."); +throw new TopologyException("Topology has no stream threads and no global threads, " + +"must subscribe to at least one source topic or global table."); +} + +// 
Lastly we check for an empty non-global topology and override the threads to zero if set otherwise +if (configuredNumStreamThreads != 0 && hasNoNonGlobalTopology()) { +log.info("Overriding number of StreamThreads to zero for global-only topology"); +return 0; +} + +return configuredNumStreamThreads; +} + +public boolean hasNamedTopologies() { +// This includes the case of starting up with no named topologies at all +return !builders.containsKey(UNNAMED_TOPOLOGY); +} + +public boolean hasGlobalTopology() { +return evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasGlobalStores); +} + +public boolean hasNoNonGlobalTopology() { +return evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasNoNonGlobalTopology); +} + +public boolean hasPersistentStores() { +// If the app is using named topologies, there may not be any persistent state when it first starts up +// but a new NamedTopology may introduce it later, so we must return true +if (hasNamedTopologies()) { +return true; +} +return evaluateConditio
[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies
ableegoldman commented on a change in pull request #10683: URL: https://github.com/apache/kafka/pull/10683#discussion_r669196246 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ## @@ -16,10 +16,294 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.TopologyException; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// TODO KAFKA-12648: +// 1) synchronize on these methods instead of individual InternalTopologyBuilder methods, where applicable public class TopologyMetadata { -//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 
2 (PR #10683) +private final Logger log = LoggerFactory.getLogger(TopologyMetadata.class); + +// the '_' character is not allowed for topology names, thus it's safe to use to indicate that it's not a named topology +private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__"; +private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile(""); + +private final StreamsConfig config; +private final SortedMap builders; // Keep sorted by topology name for readability + +private ProcessorTopology globalTopology; +private Map globalStateStores = new HashMap<>(); +final Set allInputTopics = new HashSet<>(); + +public TopologyMetadata(final InternalTopologyBuilder builder, final StreamsConfig config) { +this.config = config; +builders = new TreeMap<>(); +if (builder.hasNamedTopology()) { +builders.put(builder.namedTopology(), builder); +} else { +builders.put(UNNAMED_TOPOLOGY, builder); +} +} + +public TopologyMetadata(final SortedMap builders, final StreamsConfig config) { +this.config = config; +this.builders = builders; +if (builders.isEmpty()) { +log.debug("Building KafkaStreams app with no empty topology"); +} +} + +public int getNumStreamThreads(final StreamsConfig config) { +final int configuredNumStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG); + +// If the application uses named topologies, it's possible to start up with no topologies at all and only add them later +if (builders.isEmpty()) { +if (configuredNumStreamThreads != 0) { +log.info("Overriding number of StreamThreads to zero for empty topology"); +} +return 0; +} + +// If there are topologies but they are all empty, this indicates a bug in user code +if (hasNoNonGlobalTopology() && !hasGlobalTopology()) { +log.error("Topology with no input topics will create no stream threads and no global thread."); +throw new TopologyException("Topology has no stream threads and no global threads, " + +"must subscribe to at least one source topic or global table."); +} + +// 
Lastly we check for an empty non-global topology and override the threads to zero if set otherwise +if (configuredNumStreamThreads != 0 && hasNoNonGlobalTopology()) { +log.info("Overriding number of StreamThreads to zero for global-only topology"); +return 0; +} + +return configuredNumStreamThreads; +} + +public boolean hasNamedTopologies() { +// This includes the case of starting up with no named topologies at all +return !builders.containsKey(UNNAMED_TOPOLOGY); +} + +public boolean hasGlobalTopology() { +return evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasGlobalStores); +} + +public boolean hasNoNonGlobalTopology() { +return evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasNoNonGlobalTopology); +} + +public boolean hasPersistentStores() { +// If the app is using named topologies, there may not be any persistent state when it first starts up +// but a new NamedTopology may introduce it later, so we must return true +if (hasNamedTopologies()) { +return true; +} +return evaluateConditio
[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies
ableegoldman commented on a change in pull request #10683: URL: https://github.com/apache/kafka/pull/10683#discussion_r669193117 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ## @@ -364,6 +371,10 @@ public synchronized final StreamsConfig getStreamsConfig() { return config; } +public String namedTopology() { Review comment: You mean to rename this to `topologyName()`? Ack
[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies
ableegoldman commented on a change in pull request #10683: URL: https://github.com/apache/kafka/pull/10683#discussion_r669192999 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ## @@ -345,8 +343,17 @@ private SinkNodeFactory(final String name, } } +public void setTopologyName(final String namedTopology) { +Objects.requireNonNull(namedTopology, "named topology can't be null"); +if (this.namedTopology != null) { +log.error("Tried to reset the namedTopology to {} but it was already set to {}", namedTopology, this.namedTopology); +throw new IllegalStateException("NamedTopology has already been set to " + this.namedTopology); +} +this.namedTopology = namedTopology; +} + // public for testing only -public synchronized final InternalTopologyBuilder setApplicationId(final String applicationId) { Review comment: I looked around and can't imagine why we would ever need it. `setApplicationId` should only ever be called once, from a single location/thread.
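The set-once guard in `setTopologyName` can be exercised on its own. This is a minimal sketch assuming a hypothetical `NameHolder` class, with the SLF4J logging from the diff omitted. Like the call pattern described for `setApplicationId`, the guard is a plain check-then-set, so it assumes a single caller thread:

```java
import java.util.Objects;

// Hypothetical holder mirroring the setTopologyName logic from the diff.
final class NameHolder {
    private String topologyName; // non-final: assigned after construction

    void setTopologyName(final String topologyName) {
        Objects.requireNonNull(topologyName, "named topology can't be null");
        // Check-then-set is not atomic; fine for a single setup thread.
        if (this.topologyName != null) {
            throw new IllegalStateException(
                "NamedTopology has already been set to " + this.topologyName);
        }
        this.topologyName = topologyName;
    }

    String topologyName() {
        return topologyName;
    }
}
```

If concurrent callers ever became possible, this guard would need `synchronized` or an `AtomicReference.compareAndSet(null, name)` to stay correct. Both are single-call setup paths, so the plain version is enough.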
[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies
ableegoldman commented on a change in pull request #10683: URL: https://github.com/apache/kafka/pull/10683#discussion_r669191369 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1200,7 +1179,7 @@ private long getCacheSizePerThread(final int numStreamThreads) { if (numStreamThreads == 0) { return totalCacheSize; } -return totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0)); +return totalCacheSize / (numStreamThreads + (topologyMetadata.hasGlobalTopology() ? 1 : 0)); Review comment: We're just trying to get the basic NamedTopology feature up and running for the time being, so we'll loop back around on any "missing" features eventually.
[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies
ableegoldman commented on a change in pull request #10683: URL: https://github.com/apache/kafka/pull/10683#discussion_r669190141 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1200,7 +1179,7 @@ private long getCacheSizePerThread(final int numStreamThreads) { if (numStreamThreads == 0) { return totalCacheSize; } -return totalCacheSize / (numStreamThreads + ((globalTaskTopology != null) ? 1 : 0)); +return totalCacheSize / (numStreamThreads + (topologyMetadata.hasGlobalTopology() ? 1 : 0)); Review comment: At the moment we just don't allow global stores with named topologies. The javadocs of `KafkaStreamsNamedTopologyWrapper` list the features that are not yet supported with named topologies.
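The changed line splits the total cache evenly across the stream threads, plus one extra share for the global thread if there is one. This is a minimal sketch of that arithmetic using a hypothetical `CacheSplit.perThread` helper, with `hasGlobalTopology` passed in as a boolean. Global stores aren't allowed with named topologies yet, so the flag would currently be false for such apps:

```java
// Hypothetical helper mirroring getCacheSizePerThread from the diff.
final class CacheSplit {
    static long perThread(final long totalCacheSize,
                          final int numStreamThreads,
                          final boolean hasGlobalTopology) {
        // With no stream threads, the diff returns the whole cache.
        if (numStreamThreads == 0) {
            return totalCacheSize;
        }
        // One share per stream thread, plus one for the global thread if present.
        return totalCacheSize / (numStreamThreads + (hasGlobalTopology ? 1 : 0));
    }
}
```

Take a 10 MiB (10,485,760-byte) total cache and three stream threads. With a global thread there are four shares, so each thread gets 2,621,440 bytes. Without one there are three shares, and integer division gives 3,495,253 bytes per thread.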
[GitHub] [kafka] ableegoldman commented on a change in pull request #10683: KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies
ableegoldman commented on a change in pull request #10683: URL: https://github.com/apache/kafka/pull/10683#discussion_r669188996 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -799,41 +795,41 @@ public KafkaStreams(final Topology topology, public KafkaStreams(final Topology topology, final StreamsConfig config, final Time time) { -this(topology.internalTopologyBuilder, config, new DefaultKafkaClientSupplier(), time); +this(new TopologyMetadata(topology.internalTopologyBuilder, config), config, new DefaultKafkaClientSupplier(), time); } -private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, - final StreamsConfig config, - final KafkaClientSupplier clientSupplier) throws StreamsException { -this(internalTopologyBuilder, config, clientSupplier, Time.SYSTEM); -} - -private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, +private KafkaStreams(final Topology topology, final StreamsConfig config, final KafkaClientSupplier clientSupplier, final Time time) throws StreamsException { +this(new TopologyMetadata(topology.internalTopologyBuilder, config), config, clientSupplier, time); +} + +protected KafkaStreams(final TopologyMetadata topologyMetadata, Review comment: Just this one does, since it's called from the child class `KafkaStreamsNamedTopologyWrapper`. I'll change the one below back to private, though.
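This constructor layout can be sketched with hypothetical stand-ins:

- `App` for `KafkaStreams`
- `Metadata` for `TopologyMetadata`
- `NamedApp` for `KafkaStreamsNamedTopologyWrapper`

This is a minimal sketch, assuming a list of topology names is enough to represent the metadata. The public constructor wraps its input and delegates to one `protected` constructor, which is the only one the subclass needs to see:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for TopologyMetadata.
final class Metadata {
    final List<String> topologyNames;

    Metadata(final List<String> topologyNames) {
        this.topologyNames = topologyNames;
    }
}

// Hypothetical stand-in for KafkaStreams.
class App {
    final Metadata metadata;

    // Public entry point: wraps a single (unnamed) topology.
    public App(final String topology) {
        this(new Metadata(Arrays.asList(topology)));
    }

    // Protected so that the subclass can pass in pre-built metadata.
    protected App(final Metadata metadata) {
        this.metadata = metadata;
    }
}

// Hypothetical stand-in for KafkaStreamsNamedTopologyWrapper.
class NamedApp extends App {
    NamedApp(final String... namedTopologies) {
        super(new Metadata(Arrays.asList(namedTopologies)));
    }
}
```

Any other helper constructors in such a layout can stay `private`, since the subclass only ever reaches the shared `protected` one.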