[jira] [Comment Edited] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()

2018-03-27 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415234#comment-16415234
 ] 

Chesnay Schepler edited comment on FLINK-5372 at 3/27/18 8:38 AM:
--

The instability seems to be caused by a plain race condition in the test:
{code}
// RocksDBAsyncSnapshotTest#testCancelFullyAsyncCheckpoints, L.327
// at this point the task is stuck waiting on the blocker latch
task.cancel();
// let the task continue
blockerCheckpointStreamFactory.getBlockerLatch().trigger();
testHarness.endInput();
// we expect the task to be shut down, but there's no guarantee it managed to do so in time
Assert.assertTrue(blockerCheckpointStreamFactory.getLastCreatedStream().isClosed());
{code}
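
One common way to remove this kind of race is to poll the condition until a deadline instead of asserting it immediately. The following is only a minimal sketch of that idea, not the fix that was applied to the test; the class {{AwaitCondition}} and its {{await}} method are hypothetical names and not part of Flink.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical test helper (not part of Flink): waits for a condition with a deadline. */
final class AwaitCondition {
    private AwaitCondition() {}

    /**
     * Polls the condition until it holds or the timeout elapses.
     * Returns true if the condition was observed to hold, false on timeout.
     */
    static boolean await(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // deadline passed and the condition still does not hold
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // restore the interrupt flag and report the current state of the condition
                Thread.currentThread().interrupt();
                return condition.getAsBoolean();
            }
        }
        return true;
    }
}
```

In the test above, the final assertion could then wrap the {{isClosed()}} check, for example {{Assert.assertTrue(AwaitCondition.await(() -> blockerCheckpointStreamFactory.getLastCreatedStream().isClosed(), 10_000, 10))}}, so that a slow shutdown no longer fails the test as long as it completes within the timeout.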


was (Author: zentol):
The instability seems to be caused by a plain race condition in the test:
{code}
// L 327
// at this point the task is stuck waiting the the blocker latch
task.cancel();
// let the task continue
blockerCheckpointStreamFactory.getBlockerLatch().trigger();
testHarness.endInput();
// we expect the task to be shutdown, but there's no guarantee it managed to do so in time
Assert.assertTrue(blockerCheckpointStreamFactory.getLastCreatedStream().isClosed());
{code}

> Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
> --
>
> Key: FLINK-5372
> URL: https://issues.apache.org/jira/browse/FLINK-5372
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: Stefan Richter
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.3
>
>
> The test is currently {{@Ignored}}. We have to change 
> {{AsyncCheckpointOperator}} to make sure that we can run fully 
> asynchronously. Then, the test will still fail because the canceling 
> behaviour was changed in the meantime.
> {code}
> public static class AsyncCheckpointOperator
>         extends AbstractStreamOperator<String>
>         implements OneInputStreamOperator<String, String> {
>     @Override
>     public void open() throws Exception {
>         super.open();
>         // also get the state in open, this way we are sure that it was created before
>         // we trigger the test checkpoint
>         ValueState<String> state = getPartitionedState(
>                 VoidNamespace.INSTANCE,
>                 VoidNamespaceSerializer.INSTANCE,
>                 new ValueStateDescriptor<>("count", StringSerializer.INSTANCE, "hello"));
>     }
>
>     @Override
>     public void processElement(StreamRecord<String> element) throws Exception {
>         // we also don't care
>         ValueState<String> state = getPartitionedState(
>                 VoidNamespace.INSTANCE,
>                 VoidNamespaceSerializer.INSTANCE,
>                 new ValueStateDescriptor<>("count", StringSerializer.INSTANCE, "hello"));
>         state.update(element.getValue());
>     }
>
>     @Override
>     public void snapshotState(StateSnapshotContext context) throws Exception {
>         // do nothing so that we don't block
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()

2018-03-27 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415234#comment-16415234
 ] 

Chesnay Schepler edited comment on FLINK-5372 at 3/27/18 8:38 AM:
--

The instability seems to be caused by a plain race condition in the test:
{code}
// L 327
// at this point the task is stuck waiting on the blocker latch
task.cancel();
// let the task continue
blockerCheckpointStreamFactory.getBlockerLatch().trigger();
testHarness.endInput();
// we expect the task to be shut down, but there's no guarantee it managed to do so in time
Assert.assertTrue(blockerCheckpointStreamFactory.getLastCreatedStream().isClosed());
{code}


was (Author: zentol):
The instability seems to be caused by a plain race condition in the test:
{code}
// L 327
 // at this point the task is stuck waiting the the blocker latch
task.cancel();
 // let the task continue
blockerCheckpointStreamFactory.getBlockerLatch().trigger();
testHarness.endInput();
 // we expect the task to be shutdown, but there's no guarantee it managed to do so in time
Assert.assertTrue(blockerCheckpointStreamFactory.getLastCreatedStream().isClosed());
{code}



[jira] [Comment Edited] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357276#comment-16357276
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-5372 at 2/8/18 5:40 PM:


[~srichter] what is the status of this issue? Can we move this to be fixed for 
1.4.2?
Since AFAICT this is a test instability, I will change the fix version for this 
to be 1.4.2 for now. If you disagree, please let me know.


was (Author: tzulitai):
[~srichter] what is the status of this issue? Can we move this to be fixed for 
1.4.2?
I will change the fix version for this to be 1.4.2 for now. If you disagree, 
please let me know.

> Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
> --
>
> Key: FLINK-5372
> URL: https://issues.apache.org/jira/browse/FLINK-5372
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: Stefan Richter
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.2
>


[jira] [Comment Edited] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()

2018-02-08 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16357276#comment-16357276
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-5372 at 2/8/18 5:39 PM:


[~srichter] what is the status of this issue? Can we move this to be fixed for 
1.4.2?
I will change the fix version for this to be 1.4.2 for now. If you disagree, 
please let me know.


was (Author: tzulitai):
[~srichter] what is the status of this issue? Can we move this to be fixed for 
1.4.2?

> Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
> --
>
> Key: FLINK-5372
> URL: https://issues.apache.org/jira/browse/FLINK-5372
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: Stefan Richter
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.1
>


[jira] [Comment Edited] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()

2017-10-18 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16210596#comment-16210596
 ] 

Stefan Richter edited comment on FLINK-5372 at 10/19/17 5:41 AM:
-

Those are two separate problems and I don't think the problem from the comment 
is related to FLINK-7757.

The first problem is that the test uses a checkpoint stream factory that 
produces streams that block on latches, and then wants to call {{close()}} 
while the stream is supposed to be blocked in an async thread that executes and 
writes the RocksDB snapshot. The test fails because the first stream is 
actually used to take a snapshot of the timer service into raw keyed state. The 
timer service is not async, so the test blocks and cannot reach {{close()}}. I 
will fix the test with a different factory that returns a blocking stream only 
on the second call, i.e. for the request that actually comes from the keyed 
backend.
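
As a rough illustration of the factory idea described above, the sketch below hands out ordinary streams except on one chosen call, where the returned stream blocks on a latch. It deliberately uses plain {{java.io}} and {{java.util.concurrent}} types instead of Flink's checkpoint stream interfaces; {{BlockingOnNthCallStreamFactory}} and all of its members are hypothetical names, not the classes used in the actual test.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a checkpoint stream factory; this is not Flink's API. */
final class BlockingOnNthCallStreamFactory {
    private final int blockAtCall;         // 1-based index of the request that gets a blocking stream
    private final CountDownLatch blocker;  // released by the test to let the blocked writer continue
    private final AtomicInteger calls = new AtomicInteger();

    BlockingOnNthCallStreamFactory(int blockAtCall, CountDownLatch blocker) {
        this.blockAtCall = blockAtCall;
        this.blocker = blocker;
    }

    OutputStream createStream() {
        if (calls.incrementAndGet() != blockAtCall) {
            // every other request (e.g. the synchronous timer service snapshot) never blocks
            return new ByteArrayOutputStream();
        }
        return new OutputStream() {
            @Override
            public void write(int b) throws IOException {
                try {
                    blocker.await(); // wait until the test releases the latch, then drop the byte
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException("interrupted while blocked on the latch", e);
                }
            }
        };
    }

    int getCallCount() {
        return calls.get();
    }
}
```

With {{blockAtCall = 2}}, the first request completes normally and only the second request, which in the scenario above would come from the keyed backend, blocks until the test counts the latch down.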

For the second problem, this was a bit nasty to reproduce but simple in cause 
and solution. The test {{testCleanupOfSnapshotsInFailureCase}} created an 
instance of {{RocksDBKeyedStateBackend}}, but never called the {{dispose}} 
method to release native resources. Since the latest RocksDB update, their code 
has assertions in place that are executed in {{finalize()}} and will detect 
leaking native resources. This happens only when the resource is GC'ed, and 
this explains why you could only observe the test crash when it was executed 
with more tests, eventually reaching a GC.
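
The cleanup pattern behind the second fix can be sketched as follows: release the resource in a {{finally}} block so that it is disposed even when the test body fails. This is an assumption-laden illustration only; {{FakeNativeBackend}} is a hypothetical stand-in that merely tracks a flag, it is not {{RocksDBKeyedStateBackend}} and holds no native memory.

```java
/** Hypothetical stand-in for an object that owns native resources. */
final class FakeNativeBackend {
    private boolean disposed;

    void put(String key, String value) {
        if (disposed) {
            throw new IllegalStateException("backend already disposed");
        }
        // a real backend would write to native memory here
    }

    void dispose() {
        disposed = true; // a real backend would free its native handles here
    }

    boolean isDisposed() {
        return disposed;
    }
}

final class DisposeInFinally {
    private DisposeInFinally() {}

    /** Runs a fake test body and always disposes the backend, even on failure. */
    static FakeNativeBackend runTestBody(boolean failMidway) {
        FakeNativeBackend backend = new FakeNativeBackend();
        try {
            backend.put("key", "value");
            if (failMidway) {
                throw new IllegalStateException("simulated test failure");
            }
        } catch (IllegalStateException e) {
            // swallowed only so that this sketch can return the backend for inspection
        } finally {
            backend.dispose(); // explicit release, no reliance on finalize() or GC timing
        }
        return backend;
    }
}
```

Releasing the resource explicitly means the outcome no longer depends on when, or whether, a garbage collection happens during the test run.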


was (Author: srichter):
Those are two separate problems and I don't think the problem from the comment 
is related to FLINK-7757.

First problem is, that the test uses a checkpoint stream factory that produces 
streams that block on latches, then wants to call {{close()}} while the stream 
is supposed to be blocked in an async thread executing and writing the rocksdb 
snapshot. While the test fails is, that the first stream is actually used to 
take s snapshot of the timer service into raw keyed state. The timer service is 
not async and therefore the test blocks and cannot reach {{close()}}. I will 
fix the test through a different factory, that gives a blocking stream on the 
second call, i.e. for the request that is actually from the keyed backend.

For the second problem, this was a bit nasty to reproduce but simple in cause 
and solution. The test {{testCleanupOfSnapshotsInFailureCase}} created an 
instance of {{RocksDBKeyedStateBackend}}, but never called the {{dispose}} 
method to release native resources. Since the latest RocksDB update, their code 
has assertions in place that are executed in {{finalize()}} and will detect 
leaking native resources. This happens only when the resource is GC'ed, and 
this explains why you could only observe the test crash when it was executed 
with more tests, eventually reaching a GC.

> Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
> --
>
> Key: FLINK-5372
> URL: https://issues.apache.org/jira/browse/FLINK-5372
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Stefan Richter
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.4.0
>

[jira] [Comment Edited] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()

2017-10-18 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16210596#comment-16210596
 ] 

Stefan Richter edited comment on FLINK-5372 at 10/19/17 5:23 AM:
-

Those are two separate problems and I don't think the problem from the comment 
is related to FLINK-7757.

The first problem is that the test uses a checkpoint stream factory that 
produces streams that block on latches, and then wants to call {{close()}} 
while the stream is supposed to be blocked in an async thread that executes and 
writes the RocksDB snapshot. The test fails because the first stream is 
actually used to take a snapshot of the timer service into raw keyed state. The 
timer service is not async, so the test blocks and cannot reach {{close()}}. I 
will fix the test with a different factory that returns a blocking stream only 
on the second call, i.e. for the request that actually comes from the keyed 
backend.

For the second problem, this was a bit nasty to reproduce but simple in cause 
and solution. The test {{testCleanupOfSnapshotsInFailureCase}} created an 
instance of {{RocksDBKeyedStateBackend}}, but never called the {{dispose}} 
method to release native resources. Since the latest RocksDB update, their code 
has assertions in place that are executed in {{finalize()}} and will detect 
leaking native resources. This happens only when the resource is GC'ed, and 
this explains why you could only observe the test crash when it was executed 
with more tests, eventually reaching a GC.


was (Author: srichter):
Those are two separate problems and I don't the problem from the comment is 
related to FLINK-7757.

First problem is, that the test uses a checkpoint stream factory that produces 
streams that block on latches, then wants to call {{close()}} while the stream 
is supposed to be blocked in an async thread executing and writing the rocksdb 
snapshot. While the test fails is, that the first stream is actually used to 
take s snapshot of the timer service into raw keyed state. The timer service is 
not async and therefore the test blocks and cannot reach {{close()}}. I will 
fix the test through a different factory, that gives a blocking stream on the 
second call, i.e. for the request that is actually from the keyed backend.

For the second problem, this was a bit nasty to reproduce but simple in cause 
and solution. The test {{testCleanupOfSnapshotsInFailureCase}} created an 
instance of {{RocksDBKeyedStateBackend}}, but never called the {{dispose}} 
method to release native resources. Since the latest RocksDB update, their code 
has assertions in place that are executed in {{finalize()}} and will detect 
leaking native resources. This happens only when the resource is GC'ed, and 
this explains why you could only observe the test crash when it was executed 
with more tests, eventually reaching a GC.
