[jira] [Work logged] (BEAM-8157) Flink state requests return wrong state in timers when encoded key is length-prefixed
[ https://issues.apache.org/jira/browse/BEAM-8157?focusedWorklogId=308484=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-308484 ]

ASF GitHub Bot logged work on BEAM-8157:
Author: ASF GitHub Bot
Created on: 08/Sep/19 11:30
Start Date: 08/Sep/19 11:30
Worklog Time Spent: 10m
Work Description: mxm commented on issue #9484: [BEAM-8157] Remove length prefix from state key for Flink's state backend
URL: https://github.com/apache/beam/pull/9484#issuecomment-529193874

Updated the description and squashed the commit. After we give this another test, this should be good to be merged.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 308484)
Time Spent: 3h (was: 2h 50m)

> Flink state requests return wrong state in timers when encoded key is length-prefixed
> -
>
> Key: BEAM-8157
> URL: https://issues.apache.org/jira/browse/BEAM-8157
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.13.0
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Priority: Major
> Fix For: 2.16.0
>
> Time Spent: 3h
> Remaining Estimate: 0h
>
> Due to multiple changes made in BEAM-7126, the Flink internal key encoding is
> broken when the key is encoded with a length prefix. The Flink runner
> requires the internal key to be encoded without a length prefix.

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
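The following sketch shows what "encoded with a length prefix" means at the byte level. It assumes the prefix is an unsigned base-128 varint holding the payload length, which is the form Beam's `LengthPrefixCoder` writes. UTF-8 bytes stand in for the key coder's output. The class and method names are hypothetical, and this is not runner code.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration: the same key with and without a varint length prefix.
public class LengthPrefixSketch {

  // Writes an unsigned varint: 7 payload bits per byte, low-order group first,
  // continuation bit (0x80) set on every byte except the last.
  static void writeVarInt(int value, ByteArrayOutputStream out) {
    while ((value & ~0x7F) != 0) {
      out.write((value & 0x7F) | 0x80);
      value >>>= 7;
    }
    out.write(value);
  }

  // Key bytes without a prefix: the form the issue says the Flink runner requires.
  static byte[] unprefixed(String key) {
    return key.getBytes(StandardCharsets.UTF_8);
  }

  // The same bytes preceded by a varint length prefix.
  static byte[] lengthPrefixed(String key) {
    byte[] payload = unprefixed(key);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    writeVarInt(payload.length, out);
    out.write(payload, 0, payload.length);
    return out.toByteArray();
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(unprefixed("XX")));     // [88, 88]
    System.out.println(Arrays.toString(lengthPrefixed("XX"))); // [2, 88, 88]
  }
}
```

Both arrays describe the same logical key. Anything that hashes or compares the raw bytes, however, sees two different keys.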
[jira] [Work logged] (BEAM-8157) Flink state requests return wrong state in timers when encoded key is length-prefixed
[ https://issues.apache.org/jira/browse/BEAM-8157?focusedWorklogId=308323=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-308323 ]

ASF GitHub Bot logged work on BEAM-8157:
Author: ASF GitHub Bot
Created on: 07/Sep/19 08:28
Start Date: 07/Sep/19 08:28
Worklog Time Spent: 10m
Work Description: mxm commented on issue #9484: [BEAM-8157] Remove length prefix from state key for Flink's state backend
URL: https://github.com/apache/beam/pull/9484#issuecomment-529086747

Run Java Flink PortableValidatesRunner Streaming

Issue Time Tracking
---
Worklog Id: (was: 308323)
Time Spent: 2h 50m (was: 2h 40m)
[jira] [Work logged] (BEAM-8157) Flink state requests return wrong state in timers when encoded key is length-prefixed
[ https://issues.apache.org/jira/browse/BEAM-8157?focusedWorklogId=308322=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-308322 ]

ASF GitHub Bot logged work on BEAM-8157:
Author: ASF GitHub Bot
Created on: 07/Sep/19 08:27
Start Date: 07/Sep/19 08:27
Worklog Time Spent: 10m
Work Description: mxm commented on issue #9484: [BEAM-8157] Remove length prefix from state key for Flink's state backend
URL: https://github.com/apache/beam/pull/9484#issuecomment-529086661

Thanks for testing. I've pushed an update. It was wrong to assume we could simply strip the length prefix from the ByteBuffer, because the buffer holds whatever bytes the original coder produced. We need to perform an encoding round-trip to recover the original key in `OUTER` encoding.

@tweise Could you please test again with the new update?

Issue Time Tracking
---
Worklog Id: (was: 308322)
Time Spent: 2h 40m (was: 2.5h)
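The encoding round-trip described in this comment can be sketched as follows. This is a sketch under stated assumptions. `KeyCoder` is a hypothetical two-method stand-in for a Beam coder, not Beam's `Coder` API. The two implementations are illustrative: a string coder whose nested form carries a length prefix, and a varint coder whose nested and outer forms are identical. Decoding in the nested context and re-encoding in the outer context gives the right bytes for both. Blindly dropping a "prefix" would corrupt the second.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical minimal coder interface; Beam's real Coder API is richer.
interface KeyCoder<T> {
  T decodeNested(ByteBuffer in); // reads one value; more data could follow it
  byte[] encodeOuter(T value);   // writes the value as the entire buffer
}

public class KeyRoundTrip {

  static int readVarInt(ByteBuffer in) {
    int result = 0;
    for (int shift = 0; ; shift += 7) {
      byte b = in.get();
      result |= (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        return result;
      }
    }
  }

  static void writeVarInt(int value, ByteArrayOutputStream out) {
    while ((value & ~0x7F) != 0) {
      out.write((value & 0x7F) | 0x80);
      value >>>= 7;
    }
    out.write(value);
  }

  // Strings: nested form = varint length + UTF-8 bytes, outer form = UTF-8 bytes only.
  static final KeyCoder<String> STRING = new KeyCoder<String>() {
    @Override
    public String decodeNested(ByteBuffer in) {
      byte[] payload = new byte[readVarInt(in)];
      in.get(payload);
      return new String(payload, StandardCharsets.UTF_8);
    }

    @Override
    public byte[] encodeOuter(String value) {
      return value.getBytes(StandardCharsets.UTF_8);
    }
  };

  // Varint integers are self-delimiting, so nested and outer forms are identical.
  static final KeyCoder<Integer> VARINT = new KeyCoder<Integer>() {
    @Override
    public Integer decodeNested(ByteBuffer in) {
      return readVarInt(in);
    }

    @Override
    public byte[] encodeOuter(Integer value) {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      writeVarInt(value, out);
      return out.toByteArray();
    }
  };

  // Decode with the key coder in the nested context, re-encode in the outer context.
  static <T> byte[] toOuter(byte[] nestedBytes, KeyCoder<T> coder) {
    return coder.encodeOuter(coder.decodeNested(ByteBuffer.wrap(nestedBytes)));
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(toOuter(new byte[] {2, 88, 88}, STRING))); // [88, 88]
    System.out.println(Arrays.toString(toOuter(new byte[] {88}, VARINT)));        // [88]
  }
}
```

The round-trip delegates the "where does the prefix end" decision to the coder that wrote the bytes. This is why it works regardless of whether a prefix is present.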
[jira] [Work logged] (BEAM-8157) Flink state requests return wrong state in timers when encoded key is length-prefixed
[ https://issues.apache.org/jira/browse/BEAM-8157?focusedWorklogId=308077=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-308077 ]

ASF GitHub Bot logged work on BEAM-8157:
Author: ASF GitHub Bot
Created on: 06/Sep/19 18:20
Start Date: 06/Sep/19 18:20
Worklog Time Spent: 10m
Work Description: tweise commented on issue #9484: [BEAM-8157] Remove length prefix from state key for Flink's state backend
URL: https://github.com/apache/beam/pull/9484#issuecomment-528961315

Unfortunately with this change the test pipeline fails:

```
RuntimeError: java.lang.RuntimeException: Failed to remove nested context from key: XX
    at org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkKeyUtils.removeNestedContext(FlinkKeyUtils.java:85)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$BagUserStateFactory$1.prepareStateBackend(ExecutableStageDoFnOperator.java:370)
```

Issue Time Tracking
---
Worklog Id: (was: 308077)
Time Spent: 2.5h (was: 2h 20m)
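The sketch below is a hypothetical reconstruction of how this failure can arise. It assumes two things: that the key's bytes are the UTF-8 bytes of `XX` with no length prefix, and that the removal step reads a varint length and then expects that many payload bytes. `naiveStrip` is an illustrative name and is not the body of `FlinkKeyUtils.removeNestedContext`. Only the exception message is copied from the stack trace. The first byte, 88, is read as a length, but only one byte follows, so the read underflows.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical: strips what it assumes is a varint length prefix.
public class NaiveStrip {

  static int readVarInt(ByteBuffer in) {
    int result = 0;
    for (int shift = 0; ; shift += 7) {
      byte b = in.get();
      result |= (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        return result;
      }
    }
  }

  static byte[] naiveStrip(byte[] key) {
    ByteBuffer in = ByteBuffer.wrap(key);
    try {
      byte[] payload = new byte[readVarInt(in)];
      in.get(payload); // underflows if the "length" byte was really key data
      return payload;
    } catch (BufferUnderflowException e) {
      throw new RuntimeException(
          "Failed to remove nested context from key: " + new String(key, StandardCharsets.UTF_8), e);
    }
  }

  public static void main(String[] args) {
    // Works when the bytes really are length-prefixed.
    byte[] stripped = naiveStrip(new byte[] {2, 88, 88});
    System.out.println(new String(stripped, StandardCharsets.UTF_8)); // XX
    // Fails when they are not: 88 is read as a length, but one byte remains.
    try {
      naiveStrip("XX".getBytes(StandardCharsets.UTF_8));
    } catch (RuntimeException e) {
      System.out.println(e.getMessage()); // Failed to remove nested context from key: XX
    }
  }
}
```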
[jira] [Work logged] (BEAM-8157) Flink state requests return wrong state in timers when encoded key is length-prefixed
[ https://issues.apache.org/jira/browse/BEAM-8157?focusedWorklogId=307922=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-307922 ]

ASF GitHub Bot logged work on BEAM-8157:
Author: ASF GitHub Bot
Created on: 06/Sep/19 14:56
Start Date: 06/Sep/19 14:56
Worklog Time Spent: 10m
Work Description: tweise commented on pull request #9484: [BEAM-8157] Remove length prefix from state key for Flink's state backend
URL: https://github.com/apache/beam/pull/9484#discussion_r321775089

##
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##
@@ -343,11 +343,11 @@ public void clear(K key, W window) {
     }
 
     private void prepareStateBackend(K key) {
-      // Key for state request is shipped already encoded as ByteString,
-      // this is mostly a wrapping with ByteBuffer. We still follow the
-      // usual key encoding procedure.
-      // final ByteBuffer encodedKey = FlinkKeyUtils.encodeKey(key, keyCoder);
-      final ByteBuffer encodedKey = ByteBuffer.wrap(key.toByteArray());
+      // Key for state request is shipped already encoded as ByteString, but it is

Review comment:
OK, thanks for the RCA. I will take this PR for a test drive and report back. (Just FYI: no checkpointing issues with the current encoding on Flink 1.8.)

Issue Time Tracking
---
Worklog Id: (was: 307922)
Time Spent: 2h 20m (was: 2h 10m)
[jira] [Work logged] (BEAM-8157) Flink state requests return wrong state in timers when encoded key is length-prefixed
[ https://issues.apache.org/jira/browse/BEAM-8157?focusedWorklogId=307854=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-307854 ]

ASF GitHub Bot logged work on BEAM-8157:
Author: ASF GitHub Bot
Created on: 06/Sep/19 13:34
Start Date: 06/Sep/19 13:34
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9484: [BEAM-8157] Remove length prefix from state key for Flink's state backend
URL: https://github.com/apache/beam/pull/9484#discussion_r321737041

File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java

Review comment:
Here is why this works for Flink <= 1.8 but definitely needs to be fixed moving forward:

Keys for state requests are encoded incorrectly for the Flink Runner's state backend because they may contain a length prefix, which is not in line with how the key is encoded for data partitioning. This causes state to be written to the wrong key group. However, the encoding scheme is consistent, so reading from state in `ProcessElement` or `OnTimer` methods works correctly, including when state is used together with timers. Still, this can potentially cause issues with checkpoints/savepoints.

In Flink 1.9, the key encoding mismatch becomes visible because reading state for a key that is not part of the partition _can_ return a `null` value. It does not always do so, likely due to the state compaction logic. This explains why `PortableStateExecutionTest` passes but `PortableTimersExecutionTest` does not.

That said, I think we should merge this fix ASAP for the upcoming release.

Issue Time Tracking
---
Worklog Id: (was: 307854)
Time Spent: 2h 10m (was: 2h)
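The following is a simplified sketch of the key-group mismatch described in this comment. It relies on two facts. First, Flink assigns a key to a key group based on the key object's `hashCode()`. Second, for a `ByteBuffer`, that hash depends only on the remaining bytes. Flink additionally scrambles the hash with a murmur hash before taking it modulo the maximum parallelism. The sketch replaces that step with a plain `Math.floorMod`, so the group numbers it prints are not the ones Flink would compute. The class name and the `maxParallelism` of 128 are illustrative.

```java
import java.nio.ByteBuffer;

// Simplified stand-in for key-group assignment; not Flink's exact hash function.
public class KeyGroupSketch {

  static int keyGroup(ByteBuffer key, int maxParallelism) {
    // ByteBuffer.hashCode() depends only on the bytes between position and limit.
    return Math.floorMod(key.hashCode(), maxParallelism);
  }

  public static void main(String[] args) {
    int maxParallelism = 128;
    // Key as used for data partitioning: no length prefix.
    ByteBuffer partitioningKey = ByteBuffer.wrap(new byte[] {88, 88});
    // Same logical key as shipped with a state request: length-prefixed.
    ByteBuffer stateRequestKey = ByteBuffer.wrap(new byte[] {2, 88, 88});
    System.out.println(keyGroup(partitioningKey, maxParallelism));
    System.out.println(keyGroup(stateRequestKey, maxParallelism)); // a different group
  }
}
```

With Flink's real hash, the two groups may happen to coincide for some keys, but nothing guarantees it. State written under the prefixed bytes can therefore land in a key group that the partition owning the key never reads.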
[jira] [Work logged] (BEAM-8157) Flink state requests return wrong state in timers when encoded key is length-prefixed
[ https://issues.apache.org/jira/browse/BEAM-8157?focusedWorklogId=307397=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-307397 ]

ASF GitHub Bot logged work on BEAM-8157:
Author: ASF GitHub Bot
Created on: 05/Sep/19 19:18
Start Date: 05/Sep/19 19:18
Worklog Time Spent: 10m
Work Description: tweise commented on pull request #9484: [BEAM-8157] Remove length prefix from state key for Flink's state backend
URL: https://github.com/apache/beam/pull/9484#discussion_r321437183

File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java

Review comment:
https://gist.github.com/tweise/e1d95369234a52b12f90794f6032f039

That's working on 2.14 and also on master as of 4460f03cefbde5cf66c797408439321de2f40692

Issue Time Tracking
---
Worklog Id: (was: 307397)
Time Spent: 2h (was: 1h 50m)
[jira] [Work logged] (BEAM-8157) Flink state requests return wrong state in timers when encoded key is length-prefixed
[ https://issues.apache.org/jira/browse/BEAM-8157?focusedWorklogId=307355=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-307355 ]

ASF GitHub Bot logged work on BEAM-8157:
Author: ASF GitHub Bot
Created on: 05/Sep/19 17:25
Start Date: 05/Sep/19 17:25
Worklog Time Spent: 10m
Work Description: tweise commented on pull request #9484: [BEAM-8157] Remove length prefix from state key for Flink's state backend
URL: https://github.com/apache/beam/pull/9484#discussion_r321387508

File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java

Review comment:
There is something that I'm missing. State and timers work as of release 2.14 with Flink 1.8. What has caused it to break?

Issue Time Tracking
---
Worklog Id: (was: 307355)
Time Spent: 1h 50m (was: 1h 40m)
[jira] [Work logged] (BEAM-8157) Flink state requests return wrong state in timers when encoded key is length-prefixed
[ https://issues.apache.org/jira/browse/BEAM-8157?focusedWorklogId=307341=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-307341 ]

ASF GitHub Bot logged work on BEAM-8157:
Author: ASF GitHub Bot
Created on: 05/Sep/19 17:07
Start Date: 05/Sep/19 17:07
Worklog Time Spent: 10m
Work Description: mxm commented on issue #9484: [BEAM-8157] Remove length prefix from state key for Flink's state backend
URL: https://github.com/apache/beam/pull/9484#issuecomment-528477235

Test failures unrelated to changes in this PR: https://builds.apache.org/job/beam_PreCommit_Java_Commit/7598/

```
Test Result (2 failures / +2)
org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful
org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle
```

Issue Time Tracking
---
Worklog Id: (was: 307341)
Time Spent: 1h 40m (was: 1.5h)
[jira] [Work logged] (BEAM-8157) Flink state requests return wrong state in timers when encoded key is length-prefixed
[ https://issues.apache.org/jira/browse/BEAM-8157?focusedWorklogId=307334=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-307334 ]

ASF GitHub Bot logged work on BEAM-8157:
Author: ASF GitHub Bot
Created on: 05/Sep/19 17:03
Start Date: 05/Sep/19 17:03
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9484: [BEAM-8157] Remove length prefix from state key for Flink's state backend
URL: https://github.com/apache/beam/pull/9484#discussion_r321377749

File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java

Review comment:
The timer test should be adjusted accordingly to catch that.

Issue Time Tracking
---
Worklog Id: (was: 307334)
Time Spent: 1.5h (was: 1h 20m)