[ 
https://issues.apache.org/jira/browse/KAFKA-17112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865693#comment-17865693
 ] 

Ao Li commented on KAFKA-17112:
-------------------------------

[~cadonna] Thanks for your reply! I've tried moving `stateUpdater.start();` to 
`StreamThread::start`. This does not work for many tests because 
`StreamThread::start` is never called in them. For example:

{code}
    @Test
    public void shouldLogAndRecordSkippedRecordsForInvalidTimestamps() {
        internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);

        final Properties properties = configProps(false);
        properties.setProperty(
            StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
            LogAndSkipOnInvalidTimestamp.class.getName()
        );
        final StreamsConfig config = new StreamsConfig(properties);
        thread = createStreamThread(CLIENT_ID, config);

        thread.setState(StreamThread.State.STARTING);
        thread.setState(StreamThread.State.PARTITIONS_REVOKED);

        final TaskId task1 = new TaskId(0, t1p1.partition());
        final Set<TopicPartition> assignedPartitions = Collections.singleton(t1p1);
        thread.taskManager().handleAssignment(
            Collections.singletonMap(
                task1,
                assignedPartitions),
            emptyMap());

        final MockConsumer<byte[], byte[]> mockConsumer =
            (MockConsumer<byte[], byte[]>) thread.mainConsumer();
        mockConsumer.assign(Collections.singleton(t1p1));
        mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
        thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
        runOnce();
{code}

If we move `stateUpdater.start();` to `StreamThread::start`, tests like this 
one hang, because they may wait for a stateUpdater that was never started. I 
also tried replacing every `thread.setState(StreamThread.State.STARTING);` with 
`thread.start();` to see if that would be an easy fix, but it also breaks many 
tests. 
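
To illustrate the kind of hang I mean, here is a minimal sketch in plain Java. 
`SketchUpdater` is a hypothetical stand-in, not the real state updater, and I 
am only assuming the affected tests block in a similar way: the caller waits 
for a signal that only a started worker thread can send.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class UnstartedWorkerSketch {

    /** Hypothetical worker: it signals completion only if its thread actually runs. */
    static class SketchUpdater {
        private final CountDownLatch done = new CountDownLatch(1);
        private final Thread worker = new Thread(done::countDown, "sketch-updater");

        void start() {
            worker.start();
        }

        /**
         * Bounded wait so the sketch terminates. An unbounded done.await()
         * would block forever if start() was never called.
         */
        boolean awaitDone(final long timeoutMs) {
            try {
                return done.await(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(final String[] args) {
        // Never started: the wait can only end by timing out.
        final SketchUpdater neverStarted = new SketchUpdater();
        System.out.println("never started, done = " + neverStarted.awaitDone(200));

        // Started: the worker counts the latch down and the wait returns.
        final SketchUpdater started = new SketchUpdater();
        started.start();
        System.out.println("started, done = " + started.awaitDone(2000));
    }
}
```

A test that sets the thread state directly and never calls `start()` is in the 
position of the first case above.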

Also, the processingThread (TaskExecutor) is created in `StreamThread::create` 
and then passed to `TaskManager`, so StreamThread loses access to it once 
`StreamThread::create` returns. This could be fixed easily by implementing a 
`start` method in `TaskManager`.
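
Roughly what I have in mind, as a sketch with hypothetical stand-in classes 
(`SketchStreamThread`, `SketchTaskManager`, `SketchTaskExecutor`) rather than 
the actual Kafka Streams code. The `start` method on the task manager is the 
assumed addition; everything else only mirrors the ownership described above.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class StartDelegationSketch {

    /** Stand-in for the processing thread (TaskExecutor). */
    static class SketchTaskExecutor {
        private final AtomicBoolean running = new AtomicBoolean(false);

        void start() {
            running.set(true);
        }

        void shutdown() {
            running.set(false);
        }

        boolean isRunning() {
            return running.get();
        }
    }

    /** Stand-in for TaskManager, which holds the only reference to the executor. */
    static class SketchTaskManager {
        private final SketchTaskExecutor executor;

        SketchTaskManager(final SketchTaskExecutor executor) {
            this.executor = executor;
        }

        // Assumed new method: lets the owner start the executor it cannot reach directly.
        void start() {
            executor.start();
        }

        void shutdown() {
            executor.shutdown();
        }

        boolean executorRunning() {
            return executor.isRunning();
        }
    }

    /** Stand-in for StreamThread. */
    static class SketchStreamThread {
        private final SketchTaskManager taskManager;

        private SketchStreamThread(final SketchTaskManager taskManager) {
            this.taskManager = taskManager;
        }

        // The executor is created here and handed over; nothing is started yet.
        static SketchStreamThread create() {
            return new SketchStreamThread(new SketchTaskManager(new SketchTaskExecutor()));
        }

        void start() {
            taskManager.start();
        }

        void shutdown() {
            taskManager.shutdown();
        }

        SketchTaskManager taskManager() {
            return taskManager;
        }
    }

    public static void main(final String[] args) {
        final SketchStreamThread thread = SketchStreamThread.create();
        System.out.println("after create: running = " + thread.taskManager().executorRunning());
        thread.start();
        System.out.println("after start: running = " + thread.taskManager().executorRunning());
        thread.shutdown();
        System.out.println("after shutdown: running = " + thread.taskManager().executorRunning());
    }
}
```

With this shape, nothing is running after `create`, so a thread that is never 
started has nothing left to clean up.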

> StreamThread shutdown calls completeShutdown only in CREATED state
> ------------------------------------------------------------------
>
>                 Key: KAFKA-17112
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17112
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams, unit tests
>    Affects Versions: 3.9.0
>            Reporter: Ao Li
>            Priority: Minor
>
> While running tests in `StreamThreadTest.java` in kafka/streams, I noticed 
> the test left many lingering threads. Though the class runs `shutdown` after 
> each test, the shutdown only executes `completeShutdown` if the StreamThread 
> is in CREATED state. See 
> [https://github.com/apache/kafka/blob/0b11971f2c94f7aadc3fab2c51d94642065a72e5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java#L231]
>  and 
> [https://github.com/apache/kafka/blob/0b11971f2c94f7aadc3fab2c51d94642065a72e5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1435]
>  
> For example, you may run test 
> org.apache.kafka.streams.processor.internals.StreamThreadTest#shouldNotCloseTaskProducerWhenSuspending
>  with commit 0b11971f2c94f7aadc3fab2c51d94642065a72e5. When the test calls 
> `thread.shutdown()`, the thread is in `PARTITIONS_REVOKED` state. Thus, 
> `completeShutdown` is not called. The test creates three lingering threads: 2 
> `StateUpdater` and 1 `TaskExecutor`
>  
> This means that calls to `thread.shutdown` have no effect in 
> `StreamThreadTest.java`. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
