[FLINK-3265][tests] adapt RMQSource checkpointing test to runtime behavior

The methods snapshotState and notifyCheckpointComplete are always
mutually exclusive. The RMQSource relies on this but the test makes a
false assumption when it calls those two methods at the same time.

This closes #1569.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/67b380d6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/67b380d6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/67b380d6

Branch: refs/heads/master
Commit: 67b380d617a942c11ab29a8d62d67b770245bb63
Parents: 83e6a2b
Author: Maximilian Michels <m...@apache.org>
Authored: Mon Feb 1 16:30:52 2016 +0100
Committer: Maximilian Michels <m...@apache.org>
Committed: Mon Feb 1 16:51:35 2016 +0100

----------------------------------------------------------------------
 .../flink/streaming/connectors/rabbitmq/RMQSourceTest.java       | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/67b380d6/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index aa19e5d..e0eed70 100644
--- 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -140,7 +140,9 @@ public class RMQSourceTest {
                        }
 
                        // check if the messages are being acknowledged and the 
transaction comitted
-                       source.notifyCheckpointComplete(snapshotId);
+                       synchronized (DummySourceContext.lock) {
+                               source.notifyCheckpointComplete(snapshotId);
+                       }
                        totalNumberOfAcks += numIds;
 
                }

Reply via email to