[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-27 Thread GitBox


ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r416215513



##
File path: streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
##
@@ -265,12 +270,27 @@ public void shouldRestoreGlobalInMemoryKTableOnRestart() throws Exception {
         kafkaStreams.close();
 
         startStreams();
+
         store = kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
         assertThat(store.approximateNumEntries(), equalTo(4L));
         timestampedStore = kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.timestampedKeyValueStore()));
         assertThat(timestampedStore.approximateNumEntries(), equalTo(4L));
     }
 
+    @Test
+    public void shouldGetToRunningWithOnlyGlobalTopology() throws Exception {

Review comment:
   Copied this over to trunk and verified that it fails consistently





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-27 Thread GitBox


ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414822929



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -701,18 +703,33 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
             internalTopologyBuilder,
             parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
 
+        final int numStreamThreads;
+        if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+            log.warn("Overriding number of StreamThreads to zero for global-only topology");
+            numStreamThreads = 0;
+        } else {
+            numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+        }
+
         // create the stream thread, global update thread, and cleanup thread
-        threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+        threads = new StreamThread[numStreamThreads];
+
+        final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+        final boolean hasGlobalTopology = globalTaskTopology != null;
+
+        if (numStreamThreads == 0 && !hasGlobalTopology) {
+            log.error("Must subscribe to at least one source topic or global table");
+            throw new IllegalArgumentException("Topology has no stream threads and no global threads");

Review comment:
   ~~Also, do we have an `InvalidTopologyException` or similar exception already? Or were you proposing to add a new type?~~
   edit: Never mind, it's just `TopologyException`; found it.









[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-27 Thread GitBox


ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414822929



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -701,18 +703,33 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
             internalTopologyBuilder,
             parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
 
+        final int numStreamThreads;
+        if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+            log.warn("Overriding number of StreamThreads to zero for global-only topology");
+            numStreamThreads = 0;
+        } else {
+            numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+        }
+
         // create the stream thread, global update thread, and cleanup thread
-        threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+        threads = new StreamThread[numStreamThreads];
+
+        final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+        final boolean hasGlobalTopology = globalTaskTopology != null;
+
+        if (numStreamThreads == 0 && !hasGlobalTopology) {
+            log.error("Must subscribe to at least one source topic or global table");
+            throw new IllegalArgumentException("Topology has no stream threads and no global threads");

Review comment:
   ~~Also, do we have an `InvalidTopologyException` or similar exception already? Or were you proposing to add a new type?~~
   edit: Never mind, found it. It's just `TopologyException`.









[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-24 Thread GitBox


ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414923007



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -327,15 +331,15 @@ private void prepareStreamThread(final StreamThread thread, final boolean termin
 
     @Test
     public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);

Review comment:
   But @vvcephei's last question gets right to the heart of the matter. The answer is "technically yes, but it will crash if you try to poll for said nothing, so really no".
   That's why the test was flaky, and it's the reason for this PR in the first place. (Avoiding group overhead is the name of the ticket, but the reality is that the overhead will only happen once before all the StreamThreads die due to polling no topics.)
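
To make the failure mode above concrete, here is a minimal sketch of a thread that dies because it polls with nothing subscribed. `EmptyPollSketch` is a hypothetical stand-in, not the Kafka consumer, and the choice of `IllegalStateException` is an assumption for illustration; this thread does not state which exception the real poll raises.

```java
// Hypothetical stand-in for a consumer; it models only the "no subscription" check.
// This is NOT the Kafka consumer API.
final class EmptyPollSketch {
    private String[] subscribedTopics = new String[0];

    void subscribe(String... topics) {
        subscribedTopics = topics.clone();
    }

    // Returns the number of subscribed topics; rejects a poll with nothing to read from.
    // The exception type is an assumption made for this sketch.
    int poll() {
        if (subscribedTopics.length == 0) {
            throw new IllegalStateException("Consumer is not subscribed to any topics");
        }
        return subscribedTopics.length;
    }

    // Simulates one iteration of a processing loop: false means the thread would die.
    static boolean threadSurvivesOnePoll(EmptyPollSketch consumer) {
        try {
            consumer.poll();
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }
}
```

With an empty topology every thread takes the failing branch on its first poll, which is consistent with the tests only passing when they happened to close the app before that poll.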









[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-24 Thread GitBox


ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414922504



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -327,15 +331,15 @@ private void prepareStreamThread(final StreamThread thread, final boolean termin
 
     @Test
     public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);

Review comment:
   I realize he wasn't just complaining about the name, but I was trying to 
keep that discussion in one thread. But I guess you can only do so much to keep 
PR chatter oriented in one place 









[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-24 Thread GitBox


ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414832679



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -327,15 +331,15 @@ private void prepareStreamThread(final StreamThread thread, final boolean termin
 
     @Test
     public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);

Review comment:
   Would a class-level `final StreamsBuilder builder = new StreamsBuilder()` that we add a source to in the setUp be any better, in your opinion?









[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-24 Thread GitBox


ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414825387



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -883,6 +887,50 @@ public void statefulTopologyShouldCreateStateDirectory() throws Exception {
         startStreamsAndCheckDirExists(topology, true);
     }
 
+    @Test
+    public void shouldThrowIllegalArgumentExceptionOnEmptyTopology() {
+        assertThrows(

Review comment:
   Ack









[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-24 Thread GitBox


ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414822929



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -701,18 +703,33 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
             internalTopologyBuilder,
             parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
 
+        final int numStreamThreads;
+        if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+            log.warn("Overriding number of StreamThreads to zero for global-only topology");
+            numStreamThreads = 0;
+        } else {
+            numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+        }
+
         // create the stream thread, global update thread, and cleanup thread
-        threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+        threads = new StreamThread[numStreamThreads];
+
+        final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+        final boolean hasGlobalTopology = globalTaskTopology != null;
+
+        if (numStreamThreads == 0 && !hasGlobalTopology) {
+            log.error("Must subscribe to at least one source topic or global table");
+            throw new IllegalArgumentException("Topology has no stream threads and no global threads");

Review comment:
   Also, do we have an `InvalidTopologyException` or similar exception already? Or were you proposing to add a new type?









[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-24 Thread GitBox


ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414823010



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -327,15 +331,15 @@ private void prepareStreamThread(final StreamThread thread, final boolean termin
 
     @Test
     public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);

Review comment:
   Is `getBuilderWithSource` really that much uglier than `new 
StreamsBuilder`? :P 









[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-24 Thread GitBox


ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414822406



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -701,18 +703,33 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
             internalTopologyBuilder,
             parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
 
+        final int numStreamThreads;
+        if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+            log.warn("Overriding number of StreamThreads to zero for global-only topology");
+            numStreamThreads = 0;
+        } else {
+            numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+        }
+
         // create the stream thread, global update thread, and cleanup thread
-        threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+        threads = new StreamThread[numStreamThreads];
+
+        final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+        final boolean hasGlobalTopology = globalTaskTopology != null;
+
+        if (numStreamThreads == 0 && !hasGlobalTopology) {
+            log.error("Must subscribe to at least one source topic or global table");
+            throw new IllegalArgumentException("Topology has no stream threads and no global threads");

Review comment:
   I guess I can't personally imagine any reason to ever want an app 
running with an empty topology, and would prefer to be notified immediately 
since I presumably did something wrong. But if you feel strongly about allowing 
this I can demote this to a warning
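
For reference, the decision logic in the hunk above can be isolated into a small sketch. `ThreadCountSketch` and its method names are hypothetical and stand in for the constructor code; only the branching and the messages are taken from the diff. It uses `System.err` instead of the real logger.

```java
// Hypothetical stand-in for the constructor logic quoted in the diff; not Kafka code.
final class ThreadCountSketch {

    // Mirrors the diff: a topology with no non-global part gets zero StreamThreads,
    // whatever thread count was configured.
    static int resolveNumStreamThreads(boolean hasNoNonGlobalTopology, int configuredThreads) {
        if (hasNoNonGlobalTopology) {
            System.err.println("Overriding number of StreamThreads to zero for global-only topology");
            return 0;
        }
        return configuredThreads;
    }

    // Fail fast: zero stream threads and no global topology means there is nothing to run.
    static void validate(int numStreamThreads, boolean hasGlobalTopology) {
        if (numStreamThreads == 0 && !hasGlobalTopology) {
            throw new IllegalArgumentException("Topology has no stream threads and no global threads");
        }
    }
}
```

Demoting the check to a warning would amount to replacing the `throw` in `validate` with a log call, at the cost of letting an empty app start silently.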









[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-24 Thread GitBox


ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414819765



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -883,6 +887,50 @@ public void statefulTopologyShouldCreateStateDirectory() throws Exception {
         startStreamsAndCheckDirExists(topology, true);
     }
 
+    @Test
+    public void shouldThrowIllegalArgumentExceptionOnEmptyTopology() {
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new KafkaStreams(new StreamsBuilder().build(), props, supplier, time)
+        );
+    }
+
+    @Test
+    public void shouldNotCreateStreamThreadsForGlobalOnlyTopology() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.globalTable("anyTopic");
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
+
+        assertThat(streams.threads.length, equalTo(0));
+    }
+
+    @Test
+    public void shouldNotTransitToErrorStateWithGlobalOnlyTopology() throws InterruptedException {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.globalTable("anyTopic");
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
+        streams.setStateListener((newState, oldState) -> {
+            if (newState.equals(State.ERROR)) {
+                throw new AssertionError("Should not have transitioned to ERROR state with no stream threads");

Review comment:
   I guess we don't need to throw here, it would just cause KafkaStreams to 
transition to ERROR and fail below. But I realized this doesn't even do that 
because we're mocking pretty much everything in this test class including the 
stream threads. I'll try to look for a better way and place to do this test
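
One possible alternative to throwing from inside the listener, sketched here only as an illustration and not as what the PR ended up doing, is to have the listener record that ERROR was reached and assert on that flag afterwards. `StateListenerSketch`, its `State` enum, and `ErrorRecorder` are hypothetical miniatures, not the KafkaStreams API.

```java
// Hypothetical miniature of a state machine with a listener; not the KafkaStreams API.
final class StateListenerSketch {
    enum State { CREATED, RUNNING, ERROR }

    interface Listener {
        void onChange(State newState, State oldState);
    }

    private State state = State.CREATED;
    private Listener listener = (newState, oldState) -> { };

    void setStateListener(Listener l) {
        listener = l;
    }

    // Updates the state, then notifies the listener with the new and old values.
    void transitionTo(State next) {
        State old = state;
        state = next;
        listener.onChange(next, old);
    }

    // Records whether ERROR was ever reached instead of throwing from the callback,
    // so the test thread can check the flag once the scenario has run.
    static final class ErrorRecorder implements Listener {
        volatile boolean sawError = false;

        @Override
        public void onChange(State newState, State oldState) {
            if (newState == State.ERROR) {
                sawError = true;
            }
        }
    }
}
```

As the comment notes, this only helps if the test exercises real threads; with everything mocked, no transition to ERROR would be observed either way.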









[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-23 Thread GitBox


ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414201107



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -161,6 +165,9 @@ public void before() throws Exception {
         props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
 
+        builder = new StreamsBuilder();
+        builder.stream("source");

Review comment:
   Good call, done




