[
https://issues.apache.org/jira/browse/KAFKA-8497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16857357#comment-16857357
]
Bruno Cadonna commented on KAFKA-8497:
--------------------------------------
Hi [~dongni98]
Could you please use English for Jira issues (and in all other communication
channels)?
Using English makes it much easier for us to respond to your messages.
> Kafka Streams application uses a lot of memory
> ----------------------------------------------
>
> Key: KAFKA-8497
> URL: https://issues.apache.org/jira/browse/KAFKA-8497
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.0.0
> Reporter: sunqing
> Priority: Major
>
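> This is a simple Kafka Streams test application that uses a KStream to
> consume data. When the volume of data in the consumed Kafka topic surges, or
> when the topic holds a large backlog of unconsumed data, the consuming
> program's memory usage becomes very high and can exceed 20 GB.
> Question: doesn't Kafka Streams consume records one at a time? Why does a
> large amount of data in the topic cause the program's memory to spike?
>
> The test program code is as follows: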
> import java.util.Arrays;
> import java.util.Properties;
>
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.Topology;
> import org.apache.kafka.streams.kstream.ForeachAction;
> import org.apache.kafka.streams.kstream.KStream;
>
> public class RunMain {
>     public static StreamsBuilder builder = new StreamsBuilder();
>
>     public static void kafkaStreamStart() {
>         // Source stream over the input topic.
>         KStream<String, String> stream = builder.stream(Arrays.asList("wk_wangxin_po"));
>
>         Properties props = new Properties();
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testwang_xin");
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>                 "zktj-kafka-broker-out-1:29092,zktj-kafka-broker-out-2:29092,zktj-kafka-broker-out-3:29092");
>         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
>         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
>         props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 3000);
>         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
>
>         // Security settings (SASL/PLAIN over plaintext).
>         props.setProperty("security.protocol", "SASL_PLAINTEXT");
>         props.setProperty("sasl.mechanism", "PLAIN");
>         props.setProperty("sasl.kerberos.service.name", "kafka");
>         System.setProperty("java.security.auth.login.config", "./conf/kafka_client_jaas.conf");
>
>         // Print the key of every consumed record.
>         stream.foreach(new ForeachAction<String, String>() {
>             @Override
>             public void apply(String key, String value) {
>                 System.out.println("====");
>                 System.out.println(key);
>             }
>         });
>
>         Topology topo = builder.build();
>         KafkaStreams streams = new KafkaStreams(topo, props);
>         streams.start();
>     }
>
>     public static void main(String[] args) {
>         kafkaStreamStart();
>     }
> }
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)