[GitHub] sv2000 commented on a change in pull request #2556: GOBBLIN-684: Ensure buffered messages are flushed before close() in K…

2019-02-18 Thread GitBox
sv2000 commented on a change in pull request #2556: GOBBLIN-684: Ensure 
buffered messages are flushed before close() in K…
URL: https://github.com/apache/incubator-gobblin/pull/2556#discussion_r257866302
 
 

 ##
 File path: 
gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
 ##
 @@ -80,17 +88,46 @@ public KafkaKeyValueProducerPusher(String brokers, String 
topic) {
*/
   public void pushMessages(List> messages) {
 for (Pair message: messages) {
-  this.producer.send(new ProducerRecord<>(topic, message.getKey(), 
message.getValue()), (recordMetadata, e) -> {
+  this.futures.offer(this.producer.send(new ProducerRecord<>(topic, 
message.getKey(), message.getValue()), (recordMetadata, e) -> {
 if (e != null) {
   log.error("Failed to send message to topic {} due to exception: ", 
topic, e);
 }
-  });
+  }));
+}
+
+//Accumulate futures returned from send() into a buffer; will be used to 
simulate flush by calling get() on
+// each of the accumulated futures.
+if (this.futures.size() >= MAX_NUM_FUTURES_TO_BUFFER) {
+  flush(MAX_NUM_FUTURES_TO_BUFFER);
+  this.futures.clear();
 
 Review comment:
   Good catch! Yes, the clear() is not needed. Also, changed the pushMessages() 
method to exclude the newest added messages. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sv2000 commented on a change in pull request #2556: GOBBLIN-684: Ensure buffered messages are flushed before close() in K…

2019-02-18 Thread GitBox
sv2000 commented on a change in pull request #2556: GOBBLIN-684: Ensure 
buffered messages are flushed before close() in K…
URL: https://github.com/apache/incubator-gobblin/pull/2556#discussion_r257866178
 
 

 ##
 File path: 
gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
 ##
 @@ -47,6 +51,9 @@
   private final String topic;
   private final KafkaProducer producer;
   private final Closer closer;
+  private final Queue> futures = new 
LinkedBlockingDeque<>();
 
 Review comment:
   Thanks! Added more comments to explain how to set the capacity for the 
buffer.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services