[FLINK-3265][tests] adapt RMQSource checkpointing test to runtime behavior

The methods snapshotState and notifyCheckpointComplete are always
mutually exclusive. The RMQSource relies on this but the test makes a
false assumption when it calls those two methods at the same time.

This closes #1569.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/67b380d6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/67b380d6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/67b380d6

Branch: refs/heads/master
Commit: 67b380d617a942c11ab29a8d62d67b770245bb63
Parents: 83e6a2b
Author: Maximilian Michels <m...@apache.org>
Authored: Mon Feb 1 16:30:52 2016 +0100
Committer: Maximilian Michels <m...@apache.org>
Committed: Mon Feb 1 16:51:35 2016 +0100

----------------------------------------------------------------------
 .../flink/streaming/connectors/rabbitmq/RMQSourceTest.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/67b380d6/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index aa19e5d..e0eed70 100644
--- a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -140,7 +140,9 @@ public class RMQSourceTest {
             }
 
             // check if the messages are being acknowledged and the transaction comitted
-            source.notifyCheckpointComplete(snapshotId);
+            synchronized (DummySourceContext.lock) {
+                source.notifyCheckpointComplete(snapshotId);
+            }
 
             totalNumberOfAcks += numIds;
         }
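
The mutual exclusion that the commit message describes can be illustrated in isolation. The following is a minimal sketch and not Flink code: the class CheckpointLockSketch, its fields, and the helper requireLock are hypothetical names introduced only for this example, and the static lock merely plays the role that DummySourceContext.lock plays in the test. It assumes that both snapshotState and notifyCheckpointComplete are only ever called while the caller holds that lock, which is the behavior the fix above reproduces in the test.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a source that tracks message ids between checkpoints.
final class CheckpointLockSketch {

    // Hypothetical shared lock; plays the role of DummySourceContext.lock in the test.
    static final Object lock = new Object();

    private final List<Long> pendingIds = new ArrayList<>();
    private long ackedCount = 0;

    // Guard used by both checkpoint methods: they do no locking of their own
    // and instead rely on the caller holding the shared lock.
    private static void requireLock() {
        if (!Thread.holdsLock(lock)) {
            throw new IllegalStateException("caller must hold the checkpoint lock");
        }
    }

    // Returns how many ids are pending at snapshot time.
    int snapshotState() {
        requireLock();
        return pendingIds.size();
    }

    // Acknowledges all pending ids and clears them.
    void notifyCheckpointComplete() {
        requireLock();
        ackedCount += pendingIds.size();
        pendingIds.clear();
    }

    // Emits n ids from a second thread while the calling thread runs checkpoints.
    // Every access to the source's state happens under the same lock.
    static long run(int n) {
        CheckpointLockSketch source = new CheckpointLockSketch();
        Thread emitter = new Thread(() -> {
            for (long i = 0; i < n; i++) {
                synchronized (lock) {
                    source.pendingIds.add(i);
                }
            }
        });
        emitter.start();
        for (int checkpoint = 0; checkpoint < 10; checkpoint++) {
            synchronized (lock) {
                source.snapshotState();
            }
            synchronized (lock) {
                source.notifyCheckpointComplete();
            }
        }
        try {
            emitter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        // Final acknowledgement for ids emitted after the last checkpoint.
        synchronized (lock) {
            source.notifyCheckpointComplete();
        }
        return source.ackedCount;
    }

    public static void main(String[] args) {
        System.out.println("acked ids: " + run(1000));
    }
}
```

In this sketch, calling notifyCheckpointComplete without first entering synchronized (lock) fails with an IllegalStateException, whereas wrapping the call as in the diff above lets it proceed.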