[ 
https://issues.apache.org/jira/browse/KAFKA-13339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17424304#comment-17424304
 ] 

Matthias J. Sax commented on KAFKA-13339:
-----------------------------------------

The logs you shared don't contain any errors. They are just regular DEBUG logs.
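
One way to check this yourself is to scan the log for lines at WARN level or above. The following is only a minimal sketch: it assumes the level appears as a whitespace-delimited token, as in the quoted excerpt, and the class name `LogLevelFilter` and method `problemLines` are hypothetical helpers, not part of Kafka.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class LogLevelFilter {
    // Matches the first log-level token that is delimited by whitespace
    // (or sits at the start of the line), e.g. "DEBUG " or " ERROR ".
    private static final Pattern LEVEL =
            Pattern.compile("(?:^|\\s)(TRACE|DEBUG|INFO|WARN|ERROR|FATAL)\\s");

    /** Returns only the lines whose level is WARN, ERROR, or FATAL. */
    public static List<String> problemLines(List<String> lines) {
        return lines.stream()
                .filter(line -> {
                    Matcher m = LEVEL.matcher(line);
                    if (!m.find()) {
                        return false; // no recognizable level on this line
                    }
                    String level = m.group(1);
                    return level.equals("WARN")
                            || level.equals("ERROR")
                            || level.equals("FATAL");
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Shortened stand-ins for the DEBUG lines in the report.
        List<String> sample = List.of(
                "DEBUG o.a.k.clients.FetchSessionHandler - Built incremental fetch",
                "DEBUG o.a.k.c.consumer.internals.Fetcher - Added READ_UNCOMMITTED fetch request");
        System.out.println(problemLines(sample).size() + " problem line(s) found");
    }
}
```

If a scan like this returns nothing, the log shows only routine fetch activity rather than a failure.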

Also note that we use Jira only for actual bug reports. If you have a question, 
please reach out to the user mailing list instead: 
[https://kafka.apache.org/contact]

Closing this ticket as invalid. We can re-open if there is really a bug.

> Kstream not fetch all the messages
> ----------------------------------
>
>                 Key: KAFKA-13339
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13339
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>            Reporter: karthik
>            Priority: Major
>
> I used the KStream code below to fetch all the records from my topic and am 
> facing the error below.
>  
> Code:
>  
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, AppConfigs.applicationID);
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.bootstrapServers1);
> props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
> props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
>
> StreamsBuilder streamsBuilder = new StreamsBuilder();
> KStream<Integer, String> kStream = streamsBuilder.stream(AppConfigs.topicName);
> kStream.foreach((k, v) -> System.out.println("Key= " + k + " Value= " + v));
> //kStream.peek((k,v)-> System.out.println("Key= " + k + " Value= " + v));
>
> Topology topology = streamsBuilder.build();
> KafkaStreams streams = new KafkaStreams(topology, props);
> System.out.println("Starting stream.");
> streams.start();
>
> Runtime.getRuntime().addShutdownHook(new Thread(() -> {
>     System.out.println("Shutting down stream");
>     streams.close();
> }));
>  
> *Error :*
> DEBUG o.a.k.clients.FetchSessionHandler - [Consumer 
> clientId=Producer-378ac0f4-7433-4c9b-b814-489fafbf86ac-StreamThread-1-consumer,
>  groupId=MESProducer] Built incremental fetch (sessionId=797093970, 
> epoch=417) for node 2. Added 0 partition(s), altered 0 partition(s), removed 
> 0 partition(s) out of 1 partition(s)
>  18:22:16.088 [Producer-378ac0f4-7433-4c9b-b814-489fafbf86ac-StreamThread-1] 
> DEBUG o.a.k.c.consumer.internals.Fetcher - [Consumer 
> clientId=Producer-378ac0f4-7433-4c9b-b814-489fafbf86ac-StreamThread-1-consumer,
>  groupId=Producer] Sending READ_UNCOMMITTED 
> IncrementalFetchRequest(toSend=(), toForget=(), implied=(test-topic-0)) to 
> broker
> DEBUG o.a.k.c.consumer.internals.Fetcher - [Consumer 
> clientId=Producer-378ac0f4-7433-4c9b-b814-489fafbf86ac-StreamThread-1-consumer,
>  groupId=MESProducer] Added READ_UNCOMMITTED fetch request for partition 
> test-topic-1 at position FetchPosition{offset=1, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
