This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new e1ad269 Fixed race condition between write operation and send timeout (#1108) e1ad269 is described below commit e1ad269f32334a274565c95f0745468d58a2720b Author: Matteo Merli <mme...@apache.org> AuthorDate: Wed Jan 24 18:19:24 2018 -0800 Fixed race condition between write operation and send timeout (#1108) --- .../java/org/apache/pulsar/client/impl/ProducerImpl.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 1124658..32d61a5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl; import static com.google.common.base.Preconditions.checkArgument; import static java.lang.String.format; +import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.pulsar.checksum.utils.Crc32cChecksum.computeChecksum; import static org.apache.pulsar.checksum.utils.Crc32cChecksum.resumeChecksum; import static org.apache.pulsar.common.api.Commands.hasChecksum; @@ -440,14 +441,16 @@ public class ProducerImpl extends ProducerBase implements TimerTask { private static final class WriteInEventLoopCallback implements Runnable { private ProducerImpl producer; + private ByteBufPair cmd; + private long sequenceId; private ClientCnx cnx; - private OpSendMsg op; static WriteInEventLoopCallback create(ProducerImpl producer, ClientCnx cnx, OpSendMsg op) { WriteInEventLoopCallback c = RECYCLER.get(); c.producer = producer; c.cnx = cnx; - c.op = op; + c.sequenceId = op.sequenceId; + c.cmd = op.cmd; return c; } @@ -455,11 +458,11 @@ public class ProducerImpl extends ProducerBase implements TimerTask { public void run() { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Sending message cnx {}, sequenceId {}", producer.topic, producer.producerName, cnx, - op.sequenceId); + sequenceId); } try { - cnx.ctx().writeAndFlush(op.cmd, cnx.ctx().voidPromise()); + cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise()); } finally { recycle(); } @@ -468,7 +471,8 @@ public class ProducerImpl extends ProducerBase implements TimerTask { private void recycle() { producer = null; cnx = null; - op = null; + cmd = null; + sequenceId = -1; recyclerHandle.recycle(this); } -- To stop receiving notification emails like this one, please contact mme...@apache.org.