[ https://issues.apache.org/jira/browse/KAFKA-8497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
sunqing updated KAFKA-8497:
---------------------------
    Description: 
A simple Kafka Streams application that uses KStream to consume data, shown below. Memory usage is very high when there is a large amount of data in the consumed topic, sometimes climbing past 20 GB. This is very strange: the program does nothing but read records and print them to the screen. Why is memory usage so high when the topic holds a lot of data?

The program code:

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.ForeachAction;
    import org.apache.kafka.streams.kstream.KStream;

    public class TestMain {

        public static StreamsBuilder builder = new StreamsBuilder();

        public static void kafkaStreamStart() {
            KStream<String, String> stream = builder.stream(Arrays.asList("wk_wangxin_po"));

            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testwang_xin");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                    "zktj-kafka-broker-out-1:29092,zktj-kafka-broker-out-2:29092,zktj-kafka-broker-out-3:29092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 3000);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
            props.setProperty("security.protocol", "SASL_PLAINTEXT");
            props.setProperty("sasl.mechanism", "PLAIN");
            props.setProperty("sasl.kerberos.service.name", "kafka");
            System.setProperty("java.security.auth.login.config", "./conf/kafka_client_jaas.conf");

            // Do nothing with each record except print its key.
            stream.foreach(new ForeachAction<String, String>() {
                @Override
                public void apply(String key, String value) {
                    System.out.println("====");
                    System.out.println(key);
                }
            });

            Topology topo = builder.build();
            KafkaStreams streams = new KafkaStreams(topo, props);
            streams.start();
        }

        public static void main(String[] args) {
            kafkaStreamStart();
        }
    }

  was:
A simple Kafka Streams test application that uses KStream to consume data. When the data in the consumed topic surges, or when the topic holds a large backlog of unconsumed data, the consuming program's memory usage becomes very high, reaching more than 20 GB. Question: doesn't Kafka Streams consume records one at a time? Why does a large amount of data in the topic make the program's memory spike? The test program code was identical to the code above, except the class was named RunMain instead of TestMain.

        Summary: kafka streams application takes up a lot of memory  (was: kafka streams application uses very high memory)

> kafka streams application takes up a lot of memory
> --------------------------------------------------
>
>                 Key: KAFKA-8497
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8497
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: sunqing
>            Priority: Major
>
> (The quoted issue body repeats the description and program code above.)

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
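The report asks why a record-at-a-time consumer can use gigabytes of heap. Streams does process one record at a time, but the embedded consumer prefetches data and Streams buffers records per partition before pausing fetching, so a large backlog across many partitions inflates the heap. Below is a minimal sketch, not from the report, of the client-side settings that typically bound that buffering; the class and helper names are illustrative, and the config keys assume Kafka 1.0 (`buffered.records.per.partition` and `cache.max.bytes.buffering` from StreamsConfig, plus `consumer.`-prefixed settings passed through to the embedded consumer).

```java
import java.util.Properties;

public class StreamsMemoryConfig {

    // Hypothetical helper: settings that bound how much data a Kafka Streams
    // 1.0 application buffers when the input topic has a large backlog.
    public static Properties boundedBufferProps() {
        Properties props = new Properties();
        // Max records Streams buffers per partition before it pauses the
        // consumer for that partition (default 1000).
        props.setProperty("buffered.records.per.partition", "500");
        // Total bytes for record caches across all stream threads (default 10 MB).
        props.setProperty("cache.max.bytes.buffering", String.valueOf(5 * 1024 * 1024));
        // Per-partition fetch cap for the embedded consumer; the "consumer."
        // prefix forwards the setting to the consumer config.
        props.setProperty("consumer.max.partition.fetch.bytes", String.valueOf(1024 * 1024));
        // Upper bound on a single fetch response across all partitions.
        props.setProperty("consumer.fetch.max.bytes", String.valueOf(10 * 1024 * 1024));
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting properties; in a real app these would be added
        // to the Properties passed to the KafkaStreams constructor.
        boundedBufferProps().list(System.out);
    }
}
```

With hundreds of partitions, even these bounds multiply (per-partition buffers and fetch sizes are each multiplied by the partition count), which is one plausible route to the multi-gigabyte usage described above.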