[jira] [Comment Edited] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
[ https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415234#comment-16415234 ]

Chesnay Schepler edited comment on FLINK-5372 at 3/27/18 8:38 AM:
--

The instability seems to be caused by a plain race condition in the test:
{code}
// RocksDBAsyncSnapshotTest#testCancelFullyAsyncCheckpoints, L.327
// at this point the task is stuck waiting on the blocker latch
task.cancel();

// let the task continue
blockerCheckpointStreamFactory.getBlockerLatch().trigger();
testHarness.endInput();

// we expect the task to be shut down, but there's no guarantee it managed to do so in time
Assert.assertTrue(blockerCheckpointStreamFactory.getLastCreatedStream().isClosed());
{code}

was (Author: zentol):
The instability seems to be caused by a plain race condition in the test:
{code}
// L 327
// at this point the task is stuck waiting on the blocker latch
task.cancel();

// let the task continue
blockerCheckpointStreamFactory.getBlockerLatch().trigger();
testHarness.endInput();

// we expect the task to be shut down, but there's no guarantee it managed to do so in time
Assert.assertTrue(blockerCheckpointStreamFactory.getLastCreatedStream().isClosed());
{code}

> Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
> --
>
> Key: FLINK-5372
> URL: https://issues.apache.org/jira/browse/FLINK-5372
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.5.0
> Reporter: Aljoscha Krettek
> Assignee: Stefan Richter
> Priority: Blocker
> Labels: test-stability
> Fix For: 1.5.0, 1.4.3
>
>
> The test is currently {{@Ignored}}. We have to change
> {{AsyncCheckpointOperator}} to make sure that we can run fully
> asynchronously. Then, the test will still fail because the canceling
> behaviour was changed in the meantime.
> {code}
> public static class AsyncCheckpointOperator
>     extends AbstractStreamOperator<String>
>     implements OneInputStreamOperator<String, String> {
>     @Override
>     public void open() throws Exception {
>         super.open();
>         // also get the state in open, this way we are sure that it was created before
>         // we trigger the test checkpoint
>         ValueState<String> state = getPartitionedState(
>             VoidNamespace.INSTANCE,
>             VoidNamespaceSerializer.INSTANCE,
>             new ValueStateDescriptor<>("count", StringSerializer.INSTANCE, "hello"));
>     }
>     @Override
>     public void processElement(StreamRecord<String> element) throws Exception {
>         // we also don't care
>         ValueState<String> state = getPartitionedState(
>             VoidNamespace.INSTANCE,
>             VoidNamespaceSerializer.INSTANCE,
>             new ValueStateDescriptor<>("count", StringSerializer.INSTANCE, "hello"));
>         state.update(element.getValue());
>     }
>     @Override
>     public void snapshotState(StateSnapshotContext context) throws Exception {
>         // do nothing so that we don't block
>     }
> }
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
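The race described in the comment above can be shown in isolation. The following is a minimal sketch that uses only java.util.concurrent and not the Flink test harness; {{CancelRaceSketch}}, {{TrackedStream}} and {{closedAfterJoin}} are hypothetical names. It assumes that one way to remove the race is to wait for the worker thread to terminate before checking the stream, which is not necessarily how the actual test was fixed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelRaceSketch {

    /** Hypothetical stand-in for the checkpoint stream; records whether close() ran. */
    static final class TrackedStream {
        private final AtomicBoolean closed = new AtomicBoolean(false);

        void close() {
            closed.set(true);
        }

        boolean isClosed() {
            return closed.get();
        }
    }

    /**
     * Starts a worker that blocks on a latch, releases the latch, and checks the
     * stream only after the worker thread has terminated (or the timeout expired).
     */
    public static boolean closedAfterJoin() throws InterruptedException {
        TrackedStream stream = new TrackedStream();
        CountDownLatch blocker = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            try {
                blocker.await(); // the "task" is stuck here until the latch is triggered
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                stream.close(); // cleanup happens on the worker thread
            }
        });
        worker.start();

        blocker.countDown(); // let the worker continue

        // Without this join, the check below races with the worker's cleanup.
        worker.join(TimeUnit.SECONDS.toMillis(10));
        return stream.isClosed();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("closed after join: " + closedAfterJoin());
    }
}
```

The design point is only the ordering: the assertion has to happen after something that guarantees the cleanup thread is done, not merely after the latch was released.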
[jira] [Comment Edited] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
[ https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357276#comment-16357276 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-5372 at 2/8/18 5:40 PM:

[~srichter] what is the status of this issue? Can we move this to be fixed for 1.4.2?
Since AFAICT this is a test instability, I will change the fix version for this to be 1.4.2 for now. If you disagree, please let me know.

was (Author: tzulitai):
[~srichter] what is the status of this issue? Can we move this to be fixed for 1.4.2?
I will change the fix version for this to be 1.4.2 for now. If you disagree, please let me know.

> Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
> --
>
> Key: FLINK-5372
> URL: https://issues.apache.org/jira/browse/FLINK-5372
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.5.0
> Reporter: Aljoscha Krettek
> Assignee: Stefan Richter
> Priority: Blocker
> Labels: test-stability
> Fix For: 1.5.0, 1.4.2
>
>
> The test is currently {{@Ignored}}. We have to change
> {{AsyncCheckpointOperator}} to make sure that we can run fully
> asynchronously. Then, the test will still fail because the canceling
> behaviour was changed in the meantime.
[jira] [Comment Edited] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
[ https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357276#comment-16357276 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-5372 at 2/8/18 5:39 PM:

[~srichter] what is the status of this issue? Can we move this to be fixed for 1.4.2?
I will change the fix version for this to be 1.4.2 for now. If you disagree, please let me know.

was (Author: tzulitai):
[~srichter] what is the status of this issue? Can we move this to be fixed for 1.4.2?

> Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
> --
>
> Key: FLINK-5372
> URL: https://issues.apache.org/jira/browse/FLINK-5372
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.5.0
> Reporter: Aljoscha Krettek
> Assignee: Stefan Richter
> Priority: Blocker
> Labels: test-stability
> Fix For: 1.5.0, 1.4.1
>
>
> The test is currently {{@Ignored}}. We have to change
> {{AsyncCheckpointOperator}} to make sure that we can run fully
> asynchronously. Then, the test will still fail because the canceling
> behaviour was changed in the meantime.
[jira] [Comment Edited] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
[ https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16210596#comment-16210596 ]

Stefan Richter edited comment on FLINK-5372 at 10/19/17 5:41 AM:
-

Those are two separate problems, and I don't think the problem from the comment is related to FLINK-7757.

The first problem is that the test uses a checkpoint stream factory that produces streams that block on latches, and then wants to call {{close()}} while the stream is supposed to be blocked in an async thread that is executing and writing the RocksDB snapshot. The test fails because the first stream is actually used to take a snapshot of the timer service into raw keyed state. The timer service is not async, and therefore the test blocks and cannot reach {{close()}}. I will fix the test through a different factory that gives a blocking stream on the second call, i.e. for the request that actually comes from the keyed backend.

The second problem was a bit nasty to reproduce but simple in cause and solution. The test {{testCleanupOfSnapshotsInFailureCase}} created an instance of {{RocksDBKeyedStateBackend}} but never called the {{dispose}} method to release native resources. Since the latest RocksDB update, their code has assertions in place that are executed in {{finalize()}} and will detect leaking native resources. This happens only when the resource is GC'ed, which explains why you could only observe the test crash when it was executed together with more tests, eventually reaching a GC.

was (Author: srichter):
Those are two separate problems, and I don't think the problem from the comment is related to FLINK-7757.

The first problem is that the test uses a checkpoint stream factory that produces streams that block on latches, and then wants to call {{close()}} while the stream is supposed to be blocked in an async thread that is executing and writing the RocksDB snapshot. While the test fails is, that the first stream is actually used to take s snapshot of the timer service into raw keyed state. The timer service is not async, and therefore the test blocks and cannot reach {{close()}}. I will fix the test through a different factory that gives a blocking stream on the second call, i.e. for the request that actually comes from the keyed backend.

The second problem was a bit nasty to reproduce but simple in cause and solution. The test {{testCleanupOfSnapshotsInFailureCase}} created an instance of {{RocksDBKeyedStateBackend}} but never called the {{dispose}} method to release native resources. Since the latest RocksDB update, their code has assertions in place that are executed in {{finalize()}} and will detect leaking native resources. This happens only when the resource is GC'ed, which explains why you could only observe the test crash when it was executed together with more tests, eventually reaching a GC.

> Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
> --
>
> Key: FLINK-5372
> URL: https://issues.apache.org/jira/browse/FLINK-5372
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Reporter: Aljoscha Krettek
> Assignee: Stefan Richter
> Priority: Blocker
> Labels: test-stability
> Fix For: 1.4.0
>
>
> The test is currently {{@Ignored}}. We have to change
> {{AsyncCheckpointOperator}} to make sure that we can run fully
> asynchronously. Then, the test will still fail because the canceling
> behaviour was changed in the meantime.
> {code}
> public static class AsyncCheckpointOperator
>     extends AbstractStreamOperator<String>
>     implements OneInputStreamOperator<String, String> {
>     @Override
>     public void open() throws Exception {
>         super.open();
>         // also get the state in open, this way we are sure that it was created before
>         // we trigger the test checkpoint
>         ValueState<String> state = getPartitionedState(
>             VoidNamespace.INSTANCE,
>             VoidNamespaceSerializer.INSTANCE,
>             new ValueStateDescriptor<>("count", StringSerializer.INSTANCE, "hello"));
>     }
>     @Override
>     public void processElement(StreamRecord<String> element) throws Exception {
>         // we also don't care
>         ValueState<String> state = getPartitionedState(
>             VoidNamespace.INSTANCE,
>             VoidNamespaceSerializer.INSTANCE,
>             new ValueStateDescriptor<>("count", StringSerializer.INSTANCE, "hello"));
>         state.update(element.getValue());
>     }
>     @Override
>     public void snapshotState(StateSnapshotContext context) throws Exception {
>         // do nothing so that we don't block
>     }
> }
> {code}

--
This message was sent by Atlassian JIRA
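The first fix mentioned in the comment above (a factory that hands out a blocking stream only on the second call) could look roughly like the sketch below. This is an illustration under assumptions, not the factory that was actually added to the Flink tests: {{BlockOnNthStreamSketch}}, {{SketchStream}}, {{Factory}} and {{blockingPattern}} are hypothetical names, and the stream interface is a simplified stand-in rather than Flink's checkpoint stream API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockOnNthStreamSketch {

    /** Hypothetical minimal stream interface; not Flink's checkpoint stream API. */
    interface SketchStream {
        void write(int b) throws InterruptedException;

        boolean isBlocking();
    }

    /** Stream that never blocks, e.g. for the synchronous timer service snapshot. */
    static final class PlainStream implements SketchStream {
        @Override
        public void write(int b) {
            // accept and discard the data
        }

        @Override
        public boolean isBlocking() {
            return false;
        }
    }

    /** Stream whose writes wait until the test releases the latch. */
    static final class BlockingStream implements SketchStream {
        private final CountDownLatch blocker;

        BlockingStream(CountDownLatch blocker) {
            this.blocker = blocker;
        }

        @Override
        public void write(int b) throws InterruptedException {
            blocker.await();
        }

        @Override
        public boolean isBlocking() {
            return true;
        }
    }

    /** Factory that returns a blocking stream only on the configured (1-based) call. */
    static final class Factory {
        private final int blockOnCall;
        private final CountDownLatch blocker = new CountDownLatch(1);
        private final AtomicInteger calls = new AtomicInteger();

        Factory(int blockOnCall) {
            this.blockOnCall = blockOnCall;
        }

        SketchStream createStream() {
            int call = calls.incrementAndGet();
            return call == blockOnCall ? new BlockingStream(blocker) : new PlainStream();
        }

        CountDownLatch getBlockerLatch() {
            return blocker;
        }
    }

    /** Returns, for each of totalCalls factory calls, whether the created stream blocks. */
    public static List<Boolean> blockingPattern(int blockOnCall, int totalCalls) {
        Factory factory = new Factory(blockOnCall);
        List<Boolean> pattern = new ArrayList<>();
        for (int i = 0; i < totalCalls; i++) {
            pattern.add(factory.createStream().isBlocking());
        }
        return pattern;
    }
}
```

With {{blockOnCall = 2}}, the first request (the timer service snapshot in the scenario described above) gets a non-blocking stream, and only the second request (the keyed backend) gets the latch-controlled one.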
[jira] [Comment Edited] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
[ https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16210596#comment-16210596 ]

Stefan Richter edited comment on FLINK-5372 at 10/19/17 5:23 AM:
-

Those are two separate problems, and I don't think the problem from the comment is related to FLINK-7757.

The first problem is that the test uses a checkpoint stream factory that produces streams that block on latches, and then wants to call {{close()}} while the stream is supposed to be blocked in an async thread that is executing and writing the RocksDB snapshot. While the test fails is, that the first stream is actually used to take s snapshot of the timer service into raw keyed state. The timer service is not async, and therefore the test blocks and cannot reach {{close()}}. I will fix the test through a different factory that gives a blocking stream on the second call, i.e. for the request that actually comes from the keyed backend.

The second problem was a bit nasty to reproduce but simple in cause and solution. The test {{testCleanupOfSnapshotsInFailureCase}} created an instance of {{RocksDBKeyedStateBackend}} but never called the {{dispose}} method to release native resources. Since the latest RocksDB update, their code has assertions in place that are executed in {{finalize()}} and will detect leaking native resources. This happens only when the resource is GC'ed, which explains why you could only observe the test crash when it was executed together with more tests, eventually reaching a GC.

was (Author: srichter):
Those are two separate problems and I don't the problem from the comment is related to FLINK-7757.

The first problem is that the test uses a checkpoint stream factory that produces streams that block on latches, and then wants to call {{close()}} while the stream is supposed to be blocked in an async thread that is executing and writing the RocksDB snapshot. While the test fails is, that the first stream is actually used to take s snapshot of the timer service into raw keyed state. The timer service is not async, and therefore the test blocks and cannot reach {{close()}}. I will fix the test through a different factory that gives a blocking stream on the second call, i.e. for the request that actually comes from the keyed backend.

The second problem was a bit nasty to reproduce but simple in cause and solution. The test {{testCleanupOfSnapshotsInFailureCase}} created an instance of {{RocksDBKeyedStateBackend}} but never called the {{dispose}} method to release native resources. Since the latest RocksDB update, their code has assertions in place that are executed in {{finalize()}} and will detect leaking native resources. This happens only when the resource is GC'ed, which explains why you could only observe the test crash when it was executed together with more tests, eventually reaching a GC.

> Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
> --
>
> Key: FLINK-5372
> URL: https://issues.apache.org/jira/browse/FLINK-5372
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Reporter: Aljoscha Krettek
> Assignee: Stefan Richter
> Priority: Blocker
> Labels: test-stability
> Fix For: 1.4.0
>
>
> The test is currently {{@Ignored}}. We have to change
> {{AsyncCheckpointOperator}} to make sure that we can run fully
> asynchronously. Then, the test will still fail because the canceling
> behaviour was changed in the meantime.
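For the second problem from the comment above (a test that creates a backend but never calls {{dispose}}), the usual remedy is to release the resource in a {{finally}} block so that it also runs on the failure path. The sketch below only illustrates that pattern; {{FakeBackend}} and its handle counter are hypothetical stand-ins and do not use the RocksDB or Flink APIs.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class DisposeInFinallySketch {

    /** Hypothetical stand-in for a backend that owns native resources. */
    static final class FakeBackend {
        // Counts handles that were opened but not yet released.
        static final AtomicInteger OPEN_HANDLES = new AtomicInteger();

        private boolean disposed;

        FakeBackend() {
            OPEN_HANDLES.incrementAndGet();
        }

        void snapshot(boolean fail) {
            if (fail) {
                throw new IllegalStateException("simulated snapshot failure");
            }
        }

        void dispose() {
            // Idempotent release, so a double dispose does not corrupt the counter.
            if (!disposed) {
                disposed = true;
                OPEN_HANDLES.decrementAndGet();
            }
        }
    }

    /** Runs a failing "test body" and reports how many handles are still open afterwards. */
    public static int openHandlesAfterFailingTest() {
        FakeBackend backend = new FakeBackend();
        try {
            backend.snapshot(true);
        } catch (IllegalStateException expected) {
            // the failure case is what the test wants to exercise
        } finally {
            backend.dispose(); // release explicitly instead of relying on GC and finalize()
        }
        return FakeBackend.OPEN_HANDLES.get();
    }
}
```

Releasing explicitly makes the outcome independent of when, or whether, a garbage collection happens during the test run.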