nlu90 closed pull request #2692: Making emit, ack, and fail thread safe
URL: https://github.com/apache/incubator-heron/pull/2692
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java b/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java index 858773994f..5fb74be7ee 100644 --- a/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java +++ b/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java @@ -17,6 +17,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; import com.google.protobuf.ByteString; import com.google.protobuf.Message; @@ -40,6 +41,7 @@ private long totalTuplesEmitted; private long totalBytesEmitted; private PhysicalPlanHelper helper; + public final ReentrantLock lock = new ReentrantLock(); /** * The SuppressWarnings is only until TOPOLOGY_ENABLE_ACKING exists. 
@@ -73,7 +75,7 @@ public AbstractOutputCollector(IPluggableSerializer serializer, } } - this.outputter = new OutgoingTupleCollection(helper, streamOutQueue); + this.outputter = new OutgoingTupleCollection(helper, streamOutQueue, lock); } public void updatePhysicalPlanHelper(PhysicalPlanHelper physicalPlanHelper) { diff --git a/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java b/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java index a346c4b95c..ae9e31a4fb 100644 --- a/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java +++ b/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java @@ -15,6 +15,8 @@ package org.apache.heron.instance; import java.io.Serializable; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import com.google.protobuf.ByteString; import com.google.protobuf.Message; @@ -55,14 +57,17 @@ private HeronTuples.HeronControlTupleSet.Builder currentControlTuple; // Total data emitted in bytes for the entire life - private long totalDataEmittedInBytes; + private AtomicLong totalDataEmittedInBytes = new AtomicLong(); // Current size in bytes for data types to pack into the HeronTupleSet private long currentDataTupleSizeInBytes; + private final ReentrantLock lock; + public OutgoingTupleCollection( PhysicalPlanHelper helper, - Communicator<Message> outQueue) { + Communicator<Message> outQueue, + ReentrantLock lock) { this.outQueue = outQueue; this.helper = helper; SystemConfig systemConfig = @@ -72,17 +77,23 @@ public OutgoingTupleCollection( SerializeDeSerializeHelper.getSerializer(helper.getTopologyContext().getTopologyConfig()); // Initialize the values in constructor - this.totalDataEmittedInBytes = 0; + this.totalDataEmittedInBytes.set(0); this.currentDataTupleSizeInBytes = 0; // Read the config values this.dataTupleSetCapacity = systemConfig.getInstanceSetDataTupleCapacity(); this.maxDataTupleSize = 
systemConfig.getInstanceSetDataTupleSize(); this.controlTupleSetCapacity = systemConfig.getInstanceSetControlTupleCapacity(); + this.lock = lock; } public void sendOutTuples() { - flushRemaining(); + lock.lock(); + try { + flushRemaining(); + } finally { + lock.unlock(); + } } /** @@ -92,71 +103,93 @@ public void sendOutTuples() { */ public void sendOutState(State<Serializable, Serializable> state, String checkpointId) { - // flush all the current data before sending the state - flushRemaining(); - - // Serialize the state - byte[] serializedState = serializer.serialize(state); - - // Construct the instance state checkpoint - CheckpointManager.InstanceStateCheckpoint instanceState = - CheckpointManager.InstanceStateCheckpoint.newBuilder() - .setCheckpointId(checkpointId) - .setState(ByteString.copyFrom(serializedState)) - .build(); - - CheckpointManager.StoreInstanceStateCheckpoint storeRequest = - CheckpointManager.StoreInstanceStateCheckpoint.newBuilder() - .setState(instanceState) - .build(); - - // Put the checkpoint to out stream queue - outQueue.offer(storeRequest); + lock.lock(); + try { + // flush all the current data before sending the state + flushRemaining(); + + // Serialize the state + byte[] serializedState = serializer.serialize(state); + + // Construct the instance state checkpoint + CheckpointManager.InstanceStateCheckpoint instanceState = + CheckpointManager.InstanceStateCheckpoint.newBuilder() + .setCheckpointId(checkpointId) + .setState(ByteString.copyFrom(serializedState)) + .build(); + + CheckpointManager.StoreInstanceStateCheckpoint storeRequest = + CheckpointManager.StoreInstanceStateCheckpoint.newBuilder() + .setState(instanceState) + .build(); + + // Put the checkpoint to out stream queue + outQueue.offer(storeRequest); + } finally { + lock.unlock(); + } } public void addDataTuple( String streamId, HeronTuples.HeronDataTuple.Builder newTuple, long tupleSizeInBytes) { - if (tupleSizeInBytes > maxDataTupleSize.asBytes()) { - throw new 
RuntimeException( - String.format("Data tuple (stream id: %s) is too large: %d bytes", streamId, - tupleSizeInBytes)); + lock.lock(); + try { + if (tupleSizeInBytes > maxDataTupleSize.asBytes()) { + throw new RuntimeException( + String.format("Data tuple (stream id: %s) is too large: %d bytes", streamId, + tupleSizeInBytes)); + } + if (currentDataTuple == null + || !currentDataTuple.getStream().getId().equals(streamId) + || currentDataTuple.getTuplesCount() >= dataTupleSetCapacity + || currentDataTupleSizeInBytes >= maxDataTupleSize.asBytes()) { + initNewDataTuple(streamId); + } + currentDataTuple.addTuples(newTuple); + + currentDataTupleSizeInBytes += tupleSizeInBytes; + totalDataEmittedInBytes.getAndAdd(tupleSizeInBytes); + } finally { + lock.unlock(); } - if (currentDataTuple == null - || !currentDataTuple.getStream().getId().equals(streamId) - || currentDataTuple.getTuplesCount() >= dataTupleSetCapacity - || currentDataTupleSizeInBytes >= maxDataTupleSize.asBytes()) { - initNewDataTuple(streamId); - } - currentDataTuple.addTuples(newTuple); - - currentDataTupleSizeInBytes += tupleSizeInBytes; - totalDataEmittedInBytes += tupleSizeInBytes; } - public void addAckTuple(HeronTuples.AckTuple.Builder newTuple, long tupleSizeInBytes) { - if (currentControlTuple == null - || currentControlTuple.getFailsCount() > 0 - || currentControlTuple.getAcksCount() >= controlTupleSetCapacity) { - initNewControlTuple(); + public void addAckTuple( + HeronTuples.AckTuple.Builder newTuple, long tupleSizeInBytes) { + lock.lock(); + try { + if (currentControlTuple == null + || currentControlTuple.getFailsCount() > 0 + || currentControlTuple.getAcksCount() >= controlTupleSetCapacity) { + initNewControlTuple(); + } + currentControlTuple.addAcks(newTuple); + + // Add the size of data in bytes ready to send out + totalDataEmittedInBytes.getAndAdd(tupleSizeInBytes); + } finally { + lock.unlock(); } - currentControlTuple.addAcks(newTuple); - - // Add the size of data in bytes ready to send 
out - totalDataEmittedInBytes += tupleSizeInBytes; } - public void addFailTuple(HeronTuples.AckTuple.Builder newTuple, long tupleSizeInBytes) { - if (currentControlTuple == null - || currentControlTuple.getAcksCount() > 0 - || currentControlTuple.getFailsCount() >= controlTupleSetCapacity) { - initNewControlTuple(); + public void addFailTuple( + HeronTuples.AckTuple.Builder newTuple, long tupleSizeInBytes) { + lock.lock(); + try { + if (currentControlTuple == null + || currentControlTuple.getAcksCount() > 0 + || currentControlTuple.getFailsCount() >= controlTupleSetCapacity) { + initNewControlTuple(); + } + currentControlTuple.addFails(newTuple); + + // Add the size of data in bytes ready to send out + totalDataEmittedInBytes.getAndAdd(tupleSizeInBytes); + } finally { + lock.unlock(); } - currentControlTuple.addFails(newTuple); - - // Add the size of data in bytes ready to send out - totalDataEmittedInBytes += tupleSizeInBytes; } private void initNewDataTuple(String streamId) { @@ -210,18 +243,28 @@ public boolean isOutQueuesAvailable() { } public long getTotalDataEmittedInBytes() { - return totalDataEmittedInBytes; + return totalDataEmittedInBytes.get(); } // Clean the internal state of OutgoingTupleCollection public void clear() { - currentControlTuple = null; - currentDataTuple = null; + lock.lock(); + try { + currentControlTuple = null; + currentDataTuple = null; - outQueue.clear(); + outQueue.clear(); + } finally { + lock.unlock(); + } } public void updatePhysicalPlanHelper(PhysicalPlanHelper physicalPlanHelper) { - this.helper = physicalPlanHelper; + lock.lock(); + try { + this.helper = physicalPlanHelper; + } finally { + lock.unlock(); + } } } diff --git a/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java b/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java index b141c3bf00..41ed7fcdbf 100644 --- a/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java +++ 
b/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java @@ -139,12 +139,19 @@ public void persistState(String checkpointId) { throw new RuntimeException("Could not save a non-stateful topology's state"); } - // Checkpoint - if (bolt instanceof IStatefulComponent) { - ((IStatefulComponent) bolt).preSave(checkpointId); - } + // need to synchronize with other OutgoingTupleCollection operations + // so that topology emit, ack, fail are thread safe + collector.lock.lock(); + try { + // Checkpoint + if (bolt instanceof IStatefulComponent) { + ((IStatefulComponent) bolt).preSave(checkpointId); + } - collector.sendOutState(instanceState, checkpointId); + collector.sendOutState(instanceState, checkpointId); + } finally { + collector.lock.unlock(); + } } @SuppressWarnings("unchecked") diff --git a/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java b/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java index b20418c953..3834a47428 100644 --- a/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java +++ b/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java @@ -148,11 +148,18 @@ public void persistState(String checkpointId) { throw new RuntimeException("Could not save a non-stateful topology's state"); } - if (spout instanceof IStatefulComponent) { - ((IStatefulComponent) spout).preSave(checkpointId); - } + // need to synchronize with other OutgoingTupleCollection operations + // so that topology emit, ack, fail are thread safe + collector.lock.lock(); + try { + if (spout instanceof IStatefulComponent) { + ((IStatefulComponent) spout).preSave(checkpointId); + } - collector.sendOutState(instanceState, checkpointId); + collector.sendOutState(instanceState, checkpointId); + } finally { + collector.lock.unlock(); + } } @SuppressWarnings("unchecked") ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services