[ 
https://issues.apache.org/jira/browse/KAFKA-17112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865569#comment-17865569
 ] 

Ao Li commented on KAFKA-17112:
-------------------------------

[~cadonna] Aren't stateUpdater and processingThread expected to be started in 
some tests, since the parameterized tests control them?

{code}
    @Parameter(0)
    public boolean stateUpdaterEnabled = true;

    @Parameter(1)
    public boolean processingThreadsEnabled = true;

    @Parameters
    public static Collection<Object[]> data() {
        return Arrays.asList(new Object[][] {
            {false, false}, {true, false}, {true, true}
        });
    }
{code}

and here 

{code}
    private static StateUpdater maybeCreateAndStartStateUpdater(final boolean stateUpdaterEnabled,
                                                                final StreamsMetricsImpl streamsMetrics,
                                                                final StreamsConfig streamsConfig,
                                                                final Consumer<byte[], byte[]> restoreConsumer,
                                                                final ChangelogReader changelogReader,
                                                                final TopologyMetadata topologyMetadata,
                                                                final Time time,
                                                                final String clientId,
                                                                final int threadIdx) {
        if (stateUpdaterEnabled) {
            final String name = clientId + "-StateUpdater-" + threadIdx;
            final StateUpdater stateUpdater = new DefaultStateUpdater(
                name,
                streamsMetrics.metricsRegistry(),
                streamsConfig,
                restoreConsumer,
                changelogReader,
                topologyMetadata,
                time
            );
            stateUpdater.start();
            return stateUpdater;
        } else {
            return null;
        }
    }
{code}
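
To make the concern concrete, here is a minimal, hypothetical sketch (not the 
actual StreamThread code) of how a background worker can linger when cleanup 
runs only from the CREATED state. All class and method names (ShutdownSketch, 
OwnerThread, lingersAfterShutdown) are made up for illustration. The state 
names CREATED and PARTITIONS_REVOKED are taken from the issue description; 
PENDING_SHUTDOWN is added here as an assumed target state.

```java
import java.util.concurrent.CountDownLatch;

class ShutdownSketch {
    // Hypothetical subset of states; not the full StreamThread state machine.
    enum State { CREATED, PARTITIONS_REVOKED, PENDING_SHUTDOWN }

    // Hypothetical stand-in for a thread object that owns a started worker.
    static class OwnerThread {
        private State state;
        private final CountDownLatch stop = new CountDownLatch(1);
        private final Thread worker;

        OwnerThread(final State initialState) {
            this.state = initialState;
            this.worker = new Thread(() -> {
                try {
                    stop.await(); // block until asked to stop
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "worker-sketch");
            worker.setDaemon(true); // lets the demo JVM exit even if the worker lingers
            worker.start();
        }

        // Mirrors the behavior described in the issue: cleanup runs only from CREATED.
        void shutdown() throws InterruptedException {
            final boolean wasCreated = state == State.CREATED;
            state = State.PENDING_SHUTDOWN;
            if (wasCreated) {
                completeShutdown();
            }
        }

        // Stops the worker and waits for it to terminate.
        private void completeShutdown() throws InterruptedException {
            stop.countDown();
            worker.join();
        }

        boolean workerAlive() {
            return worker.isAlive();
        }
    }

    // Returns true if the worker is still alive after shutdown() from the given state.
    static boolean lingersAfterShutdown(final State initial) {
        final OwnerThread owner = new OwnerThread(initial);
        try {
            owner.shutdown();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return owner.workerAlive();
    }

    public static void main(final String[] args) {
        // prints "CREATED lingers: false"
        System.out.println("CREATED lingers: " + lingersAfterShutdown(State.CREATED));
        // prints "PARTITIONS_REVOKED lingers: true"
        System.out.println("PARTITIONS_REVOKED lingers: "
            + lingersAfterShutdown(State.PARTITIONS_REVOKED));
    }
}
```

In this sketch the worker survives whenever shutdown() is called from any 
state other than CREATED, which is the pattern the issue reports for tests 
that move the thread to another state without running it.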

> StreamThread shutdown calls completeShutdown only in CREATED state
> ------------------------------------------------------------------
>
>                 Key: KAFKA-17112
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17112
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams, unit tests
>    Affects Versions: 3.9.0
>            Reporter: Ao Li
>            Priority: Minor
>
> While running tests in `StreamThreadTest.java` in kafka/streams, I noticed 
> the test left many lingering threads. Though the class runs `shutdown` after 
> each test, the shutdown only executes `completeShutdown` if the StreamThread 
> is in CREATED state. See 
> [https://github.com/apache/kafka/blob/0b11971f2c94f7aadc3fab2c51d94642065a72e5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java#L231]
>  and 
> [https://github.com/apache/kafka/blob/0b11971f2c94f7aadc3fab2c51d94642065a72e5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1435]
>  
> For example, you may run test 
> org.apache.kafka.streams.processor.internals.StreamThreadTest#shouldNotCloseTaskProducerWhenSuspending
>  with commit 0b11971f2c94f7aadc3fab2c51d94642065a72e5. When the test calls 
> `thread.shutdown()`, the thread is in `PARTITIONS_REVOKED` state. Thus, 
> `completeShutdown` is not called. The test creates three lingering threads: 2 
> `StateUpdater` and 1 `TaskExecutor`
>  
> This means that calls to `thread.shutdown` have no effect in 
> `StreamThreadTest.java`. 



