[ 
https://issues.apache.org/jira/browse/KAFKA-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690640#comment-16690640
 ] 

Ozgur commented on KAFKA-7628:
------------------------------

I'm using [version 0.11.0|https://github.com/apache/kafka/blob/0.11.0/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java].

> KafkaStream is not closing
> --------------------------
>
>                 Key: KAFKA-7628
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7628
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.1
>         Environment: Macbook Pro
>            Reporter: Ozgur
>            Priority: Major
>
> I'm closing a KafkaStream when needed, based on a certain condition:
> Closing:
>  
> {code:java}
> if(kafkaStream == null) {
>             logger.info("KafkaStream already closed?");
>         } else {
>             boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS);
>             if(closed) {
>                 kafkaStream = null;
>                 logger.info("KafkaStream closed");
>             } else {
>                 logger.info("KafkaStream could not closed");
>             }
>         }
> {code}
> Starting:
>  
> {code:java}
> if(kafkaStream == null) {
>             logger.info("KafkaStream is starting");
>             kafkaStream = 
> KafkaManager.getInstance().getStream(this.getConfigFilePath(),
>                     this,
>                     this.getTopic()
>             );
>             kafkaStream.start();
>             logger.info("KafkaStream is started");
>         }
> {code}
>  
>  
> In my implementation of Processor, {{process(String key, byte[] value)}} is 
> still called even though the stream was closed successfully:
>  
> {code:java}
> // code placeholder
> public abstract class BaseKafkaProcessor implements Processor<String, byte[]> 
> {
>     private static Logger logger = 
> LogManager.getLogger(BaseKafkaProcessor.class);
>     private ProcessorContext context;
>     private ProcessorContext getContext() {
>         return context;
>     }
>     @Override
>     public void init(ProcessorContext context) {
>         this.context = context;
>         this.context.schedule(1000);
>     }
>     @Override
>     public void process(String key, byte[] value) {
>         try {
>             String topic = key.split("-")[0];
>             byte[] uncompressed = GzipCompressionUtil.uncompress(value);
>             String json = new String(uncompressed, "UTF-8");
>             processRecord(topic, json);
>             this.getContext().commit();
>         } catch (Exception e) {
>             logger.error("Error processing json", e);
>         }
>     }
>     protected abstract void processRecord(String topic, String json);
>     @Override
>     public void punctuate(long timestamp) {
>         this.getContext().commit();
>     }
>     @Override
>     public void close() {
>         this.getContext().commit();
>     }
> }
> {code}
>  
> My configuration for KafkaStreams:
>  
> {code:java}
> application.id=dv_ws_in_app_activity_dev4
> bootstrap.servers=VLXH1
> auto.offset.reset=latest
> num.stream.threads=1
> key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
> value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> poll.ms = 100
> commit.interval.ms=1000
> state.dir=../../temp/kafka-state-dir
> {code}
> Version: *0.11.0.1* 
>  
> I'm observing that after calling close() on the streams, these connections 
> to port 9092 are still established:
>  
> {code:java}
> $ sudo lsof -i -n -P | grep 9092
> java      29457          ozgur  133u  IPv6 0x531e550533f38283      0t0    TCP 
> x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED)
> java      29457          ozgur  134u  IPv6 0x531e55051a789ec3      0t0    TCP 
> x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED)
> java      29457          ozgur  135u  IPv6 0x531e55051a789903      0t0    TCP 
> x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  136u  IPv6 0x531e55051a78aa43      0t0    TCP 
> x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  140u  IPv6 0x531e55051a78c703      0t0    TCP 
> x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  141u  IPv6 0x531e55051a78a483      0t0    TCP 
> x.27.227.182:54424->x.x.164.45:9092 (ESTABLISHED)
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to