karthik created KAFKA-13339: ------------------------------- Summary: Kstream not fetch all the messages Key: KAFKA-13339 URL: https://issues.apache.org/jira/browse/KAFKA-13339 Project: Kafka Issue Type: New Feature Components: consumer Reporter: karthik
i used the below Kstream code for fetch all the records from my topic and facing below error. code : Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, AppConfigs.applicationID); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.bootstrapServers1); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); StreamsBuilder streamsBuilder = new StreamsBuilder(); KStream<Integer, String> kStream = streamsBuilder.stream(AppConfigs.topicName); kStream.foreach((k, v) -> System.out.println("Key= " + k + " Value= " + v)); //kStream.peek((k,v)-> System.out.println("Key= " + k + " Value= " + v)); Topology topology = streamsBuilder.build(); KafkaStreams streams = new KafkaStreams(topology, props); System.out.println("Starting stream."); streams.start(); Runtime.getRuntime().addShutdownHook(new Thread(() -> { System.out.println("Shutting down stream"); streams.close(); })); *Error :* DEBUG o.a.k.clients.FetchSessionHandler - [Consumer clientId=Producer-378ac0f4-7433-4c9b-b814-489fafbf86ac-StreamThread-1-consumer, groupId=MESProducer] Built incremental fetch (sessionId=797093970, epoch=417) for node 2. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s) 18:22:16.088 [Producer-378ac0f4-7433-4c9b-b814-489fafbf86ac-StreamThread-1] DEBUG o.a.k.c.consumer.internals.Fetcher - [Consumer clientId=Producer-378ac0f4-7433-4c9b-b814-489fafbf86ac-StreamThread-1-consumer, groupId=Producer] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(pi-hist-topic-0)) to broker -- This message was sent by Atlassian Jira (v8.3.4#803005)