[FLINK-3265] [rabbitmq] Fix concurrency bug in RabbitMQ source. This closes #1534.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b01a890 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b01a890 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b01a890 Branch: refs/heads/master Commit: 6b01a89020f2de3f7710cf72336291b1e8ca8562 Parents: d97fcda Author: Robert Metzger <rmetz...@apache.org> Authored: Thu Jan 21 12:22:21 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Thu Jan 28 14:43:03 2016 +0100 ---------------------------------------------------------------------- .../connectors/rabbitmq/RMQSource.java | 4 +- .../connectors/rabbitmq/RMQSourceTest.java | 79 ++++++++++++++++++++ .../source/MessageAcknowledgingSourceBase.java | 51 +++++++------ ...ltipleIdsMessageAcknowledgingSourceBase.java | 24 +++--- 4 files changed, 124 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6b01a890/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java index 09bb07c..59bc057 100644 --- a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java +++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java @@ -196,7 +196,9 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU continue; } } - sessionIds.add(deliveryTag); + synchronized (sessionIdsPerSnapshot) { + 
sessionIds.add(deliveryTag); + } } ctx.collect(result); http://git-wip-us.apache.org/repos/asf/flink/blob/6b01a890/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java index aa19e5d..0a3de84 100644 --- a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java +++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java @@ -23,6 +23,7 @@ import com.rabbitmq.client.Envelope; import com.rabbitmq.client.QueueingConsumer; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.configuration.Configuration; @@ -31,6 +32,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -103,6 +105,83 @@ public class RMQSourceTest { sourceThread.join(); } + /** + * Make sure concurrent access to snapshotState() and notifyCheckpointComplete() don't cause + * an issue. 
+ * + * Without proper synchronization, the test will fail with a concurrent modification exception + * + */ + @Test + public void testConcurrentAccess() throws Exception { + source.autoAck = false; + sourceThread.start(); + + final Tuple1<Throwable> error = new Tuple1<>(null); + + Thread.sleep(5); + + Thread snapshotThread = new Thread(new Runnable() { + public long id = 0; + + @Override + public void run() { + while (!Thread.interrupted()) { + try { + source.snapshotState(id++, 0); + } catch (Exception e) { + error.f0 = e; + break; // stop thread + } + } + } + }); + + Thread notifyThread = new Thread(new Runnable() { + @Override + public void run() { + while (!Thread.interrupted()) { + try { + // always remove all checkpoints + source.notifyCheckpointComplete(Long.MAX_VALUE); + } catch (Exception e) { + error.f0 = e; + break; // stop thread + } + } + } + }); + + snapshotThread.start(); + notifyThread.start(); + + long deadline = System.currentTimeMillis() + 1000L; + while(System.currentTimeMillis() < deadline) { + if(!snapshotThread.isAlive()) { + notifyThread.interrupt(); + break; + } + if(!notifyThread.isAlive()) { + snapshotThread.interrupt(); + break; + } + Thread.sleep(10); + } + if(snapshotThread.isAlive()) { + snapshotThread.interrupt(); + snapshotThread.join(); + } + if(notifyThread.isAlive()) { + notifyThread.interrupt(); + notifyThread.join(); + } + if(error.f0 != null) { + error.f0.printStackTrace(); + Assert.fail("Test failed with " + error.f0.getClass().getCanonicalName()); + } + + } + @Test public void testCheckpointing() throws Exception { source.autoAck = false; http://git-wip-us.apache.org/repos/asf/flink/blob/6b01a890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java index 4385884..2f865d1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java @@ -76,6 +76,7 @@ import org.slf4j.LoggerFactory; * @param <Type> The type of the messages created by the source. * @param <UId> The type of unique IDs which may be used to acknowledge elements. */ +@SuppressWarnings("SynchronizeOnNonFinalField") public abstract class MessageAcknowledgingSourceBase<Type, UId> extends RichSourceFunction<Type> implements Checkpointed<SerializedCheckpointData[]>, CheckpointNotifier { @@ -166,41 +167,45 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId> LOG.debug("Snapshotting state. Messages: {}, checkpoint id: {}, timestamp: {}", idsForCurrentCheckpoint, checkpointId, checkpointTimestamp); - pendingCheckpoints.addLast(new Tuple2<>(checkpointId, idsForCurrentCheckpoint)); + synchronized (pendingCheckpoints) { + pendingCheckpoints.addLast(new Tuple2<>(checkpointId, idsForCurrentCheckpoint)); - idsForCurrentCheckpoint = new ArrayList<>(64); + idsForCurrentCheckpoint = new ArrayList<>(64); - return SerializedCheckpointData.fromDeque(pendingCheckpoints, idSerializer); + return SerializedCheckpointData.fromDeque(pendingCheckpoints, idSerializer); + } } @Override public void restoreState(SerializedCheckpointData[] state) throws Exception { - pendingCheckpoints = SerializedCheckpointData.toDeque(state, idSerializer); - // build a set which contains all processed ids. It may be used to check if we have - // already processed an incoming message. 
- for (Tuple2<Long, List<UId>> checkpoint : pendingCheckpoints) { - idsProcessedButNotAcknowledged.addAll(checkpoint.f1); + synchronized (pendingCheckpoints) { + pendingCheckpoints = SerializedCheckpointData.toDeque(state, idSerializer); + // build a set which contains all processed ids. It may be used to check if we have + // already processed an incoming message. + for (Tuple2<Long, List<UId>> checkpoint : pendingCheckpoints) { + idsProcessedButNotAcknowledged.addAll(checkpoint.f1); + } } } @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { LOG.debug("Committing Messages externally for checkpoint {}", checkpointId); - - for (Iterator<Tuple2<Long, List<UId>>> iter = pendingCheckpoints.iterator(); iter.hasNext();) { - Tuple2<Long, List<UId>> checkpoint = iter.next(); - long id = checkpoint.f0; - - if (id <= checkpointId) { - LOG.trace("Committing Messages with following IDs {}", checkpoint.f1); - acknowledgeIDs(checkpointId, checkpoint.f1); - // remove deduplication data - idsProcessedButNotAcknowledged.removeAll(checkpoint.f1); - // remove checkpoint data - iter.remove(); - } - else { - break; + synchronized (pendingCheckpoints) { + for (Iterator<Tuple2<Long, List<UId>>> iter = pendingCheckpoints.iterator(); iter.hasNext(); ) { + Tuple2<Long, List<UId>> checkpoint = iter.next(); + long id = checkpoint.f0; + + if (id <= checkpointId) { + LOG.trace("Committing Messages with following IDs {}", checkpoint.f1); + acknowledgeIDs(checkpointId, checkpoint.f1); + // remove deduplication data + idsProcessedButNotAcknowledged.removeAll(checkpoint.f1); + // remove checkpoint data + iter.remove(); + } else { + break; + } } } } http://git-wip-us.apache.org/repos/asf/flink/blob/6b01a890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java index c097066..4709759 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java @@ -107,14 +107,16 @@ public abstract class MultipleIdsMessageAcknowledgingSourceBase<Type, UId, Sessi */ protected final void acknowledgeIDs(long checkpointId, List<UId> uniqueIds) { LOG.debug("Acknowledging ids for checkpoint {}", checkpointId); - Iterator<Tuple2<Long, List<SessionId>>> iterator = sessionIdsPerSnapshot.iterator(); - while (iterator.hasNext()) { - final Tuple2<Long, List<SessionId>> next = iterator.next(); - long id = next.f0; - if (id <= checkpointId) { - acknowledgeSessionIDs(next.f1); - // remove ids for this session - iterator.remove(); + synchronized (sessionIdsPerSnapshot) { + Iterator<Tuple2<Long, List<SessionId>>> iterator = sessionIdsPerSnapshot.iterator(); + while (iterator.hasNext()) { + final Tuple2<Long, List<SessionId>> next = iterator.next(); + long id = next.f0; + if (id <= checkpointId) { + acknowledgeSessionIDs(next.f1); + // remove ids for this session + iterator.remove(); + } } } } @@ -132,8 +134,10 @@ public abstract class MultipleIdsMessageAcknowledgingSourceBase<Type, UId, Sessi @Override public SerializedCheckpointData[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - sessionIdsPerSnapshot.add(new Tuple2<>(checkpointId, sessionIds)); - sessionIds = new ArrayList<>(64); + synchronized (sessionIdsPerSnapshot) { + sessionIdsPerSnapshot.add(new Tuple2<>(checkpointId, sessionIds)); + sessionIds = new ArrayList<>(64); + } return 
super.snapshotState(checkpointId, checkpointTimestamp); } }