[ https://issues.apache.org/jira/browse/KAFKA-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16689085#comment-16689085 ]
Ozgur commented on KAFKA-7628:
------------------------------

Hi Guozhang, I've upgraded my client to the latest version (2.0.1), but the problem is the same. I now think this is more likely an application logic error than a bug in Kafka. Thanks.

> KafkaStream is not closing
> --------------------------
>
>                 Key: KAFKA-7628
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7628
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.1
>         Environment: Macbook Pro
>            Reporter: Ozgur
>            Priority: Major
>
> I'm closing a KafkaStream when needed, based on a certain condition.
>
> Closing:
> {code:java}
> if (kafkaStream == null) {
>     logger.info("KafkaStream already closed?");
> } else {
>     boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS);
>     if (closed) {
>         kafkaStream = null;
>         logger.info("KafkaStream closed");
>     } else {
>         logger.info("KafkaStream could not be closed");
>     }
> }
> {code}
>
> Starting:
> {code:java}
> if (kafkaStream == null) {
>     logger.info("KafkaStream is starting");
>     kafkaStream = KafkaManager.getInstance().getStream(
>             this.getConfigFilePath(),
>             this,
>             this.getTopic()
>     );
>     kafkaStream.start();
>     logger.info("KafkaStream is started");
> }
> {code}
>
> In my implementation of {{Processor}}, {{process(String key, byte[] value)}} is still called even though the stream closed successfully:
> {code:java}
> public abstract class BaseKafkaProcessor implements Processor<String, byte[]> {
>
>     private static Logger logger = LogManager.getLogger(BaseKafkaProcessor.class);
>
>     private ProcessorContext context;
>
>     private ProcessorContext getContext() {
>         return context;
>     }
>
>     @Override
>     public void init(ProcessorContext context) {
>         this.context = context;
>         this.context.schedule(1000);
>     }
>
>     @Override
>     public void process(String key, byte[] value) {
>         try {
>             String topic = key.split("-")[0];
>             byte[] uncompressed = GzipCompressionUtil.uncompress(value);
>             String json = new String(uncompressed, "UTF-8");
>             processRecord(topic, json);
>             this.getContext().commit();
>         } catch (Exception e) {
>             logger.error("Error processing json", e);
>         }
>     }
>
>     protected abstract void processRecord(String topic, String json);
>
>     @Override
>     public void punctuate(long timestamp) {
>         this.getContext().commit();
>     }
>
>     @Override
>     public void close() {
>         this.getContext().commit();
>     }
> }
> {code}
>
> My configuration for KafkaStreams:
> {code:java}
> application.id=dv_ws_in_app_activity_dev4
> bootstrap.servers=VLXH1
> auto.offset.reset=latest
> num.stream.threads=1
> key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
> value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> poll.ms=100
> commit.interval.ms=1000
> state.dir=../../temp/kafka-state-dir
> {code}
> Version: *0.11.0.1*
>
> I'm seeing that after calling close() on the streams, these ports are still listening:
> {code:java}
> $ sudo lsof -i -n -P | grep 9092
> java 29457 ozgur 133u IPv6 0x531e550533f38283 0t0 TCP x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED)
> java 29457 ozgur 134u IPv6 0x531e55051a789ec3 0t0 TCP x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED)
> java 29457 ozgur 135u IPv6 0x531e55051a789903 0t0 TCP x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED)
> java 29457 ozgur 136u IPv6 0x531e55051a78aa43 0t0 TCP x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED)
> java 29457 ozgur 140u IPv6 0x531e55051a78c703 0t0 TCP x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED)
> java 29457 ozgur 141u IPv6 0x531e55051a78a483 0t0 TCP x.27.227.182:54424->x.x.164.45:9092 (ESTABLISHED)
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
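If the stream is intentionally stopped and restarted while the JVM keeps running, one defensive option (independent of whether the runtime still delivers buffered records after {{close()}}) is to gate {{process()}} on a flag that the shutdown path flips. The sketch below is a minimal, Kafka-free illustration of that pattern; {{GuardedProcessor}} and its methods are hypothetical names, not part of the Kafka Streams API, and this does not by itself explain the sockets left in ESTABLISHED state:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch: a processor that silently drops any record handed to
// it after shutdown has begun. Names are hypothetical, not Kafka Streams API.
class GuardedProcessor {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final List<String> processed = new ArrayList<>();

    void process(String key, String value) {
        if (closed.get()) {
            return; // ignore records delivered after close() was initiated
        }
        processed.add(key + "=" + value);
    }

    void close() {
        closed.set(true); // visible to other threads via AtomicBoolean
    }

    List<String> processed() {
        return processed;
    }
}

public class Main {
    public static void main(String[] args) {
        GuardedProcessor p = new GuardedProcessor();
        p.process("a", "1");
        p.close();
        p.process("b", "2"); // dropped: arrives after close()
        System.out.println(p.processed()); // prints [a=1]
    }
}
```

For the lingering connections themselves, it may also be worth registering a state listener (e.g. {{KafkaStreams#setStateListener}}) and confirming the instance actually reaches {{NOT_RUNNING}} before treating the close as successful.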