You can collect the Futures and call `get` in batches. That would give you access to the errors without blocking on each request.
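Roughly like this (a minimal sketch of the batching pattern; the class name, `sendAll`, `drain`, `batchSize`, and the executor-backed fake sends are illustrative stand-ins so the snippet runs without a broker — with the real producer, `producer.send(...)` returns a `Future<RecordMetadata>` that fits the same shape):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BatchedSendDemo {

    // Send all lines, collecting futures and draining them once per batch
    // instead of calling get() after every single send.
    static int sendAll(List<String> lines, int batchSize) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<Integer>> inFlight = new ArrayList<>();
        int confirmed = 0;
        try {
            for (String line : lines) {
                // stand-in for producer.send(new ProducerRecord<>(topicName, line))
                inFlight.add(pool.submit(line::length));
                if (inFlight.size() >= batchSize) {
                    confirmed += drain(inFlight);
                }
            }
            confirmed += drain(inFlight); // don't forget the tail batch
        } finally {
            pool.shutdown();
        }
        return confirmed;
    }

    // Block on a whole batch at once; get() rethrows any send failure,
    // so errors surface instead of being silently dropped.
    static int drain(List<Future<Integer>> batch) throws Exception {
        int ok = 0;
        for (Future<Integer> f : batch) {
            f.get();
            ok++;
        }
        batch.clear();
        return ok;
    }

    public static void main(String[] args) throws Exception {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < 5000; i++) lines.add("record-" + i);
        System.out.println("confirmed=" + sendAll(lines, 1000));
    }
}
```

Another option with the real producer is the two-argument send(record, callback): the Callback is invoked with either the RecordMetadata or the exception, so you can log failures asynchronously without blocking the send loop at all.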
Ismael

On Tue, Nov 22, 2016 at 8:56 AM, Phadnis, Varun <phad...@sky.optymyze.com> wrote:
> Hello,
>
> We had tried that... If future.get() is added in the while loop, it takes
> too long for the loop to execute.
>
> Last time we tried it, it was running for that file for over 2 hours and
> still not finished.
>
> Regards,
> Varun
>
> -----Original Message-----
> From: Jaikiran Pai [mailto:jai.forums2...@gmail.com]
> Sent: 22 November 2016 02:20
> To: users@kafka.apache.org
> Subject: Re: Kafka producer dropping records
>
> The KafkaProducer.send returns a Future<RecordMetadata>. What happens when
> you add a future.get() on the returned Future, in that while loop, for each
> sent record?
>
> -Jaikiran
>
> On Tuesday 22 November 2016 12:45 PM, Phadnis, Varun wrote:
> > Hello,
> >
> > We have the following piece of code where we read lines from a file and
> > push them to a Kafka topic:
> >
> > Properties properties = new Properties();
> > properties.put("bootstrap.servers", <bootstrapServers>);
> > properties.put("key.serializer", StringSerializer.class.getCanonicalName());
> > properties.put("value.serializer", StringSerializer.class.getCanonicalName());
> > properties.put("retries", 100);
> > properties.put("acks", "all");
> >
> > KafkaProducer<Object, String> producer = new KafkaProducer<>(properties);
> >
> > try (BufferedReader bf = new BufferedReader(new InputStreamReader(new FileInputStream(filePath), "UTF-8"))) {
> >     String line;
> >     int count = 0;
> >     while ((line = bf.readLine()) != null) {
> >         count++;
> >         producer.send(new ProducerRecord<>(topicName, line));
> >     }
> >     Logger.log("Done producing data messages. Total no of records produced:" + count);
> > } catch (InterruptedException | ExecutionException | IOException e) {
> >     Throwables.propagate(e);
> > } finally {
> >     producer.close();
> > }
> >
> > When we try this with a large file with a million records, only half of
> > them, around 500,000, get written to the topic.
> > In the above example, I verified this by running the GetOffset tool after
> > a fair amount of time (to ensure all records had finished processing) as
> > follows:
> >
> > ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker_list> --time -1 --topic <topic_name>
> >
> > The output of this was:
> >
> > topic_name:1:292954
> > topic_name:0:296787
> >
> > What could be causing this dropping of records?
> >
> > Thanks,
> > Varun