nlu90 closed pull request #2692: Making emit, ack, and fail thread safe
URL: https://github.com/apache/incubator-heron/pull/2692
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java b/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java
index 858773994f..5fb74be7ee 100644
--- a/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java
+++ b/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java
@@ -17,6 +17,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
@@ -40,6 +41,7 @@
   private long totalTuplesEmitted;
   private long totalBytesEmitted;
   private PhysicalPlanHelper helper;
+  public final ReentrantLock lock = new ReentrantLock();
 
   /**
    * The SuppressWarnings is only until TOPOLOGY_ENABLE_ACKING exists.
@@ -73,7 +75,7 @@ public AbstractOutputCollector(IPluggableSerializer serializer,
       }
     }
 
-    this.outputter = new OutgoingTupleCollection(helper, streamOutQueue);
+    this.outputter = new OutgoingTupleCollection(helper, streamOutQueue, lock);
   }
 
   public void updatePhysicalPlanHelper(PhysicalPlanHelper physicalPlanHelper) {
diff --git a/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java b/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java
index a346c4b95c..ae9e31a4fb 100644
--- a/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java
+++ b/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java
@@ -15,6 +15,8 @@
 package org.apache.heron.instance;
 
 import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
@@ -55,14 +57,17 @@
   private HeronTuples.HeronControlTupleSet.Builder currentControlTuple;
 
   // Total data emitted in bytes for the entire life
-  private long totalDataEmittedInBytes;
+  private AtomicLong totalDataEmittedInBytes = new AtomicLong();
 
   // Current size in bytes for data types to pack into the HeronTupleSet
   private long currentDataTupleSizeInBytes;
 
+  private final ReentrantLock lock;
+
   public OutgoingTupleCollection(
       PhysicalPlanHelper helper,
-      Communicator<Message> outQueue) {
+      Communicator<Message> outQueue,
+      ReentrantLock lock) {
     this.outQueue = outQueue;
     this.helper = helper;
     SystemConfig systemConfig =
@@ -72,17 +77,23 @@ public OutgoingTupleCollection(
         
SerializeDeSerializeHelper.getSerializer(helper.getTopologyContext().getTopologyConfig());
 
     // Initialize the values in constructor
-    this.totalDataEmittedInBytes = 0;
+    this.totalDataEmittedInBytes.set(0);
     this.currentDataTupleSizeInBytes = 0;
 
     // Read the config values
     this.dataTupleSetCapacity = systemConfig.getInstanceSetDataTupleCapacity();
     this.maxDataTupleSize = systemConfig.getInstanceSetDataTupleSize();
     this.controlTupleSetCapacity = 
systemConfig.getInstanceSetControlTupleCapacity();
+    this.lock = lock;
   }
 
   public void sendOutTuples() {
-    flushRemaining();
+    lock.lock();
+    try {
+      flushRemaining();
+    } finally {
+      lock.unlock();
+    }
   }
 
   /**
@@ -92,71 +103,93 @@ public void sendOutTuples() {
    */
   public void sendOutState(State<Serializable, Serializable> state,
                            String checkpointId) {
-    // flush all the current data before sending the state
-    flushRemaining();
-
-    // Serialize the state
-    byte[] serializedState = serializer.serialize(state);
-
-    // Construct the instance state checkpoint
-    CheckpointManager.InstanceStateCheckpoint instanceState =
-        CheckpointManager.InstanceStateCheckpoint.newBuilder()
-          .setCheckpointId(checkpointId)
-          .setState(ByteString.copyFrom(serializedState))
-          .build();
-
-    CheckpointManager.StoreInstanceStateCheckpoint storeRequest =
-        CheckpointManager.StoreInstanceStateCheckpoint.newBuilder()
-            .setState(instanceState)
-            .build();
-
-    // Put the checkpoint to out stream queue
-    outQueue.offer(storeRequest);
+    lock.lock();
+    try {
+      // flush all the current data before sending the state
+      flushRemaining();
+
+      // Serialize the state
+      byte[] serializedState = serializer.serialize(state);
+
+      // Construct the instance state checkpoint
+      CheckpointManager.InstanceStateCheckpoint instanceState =
+          CheckpointManager.InstanceStateCheckpoint.newBuilder()
+              .setCheckpointId(checkpointId)
+              .setState(ByteString.copyFrom(serializedState))
+              .build();
+
+      CheckpointManager.StoreInstanceStateCheckpoint storeRequest =
+          CheckpointManager.StoreInstanceStateCheckpoint.newBuilder()
+              .setState(instanceState)
+              .build();
+
+      // Put the checkpoint to out stream queue
+      outQueue.offer(storeRequest);
+    } finally {
+      lock.unlock();
+    }
   }
 
   public void addDataTuple(
       String streamId,
       HeronTuples.HeronDataTuple.Builder newTuple,
       long tupleSizeInBytes) {
-    if (tupleSizeInBytes > maxDataTupleSize.asBytes()) {
-      throw new RuntimeException(
-          String.format("Data tuple (stream id: %s) is too large: %d bytes", 
streamId,
-              tupleSizeInBytes));
+    lock.lock();
+    try {
+      if (tupleSizeInBytes > maxDataTupleSize.asBytes()) {
+        throw new RuntimeException(
+            String.format("Data tuple (stream id: %s) is too large: %d bytes", 
streamId,
+                tupleSizeInBytes));
+      }
+      if (currentDataTuple == null
+          || !currentDataTuple.getStream().getId().equals(streamId)
+          || currentDataTuple.getTuplesCount() >= dataTupleSetCapacity
+          || currentDataTupleSizeInBytes >= maxDataTupleSize.asBytes()) {
+        initNewDataTuple(streamId);
+      }
+      currentDataTuple.addTuples(newTuple);
+
+      currentDataTupleSizeInBytes += tupleSizeInBytes;
+      totalDataEmittedInBytes.getAndAdd(tupleSizeInBytes);
+    } finally {
+      lock.unlock();
     }
-    if (currentDataTuple == null
-        || !currentDataTuple.getStream().getId().equals(streamId)
-        || currentDataTuple.getTuplesCount() >= dataTupleSetCapacity
-        || currentDataTupleSizeInBytes >= maxDataTupleSize.asBytes()) {
-      initNewDataTuple(streamId);
-    }
-    currentDataTuple.addTuples(newTuple);
-
-    currentDataTupleSizeInBytes += tupleSizeInBytes;
-    totalDataEmittedInBytes += tupleSizeInBytes;
   }
 
-  public void addAckTuple(HeronTuples.AckTuple.Builder newTuple, long 
tupleSizeInBytes) {
-    if (currentControlTuple == null
-        || currentControlTuple.getFailsCount() > 0
-        || currentControlTuple.getAcksCount() >= controlTupleSetCapacity) {
-      initNewControlTuple();
+  public void addAckTuple(
+      HeronTuples.AckTuple.Builder newTuple, long tupleSizeInBytes) {
+    lock.lock();
+    try {
+      if (currentControlTuple == null
+          || currentControlTuple.getFailsCount() > 0
+          || currentControlTuple.getAcksCount() >= controlTupleSetCapacity) {
+        initNewControlTuple();
+      }
+      currentControlTuple.addAcks(newTuple);
+
+      // Add the size of data in bytes ready to send out
+      totalDataEmittedInBytes.getAndAdd(tupleSizeInBytes);
+    } finally {
+      lock.unlock();
     }
-    currentControlTuple.addAcks(newTuple);
-
-    // Add the size of data in bytes ready to send out
-    totalDataEmittedInBytes += tupleSizeInBytes;
   }
 
-  public void addFailTuple(HeronTuples.AckTuple.Builder newTuple, long 
tupleSizeInBytes) {
-    if (currentControlTuple == null
-        || currentControlTuple.getAcksCount() > 0
-        || currentControlTuple.getFailsCount() >= controlTupleSetCapacity) {
-      initNewControlTuple();
+  public void addFailTuple(
+      HeronTuples.AckTuple.Builder newTuple, long tupleSizeInBytes) {
+    lock.lock();
+    try {
+      if (currentControlTuple == null
+          || currentControlTuple.getAcksCount() > 0
+          || currentControlTuple.getFailsCount() >= controlTupleSetCapacity) {
+        initNewControlTuple();
+      }
+      currentControlTuple.addFails(newTuple);
+
+      // Add the size of data in bytes ready to send out
+      totalDataEmittedInBytes.getAndAdd(tupleSizeInBytes);
+    } finally {
+      lock.unlock();
     }
-    currentControlTuple.addFails(newTuple);
-
-    // Add the size of data in bytes ready to send out
-    totalDataEmittedInBytes += tupleSizeInBytes;
   }
 
   private void initNewDataTuple(String streamId) {
@@ -210,18 +243,28 @@ public boolean isOutQueuesAvailable() {
   }
 
   public long getTotalDataEmittedInBytes() {
-    return totalDataEmittedInBytes;
+    return totalDataEmittedInBytes.get();
   }
 
   // Clean the internal state of OutgoingTupleCollection
   public void clear() {
-    currentControlTuple = null;
-    currentDataTuple = null;
+    lock.lock();
+    try {
+      currentControlTuple = null;
+      currentDataTuple = null;
 
-    outQueue.clear();
+      outQueue.clear();
+    } finally {
+      lock.unlock();
+    }
   }
 
   public void updatePhysicalPlanHelper(PhysicalPlanHelper physicalPlanHelper) {
-    this.helper = physicalPlanHelper;
+    lock.lock();
+    try {
+      this.helper = physicalPlanHelper;
+    } finally {
+      lock.unlock();
+    }
   }
 }
diff --git a/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java b/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java
index b141c3bf00..41ed7fcdbf 100644
--- a/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java
+++ b/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java
@@ -139,12 +139,19 @@ public void persistState(String checkpointId) {
       throw new RuntimeException("Could not save a non-stateful topology's 
state");
     }
 
-    // Checkpoint
-    if (bolt instanceof IStatefulComponent) {
-      ((IStatefulComponent) bolt).preSave(checkpointId);
-    }
+    // need to synchronize with other OutgoingTupleCollection operations
+    // so that topology emit, ack, fail are thread safe
+    collector.lock.lock();
+    try {
+      // Checkpoint
+      if (bolt instanceof IStatefulComponent) {
+        ((IStatefulComponent) bolt).preSave(checkpointId);
+      }
 
-    collector.sendOutState(instanceState, checkpointId);
+      collector.sendOutState(instanceState, checkpointId);
+    } finally {
+      collector.lock.unlock();
+    }
   }
 
   @SuppressWarnings("unchecked")
diff --git a/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java b/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java
index b20418c953..3834a47428 100644
--- a/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java
+++ b/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java
@@ -148,11 +148,18 @@ public void persistState(String checkpointId) {
       throw new RuntimeException("Could not save a non-stateful topology's 
state");
     }
 
-    if (spout instanceof IStatefulComponent) {
-      ((IStatefulComponent) spout).preSave(checkpointId);
-    }
+    // need to synchronize with other OutgoingTupleCollection operations
+    // so that topology emit, ack, fail are thread safe
+    collector.lock.lock();
+    try {
+      if (spout instanceof IStatefulComponent) {
+        ((IStatefulComponent) spout).preSave(checkpointId);
+      }
 
-    collector.sendOutState(instanceState, checkpointId);
+      collector.sendOutState(instanceState, checkpointId);
+    } finally {
+      collector.lock.unlock();
+    }
   }
 
   @SuppressWarnings("unchecked")


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to