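Hi, in the while loop that fetches data from Kafka, why doesn't the code use the current thread (LegacySourceFunctionThread) directly? Instead it creates a separate consumerThread that fetches the data on each poll and then hands it over to the current thread.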

Source version: 1.10.0
// KafkaFetcher.java
@Override
public void runFetchLoop() throws Exception {
       try {
              final Handover handover = this.handover;

              // kick off the actual Kafka consumer
              consumerThread.start();

              while (running) {
                     // this blocks until we get the next records
                     // it automatically re-throws exceptions encountered in the consumer thread
                     final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

                     // get the records for each topic partition
                     for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {

                     }
              }
       }
       finally {
              // this signals the consumer thread that no more work is to be done
              consumerThread.shutdown();
       }

       // on a clean exit, wait for the runner thread
       try {
              consumerThread.join();
       }
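
To make the question concrete, the two-thread pattern I am asking about can be reduced to roughly the sketch below. This is only my simplified illustration, not Flink's actual Handover or KafkaConsumerThread code: the names SimpleHandover, produce, reportError and runDemo are made up, and plain strings stand in for the ConsumerRecords batches.

```java
import java.util.ArrayList;
import java.util.List;

public class HandoverSketch {

    /** Hypothetical single-slot handover between two threads (not Flink's Handover class). */
    static final class SimpleHandover<T> {
        private final Object lock = new Object();
        private T next;          // the one element currently waiting to be picked up
        private Throwable error; // failure reported by the producing thread, if any

        /** Called by the consumer thread; blocks while the slot is still occupied. */
        void produce(T element) throws InterruptedException {
            synchronized (lock) {
                while (next != null) {
                    lock.wait();
                }
                next = element;
                lock.notifyAll();
            }
        }

        /** Called by the consumer thread when it fails, so the other side can see the error. */
        void reportError(Throwable t) {
            synchronized (lock) {
                if (error == null) {
                    error = t;
                }
                lock.notifyAll();
            }
        }

        /** Called by the fetch-loop thread; blocks until an element arrives, re-throws errors. */
        T pollNext() throws Exception {
            synchronized (lock) {
                while (next == null && error == null) {
                    lock.wait();
                }
                if (next != null) {
                    T result = next;
                    next = null;
                    lock.notifyAll();
                    return result;
                }
                throw new Exception("consumer thread failed", error);
            }
        }
    }

    /** Runs a stand-in consumer thread and a stand-in fetch loop; returns what the loop received. */
    static List<String> runDemo() throws Exception {
        SimpleHandover<String> handover = new SimpleHandover<>();

        // stand-in for consumerThread: it would normally poll Kafka here
        Thread consumerThread = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    handover.produce("batch-" + i);
                }
            } catch (Throwable t) {
                handover.reportError(t);
            }
        }, "consumer-thread");
        consumerThread.start();

        // stand-in for runFetchLoop on the current thread
        List<String> received = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            received.add(handover.pollNext());
        }
        consumerThread.join();
        return received;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runDemo());
    }
}
```

In this sketch the calling thread only ever blocks in pollNext(), while all the fetching happens on the other thread. My question is why that split is needed at all.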

Thanks in advance for any replies.
