You can collect the Futures and call `get` in batches. That would give you
access to the errors without blocking on each request.
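
A minimal sketch of that batching idea, under a few assumptions: the names BatchedSendSketch, sendAll and awaitAll are hypothetical and not part of the Kafka client, the batch size is arbitrary, and the sender is a plain function that returns a Future so the example runs without a broker. In the code from this thread the sender would presumably be `line -> producer.send(new ProducerRecord<>(topicName, line))`, which has not been tested here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;

// Sketch only: the class and method names are hypothetical helpers,
// not part of the Kafka client API.
class BatchedSendSketch {

    // Waits for every future in the batch, collects the causes of the
    // failed ones, then empties the batch so it can be reused.
    static <T> List<Throwable> awaitAll(List<Future<T>> batch) {
        List<Throwable> failures = new ArrayList<>();
        for (Future<T> future : batch) {
            try {
                future.get();
            } catch (ExecutionException e) {
                failures.add(e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                throw new IllegalStateException("interrupted while waiting for sends", e);
            }
        }
        batch.clear();
        return failures;
    }

    // Hands each line to `sender` and blocks only once per `batchSize` sends.
    static <T> List<Throwable> sendAll(Iterable<String> lines,
                                       Function<String, Future<T>> sender,
                                       int batchSize) {
        List<Throwable> failures = new ArrayList<>();
        List<Future<T>> pending = new ArrayList<>();
        for (String line : lines) {
            pending.add(sender.apply(line));
            if (pending.size() >= batchSize) {
                failures.addAll(awaitAll(pending));
            }
        }
        failures.addAll(awaitAll(pending)); // drain the final partial batch
        return failures;
    }

    public static void main(String[] args) {
        // Stand-in for producer.send: fails on empty lines, succeeds otherwise.
        Function<String, Future<String>> fakeSender = line -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            if (line.isEmpty()) {
                f.completeExceptionally(new IllegalArgumentException("empty line"));
            } else {
                f.complete(line);
            }
            return f;
        };
        List<Throwable> failures =
                sendAll(Arrays.asList("a", "", "b", "c", ""), fakeSender, 2);
        System.out.println("failed sends: " + failures.size());
    }
}
```

Waiting once per batch rather than once per record should leave the producer free to keep several records in flight between waits, while every send error still surfaces through the returned list.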

Ismael

On Tue, Nov 22, 2016 at 8:56 AM, Phadnis, Varun <phad...@sky.optymyze.com>
wrote:

> Hello,
>
> We had tried that... If future.get() is added in the while loop, it takes
> too long for the loop to execute.
>
> Last time we tried it, it ran on that file for over 2 hours and still had
> not finished.
>
> Regards,
> Varun
>
> -----Original Message-----
> From: Jaikiran Pai [mailto:jai.forums2...@gmail.com]
> Sent: 22 November 2016 02:20
> To: users@kafka.apache.org
> Subject: Re: Kafka producer dropping records
>
> The KafkaProducer.send returns a Future<RecordMetadata>. What happens when
> you add a future.get() on the returned Future, in that while loop, for each
> sent record?
>
> -Jaikiran
>
> On Tuesday 22 November 2016 12:45 PM, Phadnis, Varun wrote:
> > Hello,
> >
> > We have the following piece of code where we read lines from a file and
> > push them to a Kafka topic:
> >
> >          Properties properties = new Properties();
> >          properties.put("bootstrap.servers", <bootstrapServers>);
> >          properties.put("key.serializer", StringSerializer.class.getCanonicalName());
> >          properties.put("value.serializer", StringSerializer.class.getCanonicalName());
> >          properties.put("retries", 100);
> >          properties.put("acks", "all");
> >
> >          KafkaProducer<Object, String> producer = new KafkaProducer<>(properties);
> >
> >          try (BufferedReader bf = new BufferedReader(new InputStreamReader(new FileInputStream(filePath), "UTF-8"))) {
> >              String line;
> >              int count = 0;
> >              while ((line = bf.readLine()) != null) {
> >                  count++;
> >                  producer.send(new ProducerRecord<>(topicName, line));
> >              }
> >              Logger.log("Done producing data messages. Total no of records produced:" + count);
> >          } catch (InterruptedException | ExecutionException | IOException e) {
> >              Throwables.propagate(e);
> >          } finally {
> >              producer.close();
> >          }
> >
> > When we try this with a large file of a million records, only about half
> > of them (around 500,000) get written to the topic. In the above example,
> > I verified this by running the GetOffsetShell tool after a fair amount of
> > time (to ensure all records had finished processing) as follows:
> >
> >
> >          ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker_list> --time -1 --topic <topic_name>
> >
> >
> >
> >
> > The output of this was :
> >
> >
> >          topic_name:1:292954
> >
> >          topic_name:0:296787
> >
> >
> > What could be causing this dropping of records?
> >
> > Thanks,
> > Varun
> >
>
>
