[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology
ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r416215513

## File path: streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java ##
@@ -265,12 +270,27 @@ public void shouldRestoreGlobalInMemoryKTableOnRestart() throws Exception {
         kafkaStreams.close();
         startStreams();
+
         store = kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
         assertThat(store.approximateNumEntries(), equalTo(4L));
         timestampedStore = kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.timestampedKeyValueStore()));
         assertThat(timestampedStore.approximateNumEntries(), equalTo(4L));
     }
+    @Test
+    public void shouldGetToRunningWithOnlyGlobalTopology() throws Exception {

Review comment:
Copied this over to trunk and verified that it fails consistently

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology
ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414822929

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ##
@@ -701,18 +703,33 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 internalTopologyBuilder,
                 parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
+        final int numStreamThreads;
+        if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+            log.warn("Overriding number of StreamThreads to zero for global-only topology");
+            numStreamThreads = 0;
+        } else {
+            numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+        }
+
         // create the stream thread, global update thread, and cleanup thread
-        threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+        threads = new StreamThread[numStreamThreads];
+
+        final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+        final boolean hasGlobalTopology = globalTaskTopology != null;
+
+        if (numStreamThreads == 0 && !hasGlobalTopology) {
+            log.error("Must subscribe to at least one source topic or global table");
+            throw new IllegalArgumentException("Topology has no stream threads and no global threads");

Review comment:
~~Also, do we have an `InvalidTopologyException` or similar exception already? Or were you proposing to add a new type?~~ Edit: Never mind, it's just `TopologyException`; found it.
[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology
ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414822929

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ##
@@ -701,18 +703,33 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 internalTopologyBuilder,
                 parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
+        final int numStreamThreads;
+        if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+            log.warn("Overriding number of StreamThreads to zero for global-only topology");
+            numStreamThreads = 0;
+        } else {
+            numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+        }
+
         // create the stream thread, global update thread, and cleanup thread
-        threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+        threads = new StreamThread[numStreamThreads];
+
+        final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+        final boolean hasGlobalTopology = globalTaskTopology != null;
+
+        if (numStreamThreads == 0 && !hasGlobalTopology) {
+            log.error("Must subscribe to at least one source topic or global table");
+            throw new IllegalArgumentException("Topology has no stream threads and no global threads");

Review comment:
~~Also, do we have an `InvalidTopologyException` or similar exception already? Or were you proposing to add a new type?~~ Edit: Never mind, found it. It's just `TopologyException`.
[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology
ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414923007

## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ##
@@ -327,15 +331,15 @@ private void prepareStreamThread(final StreamThread thread, final boolean termin
     @Test
     public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);

Review comment:
But @vvcephei's last question gets right to the heart of the matter. The answer is "technically yes, but it will crash if you try to poll for said nothing, so really no." That's why the test was flaky, and it's the reason for this PR in the first place. (Avoiding group overhead is the name of the ticket, but the reality is that it will only happen once before all the StreamThreads die due to polling no topics.)
[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology
ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414922504

## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ##
@@ -327,15 +331,15 @@ private void prepareStreamThread(final StreamThread thread, final boolean termin
     @Test
     public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);

Review comment:
I realize he wasn't just complaining about the name, but I was trying to keep that discussion in one thread. But I guess you can only do so much to keep PR chatter oriented in one place
[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology
ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414832679

## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ##
@@ -327,15 +331,15 @@ private void prepareStreamThread(final StreamThread thread, final boolean termin
     @Test
     public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);

Review comment:
Would a class-level `final StreamsBuilder builder = new StreamsBuilder()` that we add a source to in the setUp be any better, in your opinion?
[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology
ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414825387

## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ##
@@ -883,6 +887,50 @@ public void statefulTopologyShouldCreateStateDirectory() throws Exception {
         startStreamsAndCheckDirExists(topology, true);
     }
+    @Test
+    public void shouldThrowIllegalArgumentExceptionOnEmptyTopology() {
+        assertThrows(

Review comment:
Ack
[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology
ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414822929

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ##
@@ -701,18 +703,33 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 internalTopologyBuilder,
                 parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
+        final int numStreamThreads;
+        if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+            log.warn("Overriding number of StreamThreads to zero for global-only topology");
+            numStreamThreads = 0;
+        } else {
+            numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+        }
+
         // create the stream thread, global update thread, and cleanup thread
-        threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+        threads = new StreamThread[numStreamThreads];
+
+        final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+        final boolean hasGlobalTopology = globalTaskTopology != null;
+
+        if (numStreamThreads == 0 && !hasGlobalTopology) {
+            log.error("Must subscribe to at least one source topic or global table");
+            throw new IllegalArgumentException("Topology has no stream threads and no global threads");

Review comment:
Also, do we have an `InvalidTopologyException` or similar exception already? Or were you proposing to add a new type?
[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology
ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414823010

## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ##
@@ -327,15 +331,15 @@ private void prepareStreamThread(final StreamThread thread, final boolean termin
     @Test
     public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);

Review comment:
Is `getBuilderWithSource` really that much uglier than `new StreamsBuilder`? :P
[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology
ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414822406

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ##
@@ -701,18 +703,33 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 internalTopologyBuilder,
                 parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
+        final int numStreamThreads;
+        if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+            log.warn("Overriding number of StreamThreads to zero for global-only topology");
+            numStreamThreads = 0;
+        } else {
+            numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+        }
+
         // create the stream thread, global update thread, and cleanup thread
-        threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+        threads = new StreamThread[numStreamThreads];
+
+        final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+        final boolean hasGlobalTopology = globalTaskTopology != null;
+
+        if (numStreamThreads == 0 && !hasGlobalTopology) {
+            log.error("Must subscribe to at least one source topic or global table");
+            throw new IllegalArgumentException("Topology has no stream threads and no global threads");

Review comment:
I guess I can't personally imagine any reason to ever want an app running with an empty topology, and would prefer to be notified immediately since I presumably did something wrong. But if you feel strongly about allowing this I can demote this to a warning
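For readers following the thread, the decision made in the hunk quoted in the comment above can be sketched without any Kafka dependency. This is only an illustrative model under stated assumptions: the class `StreamThreadPlanner`, the method `resolveNumStreamThreads`, and its three parameters are hypothetical names that stand in for `hasNoNonGlobalTopology()`, the `globalTaskTopology != null` check, and the configured `num.stream.threads` value. It is not the actual `KafkaStreams` constructor code.

```java
// Simplified, Kafka-free model of the thread-count decision in the quoted hunk.
// All names here are hypothetical; this is not the KafkaStreams implementation.
final class StreamThreadPlanner {

    private StreamThreadPlanner() { }

    /**
     * @param hasNonGlobalTopology true if the topology has at least one regular (non-global) part
     * @param hasGlobalTopology    true if the topology has a global table or global store
     * @param configuredThreads    the value the user configured for num.stream.threads
     * @return the number of stream threads to create
     */
    static int resolveNumStreamThreads(final boolean hasNonGlobalTopology,
                                       final boolean hasGlobalTopology,
                                       final int configuredThreads) {
        // With no regular topology, a stream thread would have no topics to poll,
        // so the configured value is overridden to zero.
        final int numStreamThreads = hasNonGlobalTopology ? configuredThreads : 0;

        // Nothing to run at all: fail fast rather than start an application that does nothing.
        if (numStreamThreads == 0 && !hasGlobalTopology) {
            throw new IllegalArgumentException("Topology has no stream threads and no global threads");
        }
        return numStreamThreads;
    }

    public static void main(final String[] args) {
        System.out.println(resolveNumStreamThreads(true, false, 4));  // regular topology
        System.out.println(resolveNumStreamThreads(false, true, 4));  // global-only topology
        try {
            resolveNumStreamThreads(false, false, 4);                 // empty topology
        } catch (final IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Demoting the error to a warning, as offered in the comment, would correspond to logging in place of the `throw` and returning zero; the sketch keeps the fail-fast variant shown in the diff.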
[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology
ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414819765

## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ##
@@ -883,6 +887,50 @@ public void statefulTopologyShouldCreateStateDirectory() throws Exception {
         startStreamsAndCheckDirExists(topology, true);
     }
+    @Test
+    public void shouldThrowIllegalArgumentExceptionOnEmptyTopology() {
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new KafkaStreams(new StreamsBuilder().build(), props, supplier, time)
+        );
+    }
+
+    @Test
+    public void shouldNotCreateStreamThreadsForGlobalOnlyTopology() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.globalTable("anyTopic");
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
+
+        assertThat(streams.threads.length, equalTo(0));
+    }
+
+    @Test
+    public void shouldNotTransitToErrorStateWithGlobalOnlyTopology() throws InterruptedException {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.globalTable("anyTopic");
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
+        streams.setStateListener((newState, oldState) -> {
+            if (newState.equals(State.ERROR)) {
+                throw new AssertionError("Should not have transitioned to ERROR state with no stream threads");

Review comment:
I guess we don't need to throw here, it would just cause KafkaStreams to transition to ERROR and fail below. But I realized this doesn't even do that because we're mocking pretty much everything in this test class including the stream threads. I'll try to look for a better way and place to do this test
[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology
ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414201107

## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ##
@@ -161,6 +165,9 @@ public void before() throws Exception {
         props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
+        builder = new StreamsBuilder();
+        builder.stream("source");

Review comment:
Good call, done