sv2000 commented on a change in pull request #2556: GOBBLIN-684: Ensure
buffered messages are flushed before close() in K…
URL: https://github.com/apache/incubator-gobblin/pull/2556#discussion_r257866302
##
File path:
gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
##
@@ -80,17 +88,46 @@ public KafkaKeyValueProducerPusher(String brokers, String
topic) {
*/
public void pushMessages(List> messages) {
for (Pair message: messages) {
- this.producer.send(new ProducerRecord<>(topic, message.getKey(),
message.getValue()), (recordMetadata, e) -> {
+ this.futures.offer(this.producer.send(new ProducerRecord<>(topic,
message.getKey(), message.getValue()), (recordMetadata, e) -> {
if (e != null) {
log.error("Failed to send message to topic {} due to exception: ",
topic, e);
}
- });
+ }));
+}
+
+//Accumulate futures returned from send() into a buffer; will be used to
simulate flush by calling get() on
+// each of the accumulated futures.
+if (this.futures.size() >= MAX_NUM_FUTURES_TO_BUFFER) {
+ flush(MAX_NUM_FUTURES_TO_BUFFER);
+ this.futures.clear();
Review comment:
Good catch! Yes, the clear() is not needed. Also, changed the pushMessages()
method to exclude the newest added messages.
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
With regards,
Apache Git Services