This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e1ad269  Fixed race condition between write operation and send timeout (#1108)
e1ad269 is described below

commit e1ad269f32334a274565c95f0745468d58a2720b
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Wed Jan 24 18:19:24 2018 -0800

    Fixed race condition between write operation and send timeout (#1108)
---
 .../java/org/apache/pulsar/client/impl/ProducerImpl.java   | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 1124658..32d61a5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.lang.String.format;
+import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.pulsar.checksum.utils.Crc32cChecksum.computeChecksum;
 import static org.apache.pulsar.checksum.utils.Crc32cChecksum.resumeChecksum;
 import static org.apache.pulsar.common.api.Commands.hasChecksum;
@@ -440,14 +441,16 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
 
     private static final class WriteInEventLoopCallback implements Runnable {
         private ProducerImpl producer;
+        private ByteBufPair cmd;
+        private long sequenceId;
         private ClientCnx cnx;
-        private OpSendMsg op;
 
         static WriteInEventLoopCallback create(ProducerImpl producer, ClientCnx cnx, OpSendMsg op) {
             WriteInEventLoopCallback c = RECYCLER.get();
             c.producer = producer;
             c.cnx = cnx;
-            c.op = op;
+            c.sequenceId = op.sequenceId;
+            c.cmd = op.cmd;
             return c;
         }
 
@@ -455,11 +458,11 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
         public void run() {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] [{}] Sending message cnx {}, sequenceId {}", producer.topic, producer.producerName, cnx,
-                        op.sequenceId);
+                        sequenceId);
             }
 
             try {
-                cnx.ctx().writeAndFlush(op.cmd, cnx.ctx().voidPromise());
+                cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
             } finally {
                 recycle();
             }
@@ -468,7 +471,8 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
         private void recycle() {
             producer = null;
             cnx = null;
-            op = null;
+            cmd = null;
+            sequenceId = -1;
             recyclerHandle.recycle(this);
         }
 

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to