ableegoldman commented on a change in pull request #10683: URL: https://github.com/apache/kafka/pull/10683#discussion_r679572637
########## File path: streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java ########## @@ -16,17 +16,240 @@ */ package org.apache.kafka.streams.integration; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaClientSupplier; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; +import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper; +import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology; +import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyStreamsBuilder; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.test.TestUtils; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.regex.Pattern; + +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived; + 
+import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; + public class NamedTopologyIntegrationTest { - //TODO KAFKA-12648 - /** - * Things to test in Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies: - * 1. Verify changelog & repartition topics decorated with named topology - * 2. Make sure app run and works with - * -multiple subtopologies - * -persistent state - * -multi-partition input & output topics - * -standbys - * -piped input and verified output records - * 3. Is the task assignment balanced? Does KIP-441/warmup replica placement work as intended? - */ + + private static final int NUM_BROKERS = 1; + + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); + + @BeforeClass + public static void startCluster() throws IOException { + CLUSTER.start(); + } + + @AfterClass + public static void closeCluster() { + CLUSTER.stop(); + } + + @Rule + public final TestName testName = new TestName(); + private String appId; + + private String inputStream1; + private String inputStream2; + private String inputStream3; + private String outputStream1; + private String outputStream2; + private String outputStream3; + private String storeChangelog1; + private String storeChangelog2; + private String storeChangelog3; + + final List<KeyValue<String, Long>> standardInputData = asList(KeyValue.pair("A", 100L), KeyValue.pair("B", 200L), KeyValue.pair("A", 300L), KeyValue.pair("C", 400L)); + final List<KeyValue<String, Long>> standardOutputData = asList(KeyValue.pair("B", 1L), KeyValue.pair("A", 2L), KeyValue.pair("C", 1L)); // output of basic count topology with caching + + final KafkaClientSupplier clientSupplier = new DefaultKafkaClientSupplier(); + final Properties producerConfig = 
TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, LongSerializer.class); + final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, LongDeserializer.class); + + final NamedTopologyStreamsBuilder builder1 = new NamedTopologyStreamsBuilder("topology-1"); + final NamedTopologyStreamsBuilder builder2 = new NamedTopologyStreamsBuilder("topology-2"); + final NamedTopologyStreamsBuilder builder3 = new NamedTopologyStreamsBuilder("topology-3"); + + Properties props; + KafkaStreamsNamedTopologyWrapper streams; + + private Properties configProps() { + final Properties streamsConfiguration = new Properties(); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath()); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); + streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return streamsConfiguration; + } + + @Before Review comment: Good point, most of them can share the same topics/input data, which saves this repeated setup/teardown. A few do need their own topics so they can control when input data appears on which topic, but we can just handle those few separately -- I'll move most of this to @BeforeClass/@AfterClass -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org