[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=321193=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321193 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 01/Oct/19 11:33
Start Date: 01/Oct/19 11:33
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 321193)
Time Spent: 29h (was: 28h 50m)

> Implement cross-bundle state caching.
> -
>
> Key: BEAM-5428
> URL: https://issues.apache.org/jira/browse/BEAM-5428
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-harness
> Reporter: Robert Bradshaw
> Assignee: Maximilian Michels
> Priority: Major
> Time Spent: 29h
> Remaining Estimate: 0h
>
> Tech spec:
> [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m]
> Relevant document:
> [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit]
> Mailing list link:
> [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E]

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=321143=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321143 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 01/Oct/19 09:28
Start Date: 01/Oct/19 09:28
Worklog Time Spent: 10m
Work Description: mxm commented on issue #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#issuecomment-536952055

Run Python PreCommit

Issue Time Tracking
---
Worklog Id: (was: 321143)
Time Spent: 28h 40m (was: 28.5h)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=321142=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321142 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 01/Oct/19 09:27
Start Date: 01/Oct/19 09:27
Worklog Time Spent: 10m
Work Description: mxm commented on issue #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#issuecomment-536952022

Run Portable_Python PreCommit

Issue Time Tracking
---
Worklog Id: (was: 321142)
Time Spent: 28.5h (was: 28h 20m)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=320835=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-320835 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 30/Sep/19 20:30
Start Date: 30/Sep/19 20:30
Worklog Time Spent: 10m
Work Description: mxm commented on issue #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#issuecomment-536738137

Python tests broken on master, see
https://builds.apache.org/job/beam_PreCommit_Python_Cron/ and
https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/

Issue Time Tracking
---
Worklog Id: (was: 320835)
Time Spent: 28h 20m (was: 28h 10m)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=320655=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-320655 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 30/Sep/19 16:47
Start Date: 30/Sep/19 16:47
Worklog Time Spent: 10m
Work Description: mxm commented on issue #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#issuecomment-536616196

Run Portable_Python PreCommit

Issue Time Tracking
---
Worklog Id: (was: 320655)
Time Spent: 28h 10m (was: 28h)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=320654=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-320654 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 30/Sep/19 16:47
Start Date: 30/Sep/19 16:47
Worklog Time Spent: 10m
Work Description: mxm commented on issue #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#issuecomment-536648741

Run Portable_Python PreCommit

Issue Time Tracking
---
Worklog Id: (was: 320654)
Time Spent: 28h (was: 27h 50m)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=320652=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-320652 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 30/Sep/19 16:47
Start Date: 30/Sep/19 16:47
Worklog Time Spent: 10m
Work Description: mxm commented on issue #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#issuecomment-536648503

Py37 somehow broken (also showing up in other builds):

```
18:34:00 FAILURE: Build completed with 2 failures.
18:34:00
18:34:00 1: Task failed with an exception.
18:34:00 ---
18:34:00 * What went wrong:
18:34:00 Execution failed for task ':sdks:python:test-suites:tox:py37:testPy37Gcp'.
18:34:00 > Process 'command 'sh'' finished with non-zero exit value 1
18:34:00
18:34:00 * Try:
18:34:00 Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. Run with --scan to get full insights.
18:34:00 ==
18:34:00
18:34:00 2: Task failed with an exception.
18:34:00 ---
18:34:00 * Where:
18:34:00 Build file '/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/dataflow/py37/build.gradle' line: 66
18:34:00
18:34:00 * What went wrong:
18:34:00 Execution failed for task ':sdks:python:test-suites:dataflow:py37:preCommitIT'.
18:34:00 > Process 'command 'sh'' finished with non-zero exit value 1
18:34:00
```

https://builds.apache.org/job/beam_PreCommit_Python_Commit/8761/

Issue Time Tracking
---
Worklog Id: (was: 320652)
Time Spent: 27h 40m (was: 27.5h)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=320653=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-320653 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 30/Sep/19 16:47
Start Date: 30/Sep/19 16:47
Worklog Time Spent: 10m
Work Description: mxm commented on issue #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#issuecomment-536648699

Run Python PreCommit

Issue Time Tracking
---
Worklog Id: (was: 320653)
Time Spent: 27h 50m (was: 27h 40m)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=320650=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-320650 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 30/Sep/19 16:46
Start Date: 30/Sep/19 16:46
Worklog Time Spent: 10m
Work Description: mxm commented on issue #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#issuecomment-536648503

Py37 somehow broken (also showing up in other builds):

```
18:34:00 FAILURE: Build completed with 2 failures.
18:34:00
18:34:00 1: Task failed with an exception.
18:34:00 ---
18:34:00 * What went wrong:
18:34:00 Execution failed for task ':sdks:python:test-suites:tox:py37:testPy37Gcp'.
18:34:00 > Process 'command 'sh'' finished with non-zero exit value 1
18:34:00
18:34:00 * Try:
18:34:00 Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. Run with --scan to get full insights.
18:34:00 ==
18:34:00
18:34:00 2: Task failed with an exception.
18:34:00 ---
18:34:00 * Where:
18:34:00 Build file '/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/sdks/python/test-suites/dataflow/py37/build.gradle' line: 66
18:34:00
18:34:00 * What went wrong:
18:34:00 Execution failed for task ':sdks:python:test-suites:dataflow:py37:preCommitIT'.
18:34:00 > Process 'command 'sh'' finished with non-zero exit value 1
18:34:00
```

Issue Time Tracking
---
Worklog Id: (was: 320650)
Time Spent: 27.5h (was: 27h 20m)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=320574=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-320574 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 30/Sep/19 15:29
Start Date: 30/Sep/19 15:29
Worklog Time Spent: 10m
Work Description: mxm commented on issue #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#issuecomment-536616196

Run Portable_Python PreCommit

Issue Time Tracking
---
Worklog Id: (was: 320574)
Time Spent: 27h 20m (was: 27h 10m)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=320563=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-320563 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 30/Sep/19 15:14
Start Date: 30/Sep/19 15:14
Worklog Time Spent: 10m
Work Description: mxm commented on issue #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#issuecomment-536608996

Latest changes here:
https://github.com/apache/beam/compare/40cb35361234d12bd4ff8f8126aa5128bc07fcba..55588e91ed8e3e25bb661a6202c31e99297e0e79

From my perspective, all comments are addressed now.

Issue Time Tracking
---
Worklog Id: (was: 320563)
Time Spent: 27h 10m (was: 27h)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=320561=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-320561 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 30/Sep/19 15:12
Start Date: 30/Sep/19 15:12
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#discussion_r329633832

## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ##
@@ -199,26 +199,19 @@ def finish(self):


 class _StateBackedIterable(object):
-  def __init__(self, state_handler, state_key, coder_or_impl):
+  def __init__(self, state_handler, state_key, coder_or_impl,
+               is_cached=False):
     self._state_handler = state_handler
     self._state_key = state_key
     if isinstance(coder_or_impl, coders.Coder):
       self._coder_impl = coder_or_impl.get_impl()
     else:
       self._coder_impl = coder_or_impl
+    self._is_cached = is_cached

   def __iter__(self):
-    # This is the continuation token this might be useful
-    data, continuation_token = self._state_handler.blocking_get(self._state_key)
-    while True:
-      input_stream = coder_impl.create_InputStream(data)
-      while input_stream.size() > 0:
-        yield self._coder_impl.decode_from_stream(input_stream, True)
-      if not continuation_token:
-        break
-      else:
-        data, continuation_token = self._state_handler.blocking_get(
-            self._state_key, continuation_token)
+    return self._state_handler.blocking_get(

Review comment:
Added comments to further clarify.

Issue Time Tracking
---
Worklog Id: (was: 320561)
Time Spent: 26h 50m (was: 26h 40m)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=320562=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-320562 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 30/Sep/19 15:12
Start Date: 30/Sep/19 15:12
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#discussion_r329633832

## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ##
@@ -199,26 +199,19 @@ def finish(self):


 class _StateBackedIterable(object):
-  def __init__(self, state_handler, state_key, coder_or_impl):
+  def __init__(self, state_handler, state_key, coder_or_impl,
+               is_cached=False):
     self._state_handler = state_handler
     self._state_key = state_key
     if isinstance(coder_or_impl, coders.Coder):
       self._coder_impl = coder_or_impl.get_impl()
     else:
       self._coder_impl = coder_or_impl
+    self._is_cached = is_cached

   def __iter__(self):
-    # This is the continuation token this might be useful
-    data, continuation_token = self._state_handler.blocking_get(self._state_key)
-    while True:
-      input_stream = coder_impl.create_InputStream(data)
-      while input_stream.size() > 0:
-        yield self._coder_impl.decode_from_stream(input_stream, True)
-      if not continuation_token:
-        break
-      else:
-        data, continuation_token = self._state_handler.blocking_get(
-            self._state_key, continuation_token)
+    return self._state_handler.blocking_get(

Review comment:
Added comments to further clarify.

Issue Time Tracking
---
Worklog Id: (was: 320562)
Time Spent: 27h (was: 26h 50m)
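The `__iter__` body removed in the diff above pages through state by following continuation tokens. As a rough, self-contained sketch of that read loop (not Beam's code): `FakeStateHandler`, `iterate_state`, and `decode_length_prefixed` are hypothetical names invented for illustration, and the toy length-prefixed decoder stands in for the real coder implementation.

```python
class FakeStateHandler:
  """Hypothetical stand-in that serves state as a list of byte pages."""

  def __init__(self, pages):
    self._pages = pages

  def blocking_get(self, state_key, continuation_token=None):
    # The token is just the index of the next page, encoded as a string.
    index = 0 if continuation_token is None else int(continuation_token)
    data = self._pages[index] if index < len(self._pages) else b''
    next_index = index + 1
    token = str(next_index) if next_index < len(self._pages) else None
    return data, token


def decode_length_prefixed(data, offset):
  """Toy decoder: one length byte followed by that many UTF-8 bytes."""
  length = data[offset]
  start = offset + 1
  return data[start:start + length].decode('utf-8'), start + length


def iterate_state(state_handler, state_key, decode_one):
  """Yield decoded elements, requesting pages until no token is returned."""
  data, token = state_handler.blocking_get(state_key)
  while True:
    offset = 0
    while offset < len(data):
      element, offset = decode_one(data, offset)
      yield element
    if not token:
      break
    data, token = state_handler.blocking_get(state_key, token)


handler = FakeStateHandler([b'\x01a\x02bc', b'\x03def'])
elements = list(iterate_state(handler, 'some-key', decode_length_prefixed))
```

The diff replaces this loop with a single `blocking_get` call on the state handler, which suggests the paging moved behind the handler; the part of the call after the opening parenthesis is not shown in the quoted hunk.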
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=320560=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-320560 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 30/Sep/19 15:11
Start Date: 30/Sep/19 15:11
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#discussion_r329633492

## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ##
@@ -370,17 +365,20 @@ def clear(self):
     self._added_elements = []

   def _commit(self):
+    to_await = None
     if self._cleared:
-      self._state_handler.blocking_clear(self._state_key)
+      to_await = self._state_handler.clear(self._state_key, is_cached=True)
     if self._added_elements:
-      value_coder_impl = self._value_coder.get_impl()
-      out = coder_impl.create_OutputStream()
-      for element in self._added_elements:
-        value_coder_impl.encode_to_stream(element, out, True)
-      self._state_handler.blocking_append(self._state_key, out.get())
+      to_await = self._state_handler.append(

Review comment:
Done.

Issue Time Tracking
---
Worklog Id: (was: 320560)
Time Spent: 26h 40m (was: 26.5h)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=320518=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-320518 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 30/Sep/19 14:21
Start Date: 30/Sep/19 14:21
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#discussion_r329604327

## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ##
@@ -370,17 +365,20 @@ def clear(self):
     self._added_elements = []

   def _commit(self):
+    to_await = None
     if self._cleared:
-      self._state_handler.blocking_clear(self._state_key)
+      to_await = self._state_handler.clear(self._state_key, is_cached=True)
     if self._added_elements:
-      value_coder_impl = self._value_coder.get_impl()
-      out = coder_impl.create_OutputStream()
-      for element in self._added_elements:
-        value_coder_impl.encode_to_stream(element, out, True)
-      self._state_handler.blocking_append(self._state_key, out.get())
+      to_await = self._state_handler.append(

Review comment:
Good suggestion, although I think we have been adding multiple elements before.

Issue Time Tracking
---
Worklog Id: (was: 320518)
Time Spent: 26.5h (was: 26h 20m)
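The `_commit` change above swaps two blocking calls for `clear` and `append` calls whose results are kept in `to_await`. A minimal sketch of that pattern follows, under stated assumptions: the quoted hunk is cut off after `append(`, so waiting on the last returned future is a guess about what comes next, and `RecordingStateHandler` and `BagState` are hypothetical classes written for this example, not Beam's.

```python
from concurrent.futures import Future


class RecordingStateHandler:
  """Hypothetical handler whose clear/append return completed futures."""

  def __init__(self):
    self.requests = []

  def _submit(self, request):
    self.requests.append(request)
    future = Future()
    future.set_result(None)  # Pretend the runner acknowledged immediately.
    return future

  def clear(self, state_key):
    return self._submit(('clear', state_key))

  def append(self, state_key, elements):
    return self._submit(('append', state_key, tuple(elements)))


class BagState:
  """Buffers local changes and sends them to the handler on commit."""

  def __init__(self, state_handler, state_key):
    self._state_handler = state_handler
    self._state_key = state_key
    self._cleared = False
    self._added_elements = []

  def add(self, element):
    self._added_elements.append(element)

  def clear(self):
    self._cleared = True
    self._added_elements = []

  def commit(self):
    to_await = None
    if self._cleared:
      to_await = self._state_handler.clear(self._state_key)
    if self._added_elements:
      to_await = self._state_handler.append(
          self._state_key, self._added_elements)
    if to_await is not None:
      # Assumption: requests are handled in order, so waiting on the
      # most recent future also covers the earlier one.
      to_await.result()
    self._cleared = False
    self._added_elements = []


handler = RecordingStateHandler()
state = BagState(handler, 'some-key')
state.add(1)
state.clear()
state.add(2)
state.add(3)
state.commit()
```

After the commit, `handler.requests` holds one clear followed by one append carrying both buffered elements, which mirrors the point in the review comment about multiple elements being sent in a single append.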
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=320517=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-320517 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 30/Sep/19 14:21
Start Date: 30/Sep/19 14:21
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#discussion_r329604291

## File path: sdks/python/apache_beam/runners/worker/sdk_worker.py ##
@@ -620,6 +632,93 @@ def _next_id(self):
     return str(self._last_id)


+class CachingMaterializingStateHandler(object):
+  """ A State handler which retrieves and caches state. """
+
+  def __init__(self, global_state_cache, underlying_state):
+    self._underlying = underlying_state
+    self._state_cache = global_state_cache
+    self._context = threading.local()
+
+  @contextlib.contextmanager
+  def process_instruction_id(self, bundle_id, cache_tokens):
+    if getattr(self._context, 'cache_token', None) is not None:
+      raise RuntimeError(
+          'Cache tokens already set to %s' % self._context.cache_token)
+    # TODO Also handle cache tokens for side input, if present:
+    # https://issues.apache.org/jira/browse/BEAM-8298
+    user_state_cache_token = None
+    for cache_token_struct in cache_tokens:
+      if cache_token_struct.HasField("user_state"):
+        # There should only be one user state token present
+        assert not user_state_cache_token
+        user_state_cache_token = cache_token_struct.token
+    try:
+      self._context.cache_token = user_state_cache_token
+      with self._underlying.process_instruction_id(bundle_id):
+        yield
+    finally:
+      self._context.cache_token = None
+
+  def blocking_get(self, state_key, coder, is_cached=False):
+    if not self._should_be_cached(is_cached):
+      # no cache / tokens, can't do a lookup/store in the cache
+      return self._materialize_iter(state_key, coder)
+    # Cache lookup
+    cache_state_key = self._convert_to_cache_key(state_key)
+    cached_value = self._state_cache.get(cache_state_key,
+                                         self._context.cache_token)
+    if cached_value is None:
+      # Cache miss, need to retrieve from the Runner
+      materialized = cached_value = list(

Review comment:
Without a size optimization, I don't think there is much we can do at the moment. For large state, the continuation token could be used, but it is not utilized by the Runner for now. Considering that state caching is disabled for now and the previous state materialization strategy (lazily fetch one item at a time) has not been altered, it seems fair to address this in a follow-up and mark this a TODO for now.

Issue Time Tracking
---
Worklog Id: (was: 320517)
Time Spent: 26h 20m (was: 26h 10m)
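For readers following the `self._state_cache.get(cache_state_key, self._context.cache_token)` call in the hunk above, here is a minimal sketch of how a token-validated lookup could behave. The class `TokenScopedCache` and its `put` method are hypothetical names used only for illustration; this is an assumption about the general idea, not the cache implemented in the PR.

```python
class TokenScopedCache(object):
    """Hypothetical cache: an entry is only valid for the token it was stored with."""

    def __init__(self):
        self._entries = {}  # key -> (token, value)

    def get(self, key, token):
        entry = self._entries.get(key)
        if entry is None or entry[0] != token:
            # Unknown key, or the entry was written under a different token.
            return None
        return entry[1]

    def put(self, key, token, value):
        self._entries[key] = (token, value)


cache = TokenScopedCache()
cache.put('bag-state', b'token-1', [1, 2])
hit = cache.get('bag-state', b'token-1')    # same token: cached value
stale = cache.get('bag-state', b'token-2')  # different token: treated as a miss
```

In this sketch a changed token simply turns every lookup into a miss, so the caller falls back to fetching from the Runner.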
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=320028=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-320028 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 28/Sep/19 16:22
Start Date: 28/Sep/19 16:22
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#discussion_r329316747

## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ##
@@ -199,26 +199,19 @@ def finish(self):


 class _StateBackedIterable(object):
-  def __init__(self, state_handler, state_key, coder_or_impl):
+  def __init__(self, state_handler, state_key, coder_or_impl,
+               is_cached=False):
     self._state_handler = state_handler
     self._state_key = state_key
     if isinstance(coder_or_impl, coders.Coder):
       self._coder_impl = coder_or_impl.get_impl()
     else:
       self._coder_impl = coder_or_impl
+    self._is_cached = is_cached

   def __iter__(self):
-    # This is the continuation token this might be useful
-    data, continuation_token = self._state_handler.blocking_get(self._state_key)
-    while True:
-      input_stream = coder_impl.create_InputStream(data)
-      while input_stream.size() > 0:
-        yield self._coder_impl.decode_from_stream(input_stream, True)
-      if not continuation_token:
-        break
-      else:
-        data, continuation_token = self._state_handler.blocking_get(
-            self._state_key, continuation_token)
+    return self._state_handler.blocking_get(

Review comment:
> The code that is removed keeps fetching the state until there is no token.

This code has _not_ been removed. It has simply been refactored to support fetching all state at once instead of just one element at a time. The logic regarding the continuation token is unchanged. Please look inside the `materialize_iter` method.

Of course I will address all remaining comments before merging the PR, as soon as I get a chance.

Issue Time Tracking
---
Worklog Id: (was: 320028)
Time Spent: 26h 10m (was: 26h)
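The distinction drawn in this comment, one loop that follows continuation tokens which can be consumed either lazily or all at once, can be sketched as follows. The names `iterate_pages`, `fake_fetch` and `PAGES` are hypothetical stand-ins, and the paging contract (a fetch returning a batch plus the next token) is an assumption modelled on the removed `__iter__` loop, not the PR's `materialize_iter` itself.

```python
def iterate_pages(fetch_page):
    """Lazily yield elements, requesting further pages until no token is returned."""
    token = None
    while True:
        elements, token = fetch_page(token)
        for element in elements:
            yield element
        if not token:
            break


# Hypothetical paged backend standing in for the Runner's state API.
PAGES = {
    None: ([1, 2], b'page-2'),
    b'page-2': ([3], b'page-3'),
    b'page-3': ([4, 5], None),
}


def fake_fetch(token):
    return PAGES[token]


lazy = iterate_pages(fake_fetch)                # nothing is fetched until iterated
materialized = list(iterate_pages(fake_fetch))  # walks every page eagerly
```

Wrapping the generator in `list(...)` is what turns the lazy strategy into the fully materialized one that a cache can store.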
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=320027=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-320027 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 28/Sep/19 16:03
Start Date: 28/Sep/19 16:03
Worklog Time Spent: 10m
Work Description: tweise commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#discussion_r329315665

## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ##
@@ -199,26 +199,19 @@ def finish(self):


 class _StateBackedIterable(object):
-  def __init__(self, state_handler, state_key, coder_or_impl):
+  def __init__(self, state_handler, state_key, coder_or_impl,
+               is_cached=False):
     self._state_handler = state_handler
     self._state_key = state_key
     if isinstance(coder_or_impl, coders.Coder):
       self._coder_impl = coder_or_impl.get_impl()
     else:
       self._coder_impl = coder_or_impl
+    self._is_cached = is_cached

   def __iter__(self):
-    # This is the continuation token this might be useful
-    data, continuation_token = self._state_handler.blocking_get(self._state_key)
-    while True:
-      input_stream = coder_impl.create_InputStream(data)
-      while input_stream.size() > 0:
-        yield self._coder_impl.decode_from_stream(input_stream, True)
-      if not continuation_token:
-        break
-      else:
-        data, continuation_token = self._state_handler.blocking_get(
-            self._state_key, continuation_token)
+    return self._state_handler.blocking_get(

Review comment:
The code that is removed keeps fetching the state until there is no token. My question was what the replacement for this is. If you are saying that there was no need for it in the first place because there is no support on the runner side, then that's OK. It might be appropriate to capture this and the question raised by Robert in a comment and/or link to the appropriate JIRA.

Issue Time Tracking
---
Worklog Id: (was: 320027)
Time Spent: 26h (was: 25h 50m)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=319982=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-319982 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 28/Sep/19 09:48
Start Date: 28/Sep/19 09:48
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#discussion_r329304677

## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ##
@@ -199,26 +199,19 @@ def finish(self):


 class _StateBackedIterable(object):
-  def __init__(self, state_handler, state_key, coder_or_impl):
+  def __init__(self, state_handler, state_key, coder_or_impl,
+               is_cached=False):
     self._state_handler = state_handler
     self._state_key = state_key
     if isinstance(coder_or_impl, coders.Coder):
       self._coder_impl = coder_or_impl.get_impl()
     else:
       self._coder_impl = coder_or_impl
+    self._is_cached = is_cached

   def __iter__(self):
-    # This is the continuation token this might be useful
-    data, continuation_token = self._state_handler.blocking_get(self._state_key)
-    while True:
-      input_stream = coder_impl.create_InputStream(data)
-      while input_stream.size() > 0:
-        yield self._coder_impl.decode_from_stream(input_stream, True)
-      if not continuation_token:
-        break
-      else:
-        data, continuation_token = self._state_handler.blocking_get(
-            self._state_key, continuation_token)
+    return self._state_handler.blocking_get(

Review comment:
There was no continuation token logic removed. Robert raised the point that the state is materialized no matter how many continuation tokens are available (only if Caching is enabled). This can result in too large state being loaded. However, neither is the continuation token used by the Runner currently, nor is this likely to happen for user state, rather for side inputs which are currently not cached.

Issue Time Tracking
---
Worklog Id: (was: 319982)
Time Spent: 25h 50m (was: 25h 40m)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=319746=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-319746 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 27/Sep/19 18:44
Start Date: 27/Sep/19 18:44
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#discussion_r329194247

## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ##
@@ -370,17 +365,20 @@ def clear(self):
     self._added_elements = []

   def _commit(self):
+    to_await = None
     if self._cleared:
-      self._state_handler.blocking_clear(self._state_key)
+      to_await = self._state_handler.clear(self._state_key, is_cached=True)
     if self._added_elements:
-      value_coder_impl = self._value_coder.get_impl()
-      out = coder_impl.create_OutputStream()
-      for element in self._added_elements:
-        value_coder_impl.encode_to_stream(element, out, True)
-      self._state_handler.blocking_append(self._state_key, out.get())
+      to_await = self._state_handler.append(

Review comment:
Just a thought: I wonder if append should be renamed extend (analogous to list.append vs list.extend) as we're concatenating multiple elements here now.

Issue Time Tracking
---
Worklog Id: (was: 319746)
Time Spent: 25h 40m (was: 25.5h)
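The `list.append` vs `list.extend` analogy behind this naming suggestion can be seen with plain Python lists. The variable names below are illustrative only and do not come from the PR.

```python
# append adds its argument as ONE new element.
appended = [1, 2]
appended.append([3, 4])

# extend concatenates the elements of its argument.
extended = [1, 2]
extended.extend([3, 4])
```

A state-handler method that takes several elements and adds each of them to the bag behaves like the second case, which is the reasoning given for the proposed rename.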
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=319745=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-319745 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 27/Sep/19 18:44
Start Date: 27/Sep/19 18:44
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#discussion_r329199520

## File path: sdks/python/apache_beam/runners/worker/sdk_worker.py ##
@@ -620,6 +632,93 @@ def _next_id(self):
     return str(self._last_id)


+class CachingMaterializingStateHandler(object):
+  """ A State handler which retrieves and caches state. """
+
+  def __init__(self, global_state_cache, underlying_state):
+    self._underlying = underlying_state
+    self._state_cache = global_state_cache
+    self._context = threading.local()
+
+  @contextlib.contextmanager
+  def process_instruction_id(self, bundle_id, cache_tokens):
+    if getattr(self._context, 'cache_token', None) is not None:
+      raise RuntimeError(
+          'Cache tokens already set to %s' % self._context.cache_token)
+    # TODO Also handle cache tokens for side input, if present:
+    # https://issues.apache.org/jira/browse/BEAM-8298
+    user_state_cache_token = None
+    for cache_token_struct in cache_tokens:
+      if cache_token_struct.HasField("user_state"):
+        # There should only be one user state token present
+        assert not user_state_cache_token
+        user_state_cache_token = cache_token_struct.token
+    try:
+      self._context.cache_token = user_state_cache_token
+      with self._underlying.process_instruction_id(bundle_id):
+        yield
+    finally:
+      self._context.cache_token = None
+
+  def blocking_get(self, state_key, coder, is_cached=False):
+    if not self._should_be_cached(is_cached):
+      # no cache / tokens, can't do a lookup/store in the cache
+      return self._materialize_iter(state_key, coder)
+    # Cache lookup
+    cache_state_key = self._convert_to_cache_key(state_key)
+    cached_value = self._state_cache.get(cache_state_key,
+                                         self._context.cache_token)
+    if cached_value is None:
+      # Cache miss, need to retrieve from the Runner
+      materialized = cached_value = list(

Review comment:
This will break if the value is too large to fit into memory. Of particular concern is the case of a GBK result that doesn't fit into memory and is fetched via the state API, but very large bag states are not that uncommon either.

Issue Time Tracking
---
Worklog Id: (was: 319745)
Time Spent: 25h 40m (was: 25.5h)
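One way to picture the memory concern raised here is a bounded materialization: keep the result as a list only while it stays under a limit, and otherwise hand back a lazy iterator that is not cached. The helper `materialize_if_small` and its `max_elements` parameter are hypothetical; this is a sketch of a possible size optimization, not something the PR implements.

```python
import itertools


def materialize_if_small(elements, max_elements):
    """Return (values, cacheable).

    values is a list when at most max_elements items exist; otherwise it is
    a lazy iterator over all items and cacheable is False.
    """
    iterator = iter(elements)
    # Read one item past the limit to learn whether the input is too large.
    head = list(itertools.islice(iterator, max_elements + 1))
    if len(head) <= max_elements:
        return head, True
    # Too large: stitch the consumed prefix back in front of the remainder.
    return itertools.chain(head, iterator), False


small, small_ok = materialize_if_small(range(3), max_elements=10)
large, large_ok = materialize_if_small(range(100), max_elements=10)
```

Only the prefix of `max_elements + 1` items is ever held in memory at once for the large case, at the cost of giving up caching for that key.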
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=319705=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-319705 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 27/Sep/19 17:49
Start Date: 27/Sep/19 17:49
Worklog Time Spent: 10m
Work Description: tweise commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#discussion_r329183693

## File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py ##
@@ -82,6 +84,11 @@
     beam.coders.coders.GlobalWindowCoder()).get_impl().encode_nested(
         beam.transforms.window.GlobalWindows.windowed_value(b''))

+# State caching is enabled in fn fn_api_runner for testing, except for one

Review comment:
```suggestion
# State caching is enabled in the fn_api_runner for testing, except for one
```

Issue Time Tracking
---
Worklog Id: (was: 319705)
Time Spent: 25h 20m (was: 25h 10m)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=319706=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-319706 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 27/Sep/19 17:49
Start Date: 27/Sep/19 17:49
Worklog Time Spent: 10m
Work Description: tweise commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#discussion_r329183882

## File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py ##
@@ -82,6 +84,11 @@
     beam.coders.coders.GlobalWindowCoder()).get_impl().encode_nested(
         beam.transforms.window.GlobalWindows.windowed_value(b''))

+# State caching is enabled in fn fn_api_runner for testing, except for one
+# test which runs without state caching (FnApiRunnerTestWithDisabledCaching).
+# The Cache is disabled in production for other Runners.

Review comment:
```suggestion
# The cache is disabled in production for other runners.
```

Issue Time Tracking
---
Worklog Id: (was: 319706)
Time Spent: 25.5h (was: 25h 20m)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=319663=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-319663 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 27/Sep/19 16:47
Start Date: 27/Sep/19 16:47
Worklog Time Spent: 10m
Work Description: tweise commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#discussion_r329161205

## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ##
@@ -199,26 +199,19 @@ def finish(self):


 class _StateBackedIterable(object):
-  def __init__(self, state_handler, state_key, coder_or_impl):
+  def __init__(self, state_handler, state_key, coder_or_impl,
+               is_cached=False):
     self._state_handler = state_handler
     self._state_key = state_key
     if isinstance(coder_or_impl, coders.Coder):
       self._coder_impl = coder_or_impl.get_impl()
     else:
       self._coder_impl = coder_or_impl
+    self._is_cached = is_cached

   def __iter__(self):
-    # This is the continuation token this might be useful
-    data, continuation_token = self._state_handler.blocking_get(self._state_key)
-    while True:
-      input_stream = coder_impl.create_InputStream(data)
-      while input_stream.size() > 0:
-        yield self._coder_impl.decode_from_stream(input_stream, True)
-      if not continuation_token:
-        break
-      else:
-        data, continuation_token = self._state_handler.blocking_get(
-            self._state_key, continuation_token)
+    return self._state_handler.blocking_get(

Review comment:
For the continuation token logic that was removed, what is the replacement?

Issue Time Tracking
---
Worklog Id: (was: 319663)
Time Spent: 25h 10m (was: 25h)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=319635=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-319635 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 27/Sep/19 16:02
Start Date: 27/Sep/19 16:02
Worklog Time Spent: 10m
Work Description: mxm commented on issue #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#issuecomment-53531

Run Python_PVR_Flink PreCommit

Issue Time Tracking
---
Worklog Id: (was: 319635)
Time Spent: 24h 50m (was: 24h 40m)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=319636=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-319636 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 27/Sep/19 16:02
Start Date: 27/Sep/19 16:02
Worklog Time Spent: 10m
Work Description: mxm commented on issue #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#issuecomment-53531

Run Python_PVR_Flink PreCommit

Issue Time Tracking
---
Worklog Id: (was: 319636)
Time Spent: 25h (was: 24h 50m)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=319599=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-319599 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 27/Sep/19 15:03
Start Date: 27/Sep/19 15:03
Worklog Time Spent: 10m
Work Description: mxm commented on issue #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#issuecomment-535977920

Rebased to master.

Issue Time Tracking
---
Worklog Id: (was: 319599)
Time Spent: 24h 40m (was: 24.5h)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=319572=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-319572 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 27/Sep/19 14:08
Start Date: 27/Sep/19 14:08
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#discussion_r329089645

## File path: sdks/python/apache_beam/runners/portability/portable_runner_test.py ##
@@ -201,7 +204,8 @@ class PortableRunnerTestWithExternalEnv(PortableRunnerTest):
   @classmethod
   def setUpClass(cls):
     cls._worker_address, cls._worker_server = (
-        worker_pool_main.BeamFnExternalWorkerPoolServicer.start())
+        worker_pool_main.BeamFnExternalWorkerPoolServicer.start(

Review comment:
Since this affects multiple parameters of the `start` method (e.g. `worker_threads`), I would handle this in a follow-up. Either way, via individual parameters or via pipeline options here, there will be duplicate code paths, because this method is used both in a test context and for the main entry point in `if __name__ == '__main__'`. Fixing this would require restructuring the entire file, which I see as out of scope for this PR. But it is good to think about doing that eventually.

Issue Time Tracking
---
Worklog Id: (was: 319572)
Time Spent: 24.5h (was: 24h 20m)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=319512=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-319512 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 27/Sep/19 12:16
Start Date: 27/Sep/19 12:16
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#discussion_r329043012

## File path: sdks/python/apache_beam/runners/worker/statecache.py ##
@@ -0,0 +1,122 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A module for caching state reads/writes in Beam applications."""
+from __future__ import absolute_import
+
+import collections
+import logging
+from threading import Lock
+
+
+class StateCache(object):
+  """ Cache for Beam state access, scoped by state key and cache_token.
+
+  For a given state_key, caches a (cache_token, value) tuple and allows to
+    a) read from the cache,
+       if the currently stored cache_token matches the provided
+    a) write to the cache,
+       storing the new value alongside with a cache token
+    c) append to the cache,
+       if the currently stored cache_token matches the provided
+
+  The operations on the cache are thread-safe for use by multiple workers.
+
+  :arg max_entries The maximum number of entries to store in the cache.
+  TODO Memory-based caching: https://issues.apache.org/jira/browse/BEAM-8297
+  """
+
+  def __init__(self, max_entries):
+    logging.info('Creating state cache with size %s', max_entries)
+    self._cache = self.LRUCache(max_entries, (None, None))
+    self._lock = Lock()
+
+  def get(self, state_key, cache_token):
+    assert cache_token and self.is_cache_enabled()
+    with self._lock:
+      token, value = self._cache.get(state_key)
+    return value if token == cache_token else None
+
+  def put(self, state_key, cache_token, value):
+    assert cache_token and self.is_cache_enabled()
+    with self._lock:
+      return self._cache.put(state_key, (cache_token, value))
+
+  def append(self, state_key, cache_token, elements):
+    assert cache_token and self.is_cache_enabled()
+    with self._lock:
+      token, value = self._cache.get(state_key)
+      if token in [cache_token, None]:
+        if value is None:
+          value = []
+        value.extend(elements)
+        self._cache.put(state_key, (cache_token, value))
+      else:
+        # Discard cached state if tokens do not match
+        self._cache.evict(state_key)
+
+  def clear(self, state_key, cache_token):
+    assert cache_token and self.is_cache_enabled()
+    with self._lock:
+      token, _ = self._cache.get(state_key)
+      if token in [cache_token, None]:
+        self._cache.put(state_key, (cache_token, []))
+      else:
+        # Discard cached state if tokens do not match
+        self._cache.evict(state_key)
+
+  def evict(self, state_key):
+    assert self.is_cache_enabled()
+    with self._lock:
+      self._cache.evict(state_key)
+
+  def evict_all(self):
+    with self._lock:
+      self._cache.evict_all()
+
+  def is_cache_enabled(self):
+    return self._cache._max_entries > 0
+
+  def __len__(self):
+    return len(self._cache)
+
+  class LRUCache(object):
+
+    def __init__(self, max_entries, default_entry):
+      self._max_entries = max_entries
+      self._default_entry = default_entry
+      self._cache = collections.OrderedDict()
+
+    def get(self, key):
+      value = self._cache.pop(key, self._default_entry)
+      if value != self._default_entry:
+        self._cache[key] = value
+      return value
+
+    def put(self, key, value):
+      self._cache[key] = value
+      while len(self._cache) > self._max_entries:
+        self._cache.popitem(last=False)
+
+    def evict(self, key):
+      self._cache.pop(key, self._default_entry)
+
+    def evict_all(self):

Review comment:
Maybe, but only if we also change `evict(key)` to `clear` as well. IMHO, evict suits a cache.
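For readers following the naming discussion, the LRU behaviour behind `evict` and `evict_all` can be sketched in isolation. This is a minimal standalone sketch modelled on the quoted diff, not the code that was merged. It deviates in one place, which is an assumption of this sketch: `put` removes an existing key before re-inserting it, so that an overwrite also refreshes recency.

```python
import collections


class LRUCache(object):
  """Bounded mapping that drops the least recently used key first."""

  def __init__(self, max_entries, default_entry=None):
    self._max_entries = max_entries
    self._default_entry = default_entry
    self._cache = collections.OrderedDict()

  def get(self, key):
    # Pop and re-insert so the key moves to the most recently used end.
    value = self._cache.pop(key, self._default_entry)
    if value != self._default_entry:
      self._cache[key] = value
    return value

  def put(self, key, value):
    # Assumption of this sketch: drop any old entry first so that an
    # overwrite also counts as a use of the key.
    self._cache.pop(key, None)
    self._cache[key] = value
    while len(self._cache) > self._max_entries:
      # last=False removes the oldest (least recently used) entry.
      self._cache.popitem(last=False)

  def evict(self, key):
    self._cache.pop(key, None)

  def evict_all(self):
    self._cache.clear()

  def __len__(self):
    return len(self._cache)


cache = LRUCache(max_entries=2)
cache.put('a', 1)
cache.put('b', 2)
cache.get('a')     # 'a' becomes the most recently used key
cache.put('c', 3)  # capacity exceeded: 'b' is dropped
```

Keeping `evict` for a single key and `evict_all` for the whole cache leaves `clear` free to mean "set this state to empty", which is how `StateCache.clear` uses the word in the diff.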
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=319507=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-319507 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 27/Sep/19 12:09
Start Date: 27/Sep/19 12:09
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#discussion_r329040611

## File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py ##
@@ -1412,11 +1470,13 @@ def stop_worker(self):
 class WorkerHandlerManager(object):
-  def __init__(self, environments, job_provision_info):
+  def __init__(self, environments, job_provision_info, state_cache_size):
     self._environments = environments
     self._job_provision_info = job_provision_info
     self._cached_handlers = collections.defaultdict(list)
-    self._state = FnApiRunner.StateServicer() # rename?
+    self._state = sdk_worker.CachingMaterializingStateHandler(
+        StateCache(state_cache_size),

Review comment:
Fixed now.

Issue Time Tracking
---
Worklog Id: (was: 319507)
Time Spent: 24h 10m (was: 24h)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=319506=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-319506 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 27/Sep/19 12:08
Start Date: 27/Sep/19 12:08
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#discussion_r328751006

## File path: sdks/python/apache_beam/runners/worker/sdk_worker.py ##
@@ -620,6 +632,107 @@ def _next_id(self):
     return str(self._last_id)


+class CachingMaterializingStateHandler(object):
+  """ A State handler which retrieves and caches state. """
+
+  def __init__(self, global_state_cache, underlying_state):
+    self._underlying = underlying_state
+    self._state_cache = global_state_cache
+    self._context = threading.local()
+
+  @contextlib.contextmanager
+  def process_instruction_id(self, bundle_id, cache_tokens):
+    if getattr(self._context, 'cache_token', None) is not None:
+      raise RuntimeError(
+          'Cache tokens already set to %s' % self._context.cache_token)
+    # TODO Also handle cache tokens for side input, if present:
+    # https://issues.apache.org/jira/browse/BEAM-8298
+    user_state_cache_token = None
+    for cache_token_struct in cache_tokens:
+      if cache_token_struct.HasField("user_state"):
+        # There should only be one user state token present
+        assert not user_state_cache_token
+        user_state_cache_token = cache_token_struct.token
+    try:
+      self._context.cache_token = user_state_cache_token
+      with self._underlying.process_instruction_id(bundle_id):
+        yield
+    finally:
+      self._context.cache_token = None
+
+  def blocking_get(self, state_key, coder, is_cached=False):
+    if not self._should_be_cached(is_cached):
+      # no cache / tokens, can't do a lookup/store in the cache
+      return self._materialize_iter(state_key, coder)
+    # Cache lookup
+    cache_state_key = self._convert_to_cache_key(state_key)
+    cached_value = self._state_cache.get(cache_state_key,
+                                         self._context.cache_token)
+    if cached_value is None:
+      # Cache miss, need to retrieve from the Runner
+      materialized = cached_value = list(
+          self._materialize_iter(state_key, coder))
+      self._state_cache.put(
+          cache_state_key,
+          self._context.cache_token,
+          materialized)
+    return iter(cached_value)
+
+  def append(self, state_key, coder, elements, is_cached=False):
+    if self._should_be_cached(is_cached):
+      # Update the cache
+      cache_key = self._convert_to_cache_key(state_key)
+      self._state_cache.append(cache_key, self._context.cache_token, elements)
+    # Write to state handler
+    out = coder_impl.create_OutputStream()
+    for element in elements:
+      coder.encode_to_stream(element, out, True)
+    return self._underlying.append_raw(state_key, out.get())
+
+  def clear(self, state_key, is_cached=False):
+    if self._should_be_cached(is_cached):
+      cache_key = self._convert_to_cache_key(state_key)
+      self._state_cache.clear(cache_key, self._context.cache_token)
+    return self._underlying.clear(state_key)
+
+  # The following methods are for interaction with the FnApiRunner:
+
+  def get_raw(self, state_key, continuation_token=None):
+    return self._underlying.get_raw(state_key, continuation_token)
+
+  def append_raw(self, state_key, data):
+    return self._underlying.append_raw(state_key, data)
+
+  def restore(self):
+    self._underlying.restore()
+
+  def checkpoint(self):

Review comment:
Note: this method and the ones above it, up to the "FnApiRunner" comment, will go away (the fix will come tomorrow). They won't be necessary once the cache is no longer inserted for the state handler of the fn_api_runner.

Issue Time Tracking
---
Worklog Id: (was: 319506)
Time Spent: 24h (was: 23h 50m)
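The handler in the diff above combines two ideas: a cache token held in thread-local storage for the duration of one bundle, and a read-through lookup that only trusts entries written under the same token. The sketch below isolates both with a plain dict standing in for the state cache. `TokenScope`, `read_through` and `fetch` are hypothetical names introduced for this illustration; they are not part of the Beam SDK.

```python
import contextlib
import threading


class TokenScope(object):
  """Holds one cache token per thread while a bundle is being processed."""

  def __init__(self):
    self._context = threading.local()

  @contextlib.contextmanager
  def bundle(self, cache_token):
    if getattr(self._context, 'cache_token', None) is not None:
      raise RuntimeError('Cache token already set')
    self._context.cache_token = cache_token
    try:
      yield
    finally:
      # Always reset, so the next bundle on this thread starts clean.
      self._context.cache_token = None

  @property
  def cache_token(self):
    return getattr(self._context, 'cache_token', None)


def read_through(scope, cache, key, fetch):
  """Returns the cached value if its token matches, else fetches and stores."""
  token = scope.cache_token
  if token is None:
    # No token for this bundle: bypass the cache entirely.
    return fetch(key)
  entry = cache.get(key)
  if entry is not None and entry[0] == token:
    return entry[1]
  value = fetch(key)
  cache[key] = (token, value)
  return value


scope = TokenScope()
cache = {}
calls = []


def fetch(key):
  # Stand-in for a round trip to the runner.
  calls.append(key)
  return [key.upper()]


with scope.bundle('token-1'):
  first = read_through(scope, cache, 'k', fetch)
  second = read_through(scope, cache, 'k', fetch)  # served from the cache
with scope.bundle('token-2'):
  third = read_through(scope, cache, 'k', fetch)   # token changed: refetch
```

Because the token lives in `threading.local()`, concurrent bundles on different threads each see their own token while sharing the same cache object.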
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=319110=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-319110 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 26/Sep/19 18:02
Start Date: 26/Sep/19 18:02
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#discussion_r328751006

## File path: sdks/python/apache_beam/runners/worker/sdk_worker.py ##
@@ -620,6 +632,107 @@ def _next_id(self):
     return str(self._last_id)


+class CachingMaterializingStateHandler(object):
+  """ A State handler which retrieves and caches state. """
+
+  def __init__(self, global_state_cache, underlying_state):
+    self._underlying = underlying_state
+    self._state_cache = global_state_cache
+    self._context = threading.local()
+
+  @contextlib.contextmanager
+  def process_instruction_id(self, bundle_id, cache_tokens):
+    if getattr(self._context, 'cache_token', None) is not None:
+      raise RuntimeError(
+          'Cache tokens already set to %s' % self._context.cache_token)
+    # TODO Also handle cache tokens for side input, if present:
+    # https://issues.apache.org/jira/browse/BEAM-8298
+    user_state_cache_token = None
+    for cache_token_struct in cache_tokens:
+      if cache_token_struct.HasField("user_state"):
+        # There should only be one user state token present
+        assert not user_state_cache_token
+        user_state_cache_token = cache_token_struct.token
+    try:
+      self._context.cache_token = user_state_cache_token
+      with self._underlying.process_instruction_id(bundle_id):
+        yield
+    finally:
+      self._context.cache_token = None
+
+  def blocking_get(self, state_key, coder, is_cached=False):
+    if not self._should_be_cached(is_cached):
+      # no cache / tokens, can't do a lookup/store in the cache
+      return self._materialize_iter(state_key, coder)
+    # Cache lookup
+    cache_state_key = self._convert_to_cache_key(state_key)
+    cached_value = self._state_cache.get(cache_state_key,
+                                         self._context.cache_token)
+    if cached_value is None:
+      # Cache miss, need to retrieve from the Runner
+      materialized = cached_value = list(
+          self._materialize_iter(state_key, coder))
+      self._state_cache.put(
+          cache_state_key,
+          self._context.cache_token,
+          materialized)
+    return iter(cached_value)
+
+  def append(self, state_key, coder, elements, is_cached=False):
+    if self._should_be_cached(is_cached):
+      # Update the cache
+      cache_key = self._convert_to_cache_key(state_key)
+      self._state_cache.append(cache_key, self._context.cache_token, elements)
+    # Write to state handler
+    out = coder_impl.create_OutputStream()
+    for element in elements:
+      coder.encode_to_stream(element, out, True)
+    return self._underlying.append_raw(state_key, out.get())
+
+  def clear(self, state_key, is_cached=False):
+    if self._should_be_cached(is_cached):
+      cache_key = self._convert_to_cache_key(state_key)
+      self._state_cache.clear(cache_key, self._context.cache_token)
+    return self._underlying.clear(state_key)
+
+  # The following methods are for interaction with the FnApiRunner:
+
+  def get_raw(self, state_key, continuation_token=None):
+    return self._underlying.get_raw(state_key, continuation_token)
+
+  def append_raw(self, state_key, data):
+    return self._underlying.append_raw(state_key, data)
+
+  def restore(self):
+    self._underlying.restore()
+
+  def checkpoint(self):

Review comment:
Note: this method and the ones above it, up to the "FnApiRunner" comment, will go away (the fix will come tomorrow). They won't be necessary once the cache is no longer inserted for the state handler of the fn_api_runner.

Issue Time Tracking
---
Worklog Id: (was: 319110)
Time Spent: 23h 50m (was: 23h 40m)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=319093=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-319093 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 26/Sep/19 17:15
Start Date: 26/Sep/19 17:15
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#discussion_r328730529

## File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py ##
@@ -1412,11 +1470,13 @@ def stop_worker(self):
 class WorkerHandlerManager(object):
-  def __init__(self, environments, job_provision_info):
+  def __init__(self, environments, job_provision_info, state_cache_size):
     self._environments = environments
     self._job_provision_info = job_provision_info
     self._cached_handlers = collections.defaultdict(list)
-    self._state = FnApiRunner.StateServicer() # rename?
+    self._state = sdk_worker.CachingMaterializingStateHandler(
+        StateCache(state_cache_size),

Review comment:
I added this because the WorkerHandlerManager will insert the state handler into the WorkerHandlerFactory, which generates a cached BundleProcessorCache with that state handler for the EmbeddedWorkerHandler. It is not necessary otherwise and just does unnecessary caching on the FnApiRunner side.

Issue Time Tracking
---
Worklog Id: (was: 319093)
Time Spent: 23h 40m (was: 23.5h)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=319037=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-319037 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 26/Sep/19 15:20
Start Date: 26/Sep/19 15:20
Worklog Time Spent: 10m
Work Description: tweise commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#discussion_r328676138

## File path: sdks/python/apache_beam/runners/worker/statecache.py ##
@@ -0,0 +1,122 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A module for caching state reads/writes in Beam applications."""
+from __future__ import absolute_import
+
+import collections
+import logging
+from threading import Lock
+
+
+class StateCache(object):
+  """ Cache for Beam state access, scoped by state key and cache_token.
+
+  For a given state_key, caches a (cache_token, value) tuple and allows to
+    a) read from the cache,
+       if the currently stored cache_token matches the provided
+    a) write to the cache,
+       storing the new value alongside with a cache token
+    c) append to the cache,
+       if the currently stored cache_token matches the provided
+
+  The operations on the cache are thread-safe for use by multiple workers.
+
+  :arg max_entries The maximum number of entries to store in the cache.
+  TODO Memory-based caching: https://issues.apache.org/jira/browse/BEAM-8297
+  """
+
+  def __init__(self, max_entries):
+    logging.info('Creating state cache with size %s', max_entries)
+    self._cache = self.LRUCache(max_entries, (None, None))
+    self._lock = Lock()
+
+  def get(self, state_key, cache_token):
+    assert cache_token and self.is_cache_enabled()
+    with self._lock:
+      token, value = self._cache.get(state_key)
+    return value if token == cache_token else None
+
+  def put(self, state_key, cache_token, value):
+    assert cache_token and self.is_cache_enabled()
+    with self._lock:
+      return self._cache.put(state_key, (cache_token, value))
+
+  def append(self, state_key, cache_token, elements):
+    assert cache_token and self.is_cache_enabled()
+    with self._lock:
+      token, value = self._cache.get(state_key)
+      if token in [cache_token, None]:
+        if value is None:
+          value = []
+        value.extend(elements)
+        self._cache.put(state_key, (cache_token, value))
+      else:
+        # Discard cached state if tokens do not match
+        self._cache.evict(state_key)
+
+  def clear(self, state_key, cache_token):
+    assert cache_token and self.is_cache_enabled()
+    with self._lock:
+      token, _ = self._cache.get(state_key)
+      if token in [cache_token, None]:
+        self._cache.put(state_key, (cache_token, []))
+      else:
+        # Discard cached state if tokens do not match
+        self._cache.evict(state_key)
+
+  def evict(self, state_key):
+    assert self.is_cache_enabled()
+    with self._lock:
+      self._cache.evict(state_key)
+
+  def evict_all(self):
+    with self._lock:
+      self._cache.evict_all()
+
+  def is_cache_enabled(self):
+    return self._cache._max_entries > 0
+
+  def __len__(self):
+    return len(self._cache)
+
+  class LRUCache(object):
+
+    def __init__(self, max_entries, default_entry):
+      self._max_entries = max_entries
+      self._default_entry = default_entry
+      self._cache = collections.OrderedDict()
+
+    def get(self, key):
+      value = self._cache.pop(key, self._default_entry)
+      if value != self._default_entry:
+        self._cache[key] = value
+      return value
+
+    def put(self, key, value):
+      self._cache[key] = value
+      while len(self._cache) > self._max_entries:
+        self._cache.popitem(last=False)
+
+    def evict(self, key):
+      self._cache.pop(key, self._default_entry)
+
+    def evict_all(self):

Review comment:
nit: would clear be the better name for this?
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=319036=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-319036 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 26/Sep/19 15:16
Start Date: 26/Sep/19 15:16
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#discussion_r328673772

## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ##
@@ -430,13 +424,13 @@ def clear(self):
   def _commit(self):
     if self._cleared:
-      self._state_handler.blocking_clear(self._state_key)
+      self._state_handler.clear(self._state_key, is_cached=True).get()
     if self._added_elements:
-      value_coder_impl = self._value_coder.get_impl()
-      out = coder_impl.create_OutputStream()
-      for element in self._added_elements:
-        value_coder_impl.encode_to_stream(element, out, True)
-      self._state_handler.blocking_append(self._state_key, out.get())
+      self._state_handler.append(

Review comment:
Will consolidate with the above.

Issue Time Tracking
---
Worklog Id: (was: 319036)
Time Spent: 23h 20m (was: 23h 10m)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=319033=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-319033 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 26/Sep/19 15:14
Start Date: 26/Sep/19 15:14
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#discussion_r328673182

## File path: sdks/python/apache_beam/runners/worker/sdk_worker_main.py ##
@@ -205,6 +206,28 @@ def _get_worker_count(pipeline_options):
   return 12


+def _get_state_cache_size(pipeline_options):
+  """Defines the upper number of state items to cache.
+
+  Note: state_cache_size is an experimental flag and might not be available in
+  future releases.
+
+  Returns:
+    an int indicating the maximum number of items to cache.
+      Default is 0 (disabled)
+  """
+  experiments = pipeline_options.view_as(DebugOptions).experiments
+  experiments = experiments if experiments else []
+
+  for experiment in experiments:
+    # There should only be 1 match so returning from the loop
+    if re.match(r'state_cache_size=', experiment):
+      return int(
+          re.match(r'state_cache_size=(?P<state_cache_size>.*)',
+                   experiment).group('state_cache_size'))
+  return 100

Review comment:
Good catch, this was actually pending in my branch.

Issue Time Tracking
---
Worklog Id: (was: 319033)
Time Spent: 23h 10m (was: 23h)
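The exchange about the fallback value is easier to follow with the parsing logic on its own. The sketch below is a simplified standalone variant, not the SDK function: it takes the experiments list directly instead of pipeline options, matches once with a named group, accepts digits only (an assumption of this sketch), and returns 0 when the flag is absent, which is the default the docstring in the diff describes.

```python
import re


def get_state_cache_size(experiments):
  """Returns the configured cache size, or 0 (disabled) if the flag is absent.

  Hypothetical helper for illustration; `experiments` is a list of strings
  such as ['beam_fn_api', 'state_cache_size=100'], or None.
  """
  for experiment in experiments or []:
    # A single match both detects the flag and extracts its value.
    match = re.match(r'state_cache_size=(?P<state_cache_size>\d+)$', experiment)
    if match:
      return int(match.group('state_cache_size'))
  return 0


size = get_state_cache_size(['beam_fn_api', 'state_cache_size=100'])
disabled = get_state_cache_size(['beam_fn_api'])
```

Returning 0 here keeps the cache off unless the experiment is set explicitly, whereas the `return 100` in the quoted hunk would enable it for every pipeline.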
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=319031=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-319031 ]

ASF GitHub Bot logged work on BEAM-5428:

Author: ASF GitHub Bot
Created on: 26/Sep/19 15:14
Start Date: 26/Sep/19 15:14
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK
URL: https://github.com/apache/beam/pull/9418#discussion_r328672702

## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ##
@@ -430,13 +424,13 @@ def clear(self):
   def _commit(self):
     if self._cleared:
-      self._state_handler.blocking_clear(self._state_key)
+      self._state_handler.clear(self._state_key, is_cached=True).get()

Review comment:
Yes and no. We need to wait for the last response here. For example, if there is no following append, we'll have to wait on the clear. So it is best to save the last returned future in a variable and call `.get()` on it here.

Issue Time Tracking
---
Worklog Id: (was: 319031)
Time Spent: 23h (was: 22h 50m)
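The "wait only on the last future" idea from this comment can be sketched with the standard library. Everything here is illustrative: `RecordingHandler` and `commit` are hypothetical names, `concurrent.futures` futures expose `.result()` where the diff calls `.get()`, and the single-worker executor is an assumption that models requests being handled in submission order.

```python
from concurrent.futures import ThreadPoolExecutor


class RecordingHandler(object):
  """Hypothetical state handler whose writes return futures."""

  def __init__(self):
    # One worker thread, so requests complete in the order they were sent.
    self._executor = ThreadPoolExecutor(max_workers=1)
    self.log = []

  def clear(self, key):
    return self._executor.submit(self.log.append, ('clear', key))

  def append(self, key, elements):
    return self._executor.submit(
        self.log.append, ('append', key, list(elements)))

  def close(self):
    self._executor.shutdown()


def commit(handler, key, cleared, added_elements):
  to_await = None
  if cleared:
    to_await = handler.clear(key)
  if added_elements:
    to_await = handler.append(key, added_elements)
  if to_await is not None:
    # With in-order handling, the last response implies the earlier
    # requests have completed as well, so one wait is enough.
    to_await.result()


handler = RecordingHandler()
commit(handler, 'k', cleared=True, added_elements=[])       # waits on clear
commit(handler, 'k', cleared=True, added_elements=[1, 2])   # waits on append
commit(handler, 'k', cleared=False, added_elements=[])      # nothing to wait on
handler.close()
```

Blocking once per commit rather than once per request saves a round trip whenever a clear is followed by an append, while still covering the clear-only case the comment calls out.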
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=319030=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-319030 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 26/Sep/19 15:13 Start Date: 26/Sep/19 15:13 Worklog Time Spent: 10m Work Description: tweise commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r328672482 ## File path: sdks/python/apache_beam/runners/worker/sdk_worker_main.py ## @@ -205,6 +206,28 @@ def _get_worker_count(pipeline_options): return 12 +def _get_state_cache_size(pipeline_options): + """Defines the upper number of state items to cache. + + Note: state_cache_size is an experimental flag and might not be available in + future releases. + + Returns: +an int indicating the maximum number of items to cache. + Default is 0 (disabled) + """ + experiments = pipeline_options.view_as(DebugOptions).experiments + experiments = experiments if experiments else [] + + for experiment in experiments: +# There should only be 1 match so returning from the loop +if re.match(r'state_cache_size=', experiment): + return int( + re.match(r'state_cache_size=(?P<state_cache_size>.*)', + experiment).group('state_cache_size')) + return 100 Review comment: Should be 0 (the default)? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 319030) Time Spent: 22h 50m (was: 22h 40m) > Implement cross-bundle state caching. 
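For reference, a minimal sketch of the parsing discussed above, with the fallback the reviewer asks about (0, matching the docstring's "disabled" default). It takes a plain list of experiment strings rather than pipeline options so that it runs without Beam installed; the function name `get_state_cache_size` is a hypothetical simplification, not the helper from the PR.

```python
import re


def get_state_cache_size(experiments):
  """Returns the size given by a 'state_cache_size=N' experiment, else 0."""
  for experiment in experiments or []:
    match = re.match(r'state_cache_size=(?P<state_cache_size>.*)', experiment)
    if match:
      # Only one match is expected, so return from inside the loop.
      return int(match.group('state_cache_size'))
  # No matching experiment: caching stays disabled.
  return 0
```

Matching once and reusing the match object also avoids running the same regular expression twice.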
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=319029=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-319029 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 26/Sep/19 15:12 Start Date: 26/Sep/19 15:12 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r328671827 ## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ## @@ -199,26 +199,19 @@ def finish(self): class _StateBackedIterable(object): - def __init__(self, state_handler, state_key, coder_or_impl): + def __init__(self, state_handler, state_key, coder_or_impl, + is_cached=False): self._state_handler = state_handler self._state_key = state_key if isinstance(coder_or_impl, coders.Coder): self._coder_impl = coder_or_impl.get_impl() else: self._coder_impl = coder_or_impl +self._is_cached = is_cached def __iter__(self): -# This is the continuation token this might be useful -data, continuation_token = self._state_handler.blocking_get(self._state_key) -while True: - input_stream = coder_impl.create_InputStream(data) - while input_stream.size() > 0: -yield self._coder_impl.decode_from_stream(input_stream, True) - if not continuation_token: -break - else: -data, continuation_token = self._state_handler.blocking_get( -self._state_key, continuation_token) +return self._state_handler.blocking_get( Review comment: Could you elaborate? The continuation token is respected as before. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 319029) Time Spent: 22h 40m (was: 22.5h) > Implement cross-bundle state caching. 
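The loop removed from `__iter__` in the diff above follows continuation tokens until the backend returns none. A stripped-down sketch of that paging pattern is given below; `read_all` and `make_fake_get` are hypothetical names, the fake backend uses the index of the next page as its token, and decoding of the returned bytes is left out.

```python
def read_all(blocking_get, state_key):
  """Yields every chunk for state_key, following continuation tokens."""
  data, token = blocking_get(state_key, None)
  while True:
    yield data
    if not token:
      # No token means the backend has nothing further to return.
      break
    data, token = blocking_get(state_key, token)


def make_fake_get(pages):
  """Hypothetical paged backend: the token is the index of the next page."""
  def blocking_get(state_key, token):
    index = token or 0
    next_token = index + 1 if index + 1 < len(pages) else None
    return pages[index], next_token
  return blocking_get
```

A caller would consume it with something like `list(read_all(make_fake_get([b'a', b'b']), 'key'))`; moving this loop behind the state handler keeps the calling side a single call, which appears to be what the reply means by the token being respected as before.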
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=319028=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-319028 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 26/Sep/19 15:12 Start Date: 26/Sep/19 15:12 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r328671811 ## File path: sdks/python/apache_beam/runners/portability/portable_runner_test.py ## @@ -201,7 +204,8 @@ class PortableRunnerTestWithExternalEnv(PortableRunnerTest): @classmethod def setUpClass(cls): cls._worker_address, cls._worker_server = ( -worker_pool_main.BeamFnExternalWorkerPoolServicer.start()) +worker_pool_main.BeamFnExternalWorkerPoolServicer.start( Review comment: Will look into this. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 319028) Time Spent: 22.5h (was: 22h 20m) > Implement cross-bundle state caching. 
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=319027=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-319027 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 26/Sep/19 15:12 Start Date: 26/Sep/19 15:12 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r328671668 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py ## @@ -1412,11 +1470,13 @@ def stop_worker(self): class WorkerHandlerManager(object): - def __init__(self, environments, job_provision_info): + def __init__(self, environments, job_provision_info, state_cache_size): self._environments = environments self._job_provision_info = job_provision_info self._cached_handlers = collections.defaultdict(list) -self._state = FnApiRunner.StateServicer() # rename? +self._state = sdk_worker.CachingMaterializingStateHandler( +StateCache(state_cache_size), Review comment: Good catch. This does not seem to be necessary. Let me revisit. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 319027) Time Spent: 22h 20m (was: 22h 10m) > Implement cross-bundle state caching. 
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=319009=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-319009 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 26/Sep/19 15:01 Start Date: 26/Sep/19 15:01 Worklog Time Spent: 10m Work Description: tweise commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r328665599 ## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ## @@ -430,13 +424,13 @@ def clear(self): def _commit(self): if self._cleared: - self._state_handler.blocking_clear(self._state_key) + self._state_handler.clear(self._state_key, is_cached=True).get() if self._added_elements: - value_coder_impl = self._value_coder.get_impl() - out = coder_impl.create_OutputStream() - for element in self._added_elements: -value_coder_impl.encode_to_stream(element, out, True) - self._state_handler.blocking_append(self._state_key, out.get()) + self._state_handler.append( Review comment: An explicit comment regarding the need for a blocking call might be useful here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 319009) Time Spent: 22h 10m (was: 22h) > Implement cross-bundle state caching. 
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=319008=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-319008 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 26/Sep/19 15:00 Start Date: 26/Sep/19 15:00 Worklog Time Spent: 10m Work Description: tweise commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r328665061 ## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ## @@ -430,13 +424,13 @@ def clear(self): def _commit(self): if self._cleared: - self._state_handler.blocking_clear(self._state_key) + self._state_handler.clear(self._state_key, is_cached=True).get() Review comment: Is this the call where we don't need to wait for a response? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 319008) Time Spent: 22h (was: 21h 50m) > Implement cross-bundle state caching. 
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=319002=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-319002 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 26/Sep/19 14:50 Start Date: 26/Sep/19 14:50 Worklog Time Spent: 10m Work Description: tweise commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r328659068 ## File path: sdks/python/apache_beam/runners/worker/bundle_processor.py ## @@ -199,26 +199,19 @@ def finish(self): class _StateBackedIterable(object): - def __init__(self, state_handler, state_key, coder_or_impl): + def __init__(self, state_handler, state_key, coder_or_impl, + is_cached=False): self._state_handler = state_handler self._state_key = state_key if isinstance(coder_or_impl, coders.Coder): self._coder_impl = coder_or_impl.get_impl() else: self._coder_impl = coder_or_impl +self._is_cached = is_cached def __iter__(self): -# This is the continuation token this might be useful -data, continuation_token = self._state_handler.blocking_get(self._state_key) -while True: - input_stream = coder_impl.create_InputStream(data) - while input_stream.size() > 0: -yield self._coder_impl.decode_from_stream(input_stream, True) - if not continuation_token: -break - else: -data, continuation_token = self._state_handler.blocking_get( -self._state_key, continuation_token) +return self._state_handler.blocking_get( Review comment: Is there a JIRA for the continuation token that can be referenced here? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 319002) Time Spent: 21h 50m (was: 21h 40m) > Implement cross-bundle state caching. 
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=318997=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-318997 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 26/Sep/19 14:48 Start Date: 26/Sep/19 14:48 Worklog Time Spent: 10m Work Description: tweise commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r328658157 ## File path: sdks/python/apache_beam/runners/portability/portable_runner_test.py ## @@ -201,7 +204,8 @@ class PortableRunnerTestWithExternalEnv(PortableRunnerTest): @classmethod def setUpClass(cls): cls._worker_address, cls._worker_server = ( -worker_pool_main.BeamFnExternalWorkerPoolServicer.start()) +worker_pool_main.BeamFnExternalWorkerPoolServicer.start( Review comment: Logically this duplicates the experiment flag above. I can see why that is currently necessary, but maybe it would be better to provide the pipeline options to the worker pool servicer? (Inside the container this would happen via provisioning endpoint and environment.) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 318997) Time Spent: 21h 40m (was: 21.5h) > Implement cross-bundle state caching. 
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=318994=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-318994 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 26/Sep/19 14:37 Start Date: 26/Sep/19 14:37 Worklog Time Spent: 10m Work Description: tweise commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r328651322 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py ## @@ -1412,11 +1470,13 @@ def stop_worker(self): class WorkerHandlerManager(object): - def __init__(self, environments, job_provision_info): + def __init__(self, environments, job_provision_info, state_cache_size): self._environments = environments self._job_provision_info = job_provision_info self._cached_handlers = collections.defaultdict(list) -self._state = FnApiRunner.StateServicer() # rename? +self._state = sdk_worker.CachingMaterializingStateHandler( +StateCache(state_cache_size), Review comment: For my learning, why is the cache required inside the fn_api_runner vs. on the SDK side? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 318994) Time Spent: 21.5h (was: 21h 20m) > Implement cross-bundle state caching. 
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=318497=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-318497 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 25/Sep/19 18:09 Start Date: 25/Sep/19 18:09 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9374: [BEAM-5428] Implement Runner support for cache tokens URL: https://github.com/apache/beam/pull/9374#discussion_r328266675 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandlers.java ## @@ -414,6 +432,21 @@ public static StateRequestHandler forBagUserStateHandlerFactory( } } +@Override +public Iterable getCacheTokens() { + return handlerCache.values().stream() Review comment: It depends what you are used to. I find the fluent pattern easier to read, but I agree that the procedural definition is probably easier for most people. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 318497) Time Spent: 21h 20m (was: 21h 10m) > Implement cross-bundle state caching. 
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=318427=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-318427 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 25/Sep/19 16:08 Start Date: 25/Sep/19 16:08 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #9374: [BEAM-5428] Implement Runner support for cache tokens URL: https://github.com/apache/beam/pull/9374#discussion_r328211831 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandlers.java ## @@ -414,6 +432,21 @@ public static StateRequestHandler forBagUserStateHandlerFactory( } } +@Override +public Iterable getCacheTokens() { + return handlerCache.values().stream() Review comment: It grinds my gears when people use streams for things because they want a fluent builder pattern for processing when a for loop is easier to read/understand for the majority of people. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 318427) Time Spent: 21h 10m (was: 21h) > Implement cross-bundle state caching. 
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=318353=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-318353 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 25/Sep/19 14:13 Start Date: 25/Sep/19 14:13 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9374: [BEAM-5428] Implement Runner support for cache tokens URL: https://github.com/apache/beam/pull/9374 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 318353) Time Spent: 21h (was: 20h 50m) > Implement cross-bundle state caching. > - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Maximilian Michels >Priority: Major > Time Spent: 21h > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=318241=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-318241 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 25/Sep/19 11:08 Start Date: 25/Sep/19 11:08 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9374: [BEAM-5428] Implement Runner support for cache tokens URL: https://github.com/apache/beam/pull/9374#discussion_r328061202 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java ## @@ -314,13 +332,15 @@ public void append(K key, W window, Iterator values) { while (values.hasNext()) { bagState.add(values.next()); } + +return generateAndRegisterCacheKey(); Review comment: Note that this is possible now because the cache token is passed via an `Optional`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 318241) Time Spent: 20h 50m (was: 20h 40m) > Implement cross-bundle state caching. 
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=318236=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-318236 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 25/Sep/19 11:07 Start Date: 25/Sep/19 11:07 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9374: [BEAM-5428] Implement Runner support for cache tokens URL: https://github.com/apache/beam/pull/9374#discussion_r328060568 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandlers.java ## @@ -137,6 +143,12 @@ static SideInputHandlerFactory unsupported() { /** Clears the bag user state for the given key and window. */ void clear(K key, W window); + +/** Returns the currently valid cache token. */ Review comment: Agree, for now `Optional` looks like the best choice. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 318236) Time Spent: 20h 20m (was: 20h 10m) > Implement cross-bundle state caching. 
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=318238=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-318238 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 25/Sep/19 11:07 Start Date: 25/Sep/19 11:07 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9374: [BEAM-5428] Implement Runner support for cache tokens URL: https://github.com/apache/beam/pull/9374#discussion_r328060603 ## File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java ## @@ -574,6 +579,41 @@ public void handleCleanupWithStateWhenAwaitingOnClosingOutputReceivers() throws } } + @Test + public void verifyCacheTokensAreUsedInNewBundleRequest() { +CompletableFuture registerResponseFuture = new CompletableFuture<>(); +when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class))) +.thenReturn(registerResponseFuture); + +ProcessBundleDescriptor descriptor1 = +ProcessBundleDescriptor.newBuilder().setId("descriptor1").build(); + +Map>> remoteInputs = +Collections.singletonMap( +"inputPC", +RemoteInputDestination.of( +(FullWindowedValueCoder) +FullWindowedValueCoder.of(VarIntCoder.of(), GlobalWindow.Coder.INSTANCE), +SDK_GRPC_READ_TRANSFORM)); + +BundleProcessor processor1 = sdkHarnessClient.getProcessor(descriptor1, remoteInputs); +when(dataService.send(any(), any())).thenReturn(mock(CloseableFnDataReceiver.class)); + +StateRequestHandler stateRequestHandler = Mockito.mock(StateRequestHandler.class); +List cacheTokens = +Collections.singletonList( + BeamFnApi.ProcessBundleRequest.CacheToken.newBuilder().getDefaultInstanceForType()); +when(stateRequestHandler.getCacheTokens()).thenReturn(cacheTokens); + +processor1.newBundle( +ImmutableMap.of(SDK_GRPC_WRITE_TRANSFORM, mock(RemoteOutputReceiver.class)), +stateRequestHandler, +BundleProgressHandler.ignored()); + +// Verify that the cache token list is retrieved from the state handler 
+Mockito.verify(stateRequestHandler).getCacheTokens(); Review comment: Fair point. Updated. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 318238) Time Spent: 20h 40m (was: 20.5h) > Implement cross-bundle state caching. > - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Maximilian Michels >Priority: Major > Time Spent: 20h 40m > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=318237=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-318237 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 25/Sep/19 11:07 Start Date: 25/Sep/19 11:07 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9374: [BEAM-5428] Implement Runner support for cache tokens URL: https://github.com/apache/beam/pull/9374#discussion_r328060592 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandlers.java ## @@ -414,6 +432,21 @@ public static StateRequestHandler forBagUserStateHandlerFactory( } } +@Override +public Iterable getCacheTokens() { + return handlerCache.values().stream() Review comment: Not sure how much this contributes to the overall computation here, but the stream performance reported in the blog post does look pretty bad. I've replaced this with for loops. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 318237) Time Spent: 20.5h (was: 20h 20m) > Implement cross-bundle state caching. 
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=318235=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-318235 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 25/Sep/19 11:06 Start Date: 25/Sep/19 11:06 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9374: [BEAM-5428] Implement Runner support for cache tokens URL: https://github.com/apache/beam/pull/9374#discussion_r328060554 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandlers.java ## @@ -278,6 +291,11 @@ public static StateRequestHandler forSideInputHandlerFactory( } } +@Override +public Iterable getCacheTokens() { Review comment: That's correct. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 318235) Time Spent: 20h 10m (was: 20h) > Implement cross-bundle state caching. > - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Maximilian Michels >Priority: Major > Time Spent: 20h 10m > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=317745=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-317745 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 24/Sep/19 18:39 Start Date: 24/Sep/19 18:39 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #9374: [BEAM-5428] Implement Runner support for cache tokens URL: https://github.com/apache/beam/pull/9374#discussion_r327767419 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandlers.java ## @@ -137,6 +143,12 @@ static SideInputHandlerFactory unsupported() { /** Clears the bag user state for the given key and window. */ void clear(K key, W window); + +/** Returns the currently valid cache token. */ Review comment: ```suggestion /** Returns the currently valid cache token. There is no valid cache token if null is returned. */ ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 317745) > Implement cross-bundle state caching. 
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=317747=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-317747 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 24/Sep/19 18:39 Start Date: 24/Sep/19 18:39 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #9374: [BEAM-5428] Implement Runner support for cache tokens URL: https://github.com/apache/beam/pull/9374#discussion_r327767697 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandlers.java ## @@ -137,6 +143,12 @@ static SideInputHandlerFactory unsupported() { /** Clears the bag user state for the given key and window. */ void clear(K key, W window); + +/** Returns the currently valid cache token. */ Review comment: nit: We might want to go with an Iterable here or Optional instead of using null to mark missing cache tokens. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 317747) Time Spent: 20h (was: 19h 50m) > Implement cross-bundle state caching. 
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=317743=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-317743 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 24/Sep/19 18:39 Start Date: 24/Sep/19 18:39 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #9374: [BEAM-5428] Implement Runner support for cache tokens URL: https://github.com/apache/beam/pull/9374#discussion_r327764965 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandlers.java ## @@ -278,6 +291,11 @@ public static StateRequestHandler forSideInputHandlerFactory( } } +@Override +public Iterable getCacheTokens() { Review comment: I don't think we need this since this is the default implementation? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 317743) Time Spent: 19h 40m (was: 19.5h) > Implement cross-bundle state caching. > - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Maximilian Michels >Priority: Major > Time Spent: 19h 40m > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=317746=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-317746 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 24/Sep/19 18:39 Start Date: 24/Sep/19 18:39 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #9374: [BEAM-5428] Implement Runner support for cache tokens URL: https://github.com/apache/beam/pull/9374#discussion_r327773025 ## File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java ## @@ -574,6 +579,41 @@ public void handleCleanupWithStateWhenAwaitingOnClosingOutputReceivers() throws } } + @Test + public void verifyCacheTokensAreUsedInNewBundleRequest() { +CompletableFuture registerResponseFuture = new CompletableFuture<>(); +when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class))) +.thenReturn(registerResponseFuture); + +ProcessBundleDescriptor descriptor1 = +ProcessBundleDescriptor.newBuilder().setId("descriptor1").build(); + +Map>> remoteInputs = +Collections.singletonMap( +"inputPC", +RemoteInputDestination.of( +(FullWindowedValueCoder) +FullWindowedValueCoder.of(VarIntCoder.of(), GlobalWindow.Coder.INSTANCE), +SDK_GRPC_READ_TRANSFORM)); + +BundleProcessor processor1 = sdkHarnessClient.getProcessor(descriptor1, remoteInputs); +when(dataService.send(any(), any())).thenReturn(mock(CloseableFnDataReceiver.class)); + +StateRequestHandler stateRequestHandler = Mockito.mock(StateRequestHandler.class); +List cacheTokens = +Collections.singletonList( + BeamFnApi.ProcessBundleRequest.CacheToken.newBuilder().getDefaultInstanceForType()); +when(stateRequestHandler.getCacheTokens()).thenReturn(cacheTokens); + +processor1.newBundle( +ImmutableMap.of(SDK_GRPC_WRITE_TRANSFORM, mock(RemoteOutputReceiver.class)), +stateRequestHandler, +BundleProgressHandler.ignored()); + +// Verify that the cache token list is retrieved from the state handler 
+Mockito.verify(stateRequestHandler).getCacheTokens(); Review comment: We should be checking that the mock FnApiControlClient got a ProcessBundle InstructionRequest with the cache token specified and not verifying that the state request handler was invoked. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 317746) > Implement cross-bundle state caching. > - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Maximilian Michels >Priority: Major > Time Spent: 19h 50m > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.4#803005)
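The distinction raised in this review comment (assert on the request that was actually sent, not on whether a collaborator was consulted) can be illustrated with a small Python analogue. This is only a sketch using `unittest.mock`: `start_bundle`, `get_cache_tokens`, and the dict-shaped request are hypothetical stand-ins, not the Beam classes from the Java test.

```python
from unittest import mock


def start_bundle(control_client, state_handler):
    """Hypothetical code under test: send a request carrying the cache tokens."""
    request = {'cache_tokens': list(state_handler.get_cache_tokens())}
    control_client.handle(request)


# Both collaborators are mocks, as in the Java test being reviewed.
control_client = mock.Mock()
state_handler = mock.Mock()
state_handler.get_cache_tokens.return_value = ['token-1']

start_bundle(control_client, state_handler)

# Weaker check: only shows that the handler was asked for its tokens.
state_handler.get_cache_tokens.assert_called_once_with()

# Stronger check: inspect the request that reached the control client.
sent_request = control_client.handle.call_args[0][0]
assert sent_request['cache_tokens'] == ['token-1']
```

The weaker check would still pass if `start_bundle` fetched the tokens and then dropped them; the stronger one would not.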
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=317744=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-317744 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 24/Sep/19 18:39 Start Date: 24/Sep/19 18:39 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #9374: [BEAM-5428] Implement Runner support for cache tokens URL: https://github.com/apache/beam/pull/9374#discussion_r327769381 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandlers.java ## @@ -414,6 +432,21 @@ public static StateRequestHandler forBagUserStateHandlerFactory( } } +@Override +public Iterable getCacheTokens() { + return handlerCache.values().stream() Review comment: nit: [streams are significantly slower than for loops even with JDK 12](https://medium.com/@milan.mimica/slow-like-a-stream-fast-like-a-loop-524f70391182) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 317744) Time Spent: 19h 50m (was: 19h 40m) > Implement cross-bundle state caching. 
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=317561=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-317561 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 24/Sep/19 14:58 Start Date: 24/Sep/19 14:58 Worklog Time Spent: 10m Work Description: mxm commented on issue #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#issuecomment-534598857

Benchmark with a synthetic source with parallelism of 1 on my local machine:

```python
max_items = 10
num_keys = 10

class GenerateInput(beam.DoFn):

  def process(self, byte_array):
    key_string = byte_array
    key_int = int(key_string)
    yield (key_int % num_keys, "value" + key_string)

class StatefulProcessing(beam.DoFn):
  count_state_spec = userstate.CombiningValueStateSpec(
      'count', beam.coders.IterableCoder(beam.coders.VarIntCoder()), sum)

  def process(self, kv, count=beam.DoFn.StateParam(count_state_spec)):
    k, v = kv
    count.add(len(v))

(p
 | "Generate data" >> FlinkStreamingImpulseSource().set_message_count(max_items).set_interval_ms(0).with_output_types(bytes)
 | "Reshuffle" >> beam.Reshuffle()
 | "Format Data" >> beam.ParDo(GenerateInput()).with_output_types(KV[int, str])
 | "Stateful Processing" >> beam.ParDo(StatefulProcessing())
)
```

```
# elements: 100,000
# bundle size: 10
# number of unique keys: 10
Took 205.37934804 with experiments=state_cache_size=0
Took 129.933783054 with experiments=state_cache_size=10
```

The version with caching is about 1.58 times faster. These results are consistent across multiple runs. This is of course a bit of an extreme situation in terms of the number of unique keys, but it shows the potential of the caching.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 317561) Time Spent: 19.5h (was: 19h 20m) > Implement cross-bundle state caching. > - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Maximilian Michels >Priority: Major > Time Spent: 19.5h > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.4#803005)
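As a quick check of the benchmark figures in the message above, the speedup can be recomputed from the two reported timings. The sketch below uses only those two numbers (in whatever unit the benchmark printed); the variable names are illustrative. The ratio comes out in line with the roughly 1.6x speedup reported.

```python
# Timings copied from the benchmark output quoted in the message above.
time_without_cache = 205.37934804  # experiments=state_cache_size=0
time_with_cache = 129.933783054    # experiments=state_cache_size=10

# Speedup factor: how many times faster the cached run was.
speedup = time_without_cache / time_with_cache

# Relative runtime reduction: fraction of the uncached time that was saved.
reduction = 1.0 - time_with_cache / time_without_cache

print("speedup: %.2fx" % speedup)
print("runtime reduction: %.1f%%" % (100 * reduction))
```

Put differently, caching removed a bit more than a third of the runtime in this single-key-group-heavy scenario.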
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=317437=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-317437 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 24/Sep/19 12:19 Start Date: 24/Sep/19 12:19 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r327580554 ## File path: sdks/python/apache_beam/runners/worker/sdk_worker.py ## @@ -620,6 +628,106 @@ def _next_id(self): return str(self._last_id) +class CachingMaterializingStateHandler(object): + """ A State handler which retrieves and caches state. """ + + def __init__(self, global_state_cache, underlying_state): +self._underlying = underlying_state +self._state_cache = global_state_cache +self._context = threading.local() + + @contextlib.contextmanager + def process_instruction_id(self, bundle_id, cache_tokens): +if getattr(self._context, 'cache_token', None) is not None: + raise RuntimeError( + 'Cache tokens already set to %s' % self._context.cache_token) +# TODO Also handle cache tokens for side input, if present +user_state_cache_token = None +for cache_token_struct in cache_tokens: + if cache_token_struct.HasField("user_state"): +# There should only be one user state token present +assert not user_state_cache_token +user_state_cache_token = cache_token_struct.token +try: + self._context.cache_token = user_state_cache_token + with self._underlying.process_instruction_id(bundle_id): +yield +finally: + self._context.cache_token = None + + def blocking_get(self, state_key, coder, is_cached=False): +if not self._should_be_cached(is_cached): + # no cache / tokens, can't do a lookup/store in the cache + return self._materialize_iter(state_key, coder) +# Cache lookup +cache_state_key = self._convert_to_cache_key(state_key) +cached_value = self._state_cache.get(cache_state_key, + 
self._context.cache_token) +if cached_value is None: + # Cache miss, need to retrieve from the Runner + materialized = cached_value = [] Review comment: I think caching only the first page makes sense. From looking at the Runner code, it looks like continuation tokens are not used currently. It seems like it is fair to defer this until they are used; also considering that the state caching is disabled by default. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 317437) Time Spent: 19h 20m (was: 19h 10m) > Implement cross-bundle state caching. > - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Maximilian Michels >Priority: Major > Time Spent: 19h 20m > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.4#803005)
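The lookup pattern visible in the diff above (skip the cache when no token is set, otherwise look up by state key and token, and fall back to the Runner on a miss) can be sketched in isolation. This is a minimal illustration under assumptions, not the code from the pull request: `TokenScopedCache`, `cached_get`, and `fetch_from_runner` are hypothetical names, and eviction and size limits are left out.

```python
import threading


class TokenScopedCache(object):
    """Hypothetical cache whose entries are valid only for the token they were stored under."""

    def __init__(self):
        self._lock = threading.Lock()
        self._entries = {}  # state key -> (cache_token, value)

    def get(self, state_key, cache_token):
        with self._lock:
            entry = self._entries.get(state_key)
        # A missing entry or a different token both count as a miss.
        if entry is None or entry[0] != cache_token:
            return None
        return entry[1]

    def put(self, state_key, cache_token, value):
        with self._lock:
            self._entries[state_key] = (cache_token, value)


def cached_get(cache, state_key, cache_token, fetch_from_runner):
    """Return the state for state_key, consulting the cache only if a token is set."""
    if cache_token is None:
        # No token for this bundle: the cache must not be read or written.
        return fetch_from_runner(state_key)
    value = cache.get(state_key, cache_token)
    if value is None:
        # Cache miss: retrieve from the Runner and remember it under this token.
        value = fetch_from_runner(state_key)
        cache.put(state_key, cache_token, value)
    return value
```

Tying each entry to a token means that a new token from the Runner invalidates older entries without an explicit flush, which is the property that makes reuse across bundles safe in this sketch.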
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=317432=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-317432 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 24/Sep/19 12:11 Start Date: 24/Sep/19 12:11 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r327577470 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py ## @@ -308,7 +311,8 @@ def __init__( default_environment=None, bundle_repeat=0, use_state_iterables=False, - provision_info=None): + provision_info=None, + state_cache_size=100): Review comment: Also added it in the code. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 317432) Time Spent: 19h 10m (was: 19h) > Implement cross-bundle state caching. > - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Maximilian Michels >Priority: Major > Time Spent: 19h 10m > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=316715=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-316715 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 23/Sep/19 14:29 Start Date: 23/Sep/19 14:29 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r327147182 ## File path: sdks/python/apache_beam/runners/worker/sdk_worker.py ## @@ -620,6 +628,106 @@ def _next_id(self): return str(self._last_id) +class CachingMaterializingStateHandler(object): + """ A State handler which retrieves and caches state. """ + + def __init__(self, global_state_cache, underlying_state): +self._underlying = underlying_state +self._state_cache = global_state_cache +self._context = threading.local() + + @contextlib.contextmanager + def process_instruction_id(self, bundle_id, cache_tokens): +if getattr(self._context, 'cache_token', None) is not None: + raise RuntimeError( + 'Cache tokens already set to %s' % self._context.cache_token) +# TODO Also handle cache tokens for side input, if present Review comment: https://issues.apache.org/jira/browse/BEAM-8298 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 316715) Time Spent: 19h (was: 18h 50m) > Implement cross-bundle state caching. 
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=316714=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-316714 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 23/Sep/19 14:29 Start Date: 23/Sep/19 14:29 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r327147001 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py ## @@ -308,7 +311,8 @@ def __init__( default_environment=None, bundle_repeat=0, use_state_iterables=False, - provision_info=None): + provision_info=None, + state_cache_size=100): Review comment: Jira: https://issues.apache.org/jira/browse/BEAM-8297 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 316714) Time Spent: 18h 50m (was: 18h 40m) > Implement cross-bundle state caching. > - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Maximilian Michels >Priority: Major > Time Spent: 18h 50m > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=316712=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-316712 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 23/Sep/19 14:27 Start Date: 23/Sep/19 14:27 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r327143887 ## File path: sdks/python/apache_beam/runners/worker/sdk_worker.py ## @@ -620,6 +628,106 @@ def _next_id(self): return str(self._last_id) +class CachingMaterializingStateHandler(object): + """ A State handler which retrieves and caches state. """ + + def __init__(self, global_state_cache, underlying_state): +self._underlying = underlying_state +self._state_cache = global_state_cache +self._context = threading.local() + + @contextlib.contextmanager + def process_instruction_id(self, bundle_id, cache_tokens): +if getattr(self._context, 'cache_token', None) is not None: + raise RuntimeError( + 'Cache tokens already set to %s' % self._context.cache_token) +# TODO Also handle cache tokens for side input, if present Review comment: https://issues.apache.org/jira/browse/BEAM-8297 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 316712) Time Spent: 18h 40m (was: 18.5h) > Implement cross-bundle state caching. 
> - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Maximilian Michels >Priority: Major > Time Spent: 18h 40m > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.4#803005)
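For readers following the review comment above, the thread-local cache-token guard in `process_instruction_id` can be sketched in isolation. This is a minimal sketch, not the code from the pull request: the class name `CacheTokenContext`, the helper `current_token`, and the choice to use the first token in `cache_tokens` are assumptions made for illustration. Only the guard and its error message are taken from the quoted diff.

```python
import contextlib
import threading


class CacheTokenContext(object):
    """Hypothetical stand-in for the handler's per-thread token bookkeeping."""

    def __init__(self):
        # Each worker thread sees its own 'cache_token' attribute.
        self._context = threading.local()

    @contextlib.contextmanager
    def process_instruction_id(self, bundle_id, cache_tokens):
        # Same guard as in the quoted diff: refuse to nest bundles on one thread.
        if getattr(self._context, 'cache_token', None) is not None:
            raise RuntimeError(
                'Cache tokens already set to %s' % self._context.cache_token)
        # Assumption: the first token is the one that applies to user state.
        self._context.cache_token = cache_tokens[0] if cache_tokens else None
        try:
            yield
        finally:
            # Always clear, so the next bundle on this thread starts clean.
            self._context.cache_token = None

    def current_token(self):
        # Hypothetical accessor, added only to make the sketch observable.
        return getattr(self._context, 'cache_token', None)
```

Because the token lives in `threading.local()`, two bundles processed on different threads do not see each other's token, while a second bundle started on the same thread before the first has finished raises `RuntimeError`.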
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=316711=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-316711 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 23/Sep/19 14:24 Start Date: 23/Sep/19 14:24 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r327143887 ## File path: sdks/python/apache_beam/runners/worker/sdk_worker.py ## @@ -620,6 +628,106 @@ def _next_id(self): return str(self._last_id) +class CachingMaterializingStateHandler(object): + """ A State handler which retrieves and caches state. """ + + def __init__(self, global_state_cache, underlying_state): +self._underlying = underlying_state +self._state_cache = global_state_cache +self._context = threading.local() + + @contextlib.contextmanager + def process_instruction_id(self, bundle_id, cache_tokens): +if getattr(self._context, 'cache_token', None) is not None: + raise RuntimeError( + 'Cache tokens already set to %s' % self._context.cache_token) +# TODO Also handle cache tokens for side input, if present Review comment: https://issues.apache.org/jira/browse/BEAM-8297 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 316711) Time Spent: 18.5h (was: 18h 20m) > Implement cross-bundle state caching. 
> - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Maximilian Michels >Priority: Major > Time Spent: 18.5h > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=316030=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-316030 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 21/Sep/19 01:29 Start Date: 21/Sep/19 01:29 Worklog Time Spent: 10m Work Description: mxm commented on issue #9374: [BEAM-5428] Implement Runner support for cache tokens URL: https://github.com/apache/beam/pull/9374#issuecomment-527461152 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 316030) Time Spent: 18h 20m (was: 18h 10m) > Implement cross-bundle state caching. > - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Rakesh Kumar >Priority: Major > Time Spent: 18h 20m > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=307860=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-307860 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 06/Sep/19 13:38 Start Date: 06/Sep/19 13:38 Worklog Time Spent: 10m Work Description: mxm commented on issue #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#issuecomment-528858139 Run Python PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 307860) Time Spent: 18h 10m (was: 18h) > Implement cross-bundle state caching. > - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Rakesh Kumar >Priority: Major > Time Spent: 18h 10m > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=307679=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-307679 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 06/Sep/19 07:41 Start Date: 06/Sep/19 07:41 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r321611844 ## File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py ## @@ -47,21 +47,26 @@ class BeamFnExternalWorkerPoolServicer( beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer): - def __init__(self, worker_threads, use_process=False, - container_executable=None): + def __init__(self, worker_threads, + use_process=False, + container_executable=None, + state_cache_size=0): self._worker_threads = worker_threads self._use_process = use_process self._container_executable = container_executable +self._state_cache_size = state_cache_size self._worker_processes = {} @classmethod def start(cls, worker_threads=1, use_process=False, port=0, -container_executable=None): +state_cache_size=0, container_executable=None): worker_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) worker_address = 'localhost:%s' % worker_server.add_insecure_port( '[::]:%s' % port) -worker_pool = cls(worker_threads, use_process=use_process, - container_executable=container_executable) +worker_pool = cls(worker_threads, + use_process=use_process, + container_executable=container_executable, + state_cache_size=state_cache_size) Review comment: We need this here as well because this class is also used for testing. It will be overridden for the non-testing case. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 307679) Time Spent: 18h (was: 17h 50m) > Implement cross-bundle state caching. > - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Rakesh Kumar >Priority: Major > Time Spent: 18h > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=307545=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-307545 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 06/Sep/19 00:04 Start Date: 06/Sep/19 00:04 Worklog Time Spent: 10m Work Description: tweise commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r321527154 ## File path: sdks/python/apache_beam/runners/worker/sdk_worker_main.py ## @@ -149,6 +149,7 @@ def main(unused_argv): control_address=service_descriptor.url, worker_count=_get_worker_count(sdk_pipeline_options), worker_id=_worker_id, +state_cache_size=_get_state_cache_size(sdk_pipeline_options), Review comment: It might be better to pass the pipeline options to SdkHarness and have it extract what it needs. There is currently quite a lot of (parameter-related) noise wherever SdkHarness is constructed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 307545) Time Spent: 17h 50m (was: 17h 40m) > Implement cross-bundle state caching. 
> - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Rakesh Kumar >Priority: Major > Time Spent: 17h 50m > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.2#803003)
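The suggestion in the review comment above (hand the pipeline options to the harness and let it pick out what it needs) might look roughly like the following. This is only a sketch under stated assumptions: `HarnessSketch`, its `DEFAULTS` table, and the use of a plain dict for the options are hypothetical and are not the `SdkHarness` API from the pull request.

```python
class HarnessSketch(object):
    """Hypothetical harness that reads its own settings from an options dict."""

    # Assumed defaults; a cache size of 0 is taken to mean "caching off".
    DEFAULTS = {'worker_count': 1, 'state_cache_size': 0}

    def __init__(self, control_address, pipeline_options=None, worker_id=None):
        # Start from the defaults and overlay whatever the caller provided.
        options = dict(self.DEFAULTS)
        options.update(pipeline_options or {})
        self.control_address = control_address
        self.worker_id = worker_id
        self.worker_count = int(options['worker_count'])
        self.state_cache_size = int(options['state_cache_size'])
```

With this shape, adding another tunable changes only the harness, not every call site that constructs it, which is the parameter noise the comment refers to.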
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=307536=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-307536 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 05/Sep/19 23:43 Start Date: 05/Sep/19 23:43 Worklog Time Spent: 10m Work Description: tweise commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r321527901 ## File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py ## @@ -47,21 +47,26 @@ class BeamFnExternalWorkerPoolServicer( beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer): - def __init__(self, worker_threads, use_process=False, - container_executable=None): + def __init__(self, worker_threads, + use_process=False, + container_executable=None, + state_cache_size=0): self._worker_threads = worker_threads self._use_process = use_process self._container_executable = container_executable +self._state_cache_size = state_cache_size self._worker_processes = {} @classmethod def start(cls, worker_threads=1, use_process=False, port=0, -container_executable=None): +state_cache_size=0, container_executable=None): worker_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) worker_address = 'localhost:%s' % worker_server.add_insecure_port( '[::]:%s' % port) -worker_pool = cls(worker_threads, use_process=use_process, - container_executable=container_executable) +worker_pool = cls(worker_threads, + use_process=use_process, + container_executable=container_executable, + state_cache_size=state_cache_size) Review comment: This should not occur here but come from the pipeline options from the provisioning endpoint. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 307536) Time Spent: 17h 40m (was: 17.5h) > Implement cross-bundle state caching. > - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Rakesh Kumar >Priority: Major > Time Spent: 17h 40m > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=307529=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-307529 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 05/Sep/19 23:38 Start Date: 05/Sep/19 23:38 Worklog Time Spent: 10m Work Description: tweise commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r321527154 ## File path: sdks/python/apache_beam/runners/worker/sdk_worker_main.py ## @@ -149,6 +149,7 @@ def main(unused_argv): control_address=service_descriptor.url, worker_count=_get_worker_count(sdk_pipeline_options), worker_id=_worker_id, +state_cache_size=_get_state_cache_size(sdk_pipeline_options), Review comment: It might be better to pass the pipeline options to SdkHarness and have it extract what it needs. There is currently quite a lot of noise wherever SdkHarness is constructed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 307529) Time Spent: 17.5h (was: 17h 20m) > Implement cross-bundle state caching. 
> - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Rakesh Kumar >Priority: Major > Time Spent: 17.5h > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=307159=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-307159 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 05/Sep/19 13:58 Start Date: 05/Sep/19 13:58 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r321280737 ## File path: sdks/python/apache_beam/runners/worker/sdk_worker.py ## @@ -620,6 +628,106 @@ def _next_id(self): return str(self._last_id) +class CachingMaterializingStateHandler(object): + """ A State handler which retrieves and caches state. """ + + def __init__(self, global_state_cache, underlying_state): +self._underlying = underlying_state +self._state_cache = global_state_cache +self._context = threading.local() + + @contextlib.contextmanager + def process_instruction_id(self, bundle_id, cache_tokens): +if getattr(self._context, 'cache_token', None) is not None: + raise RuntimeError( + 'Cache tokens already set to %s' % self._context.cache_token) +# TODO Also handle cache tokens for side input, if present Review comment: Will do! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 307159) Time Spent: 16h 50m (was: 16h 40m) > Implement cross-bundle state caching. 
> - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Rakesh Kumar >Priority: Major > Time Spent: 16h 50m > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=307160=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-307160 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 05/Sep/19 13:58 Start Date: 05/Sep/19 13:58 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r321280766 ## File path: sdks/python/apache_beam/runners/worker/sdk_worker.py ## @@ -331,7 +334,8 @@ def shutdown(self): class SdkWorker(object): - def __init__(self, bundle_processor_cache, profiler_factory=None): + def __init__(self, bundle_processor_cache, + profiler_factory=None): Review comment: Intended (separation of the kwargs from the positional args). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 307160) Time Spent: 17h (was: 16h 50m) > Implement cross-bundle state caching. 
> - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Rakesh Kumar >Priority: Major > Time Spent: 17h > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=307162=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-307162 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 05/Sep/19 13:58 Start Date: 05/Sep/19 13:58 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r321280832 ## File path: sdks/python/apache_beam/runners/worker/sdk_worker.py ## @@ -51,7 +53,7 @@ class SdkHarness(object): def __init__( self, control_address, worker_count, credentials=None, worker_id=None, - profiler_factory=None): + state_cache_size=100, profiler_factory=None): Review comment: >Does this mean we can only cache 100 keys? Yes. >How about making the default 0 and have that turn the cross bundle caching off? Was thinking about this as well. Seems fair to default to 0 in the beginning but IMHO we should change the default to cached after experimentation. Would still keep caching enabled for the tests, except for the test suite which explicitly has caching disabled. >The cache size setting would need to be passed to the entry point as pipeline option You can read my mind. Just added this in another commit :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 307162) Time Spent: 17h 20m (was: 17h 10m) > Implement cross-bundle state caching. 
> - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Rakesh Kumar >Priority: Major > Time Spent: 17h 20m > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.2#803003)
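The exchange above (a size of 100 caps the cache at 100 keys, and a default of 0 would switch cross-bundle caching off) can be illustrated with a small bounded cache. This is a minimal sketch, not the cache from the pull request: the class name `BoundedStateCache` is hypothetical, and least-recently-used eviction is an assumption, since the thread does not state the eviction policy.

```python
import collections


class BoundedStateCache(object):
    """Hypothetical LRU cache; max_entries == 0 disables caching entirely."""

    def __init__(self, max_entries):
        self._max_entries = max_entries
        self._entries = collections.OrderedDict()

    def get(self, key):
        if key not in self._entries:
            return None
        # Re-insert so the key becomes the most recently used one.
        value = self._entries.pop(key)
        self._entries[key] = value
        return value

    def put(self, key, value):
        if self._max_entries <= 0:
            # Size 0 (or less) means caching is off: store nothing.
            return
        self._entries.pop(key, None)
        self._entries[key] = value
        # Evict from the least recently used end until within bounds.
        while len(self._entries) > self._max_entries:
            self._entries.popitem(last=False)

    def __len__(self):
        return len(self._entries)
```

With `max_entries=0` every `get` misses, so callers always fall through to the underlying state request, which matches the "default 0 turns caching off" proposal.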
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=307161=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-307161 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 05/Sep/19 13:58 Start Date: 05/Sep/19 13:58 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r321280781 ## File path: sdks/python/apache_beam/runners/worker/sdk_worker.py ## @@ -107,7 +110,7 @@ def run(self): # SdkHarness manage function registration and share self._fns with all # the workers. This is needed because function registration (register) # and exceution(process_bundle) are send over different request and we Review comment: ```suggestion # and execution (process_bundle) are send over different request and we ``` Also space missing. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 307161) Time Spent: 17h 10m (was: 17h) > Implement cross-bundle state caching. 
> - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Rakesh Kumar >Priority: Major > Time Spent: 17h 10m > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=307093=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-307093 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 05/Sep/19 11:50 Start Date: 05/Sep/19 11:50 Worklog Time Spent: 10m Work Description: mxm commented on issue #9374: [BEAM-5428] Implement Runner support for cache tokens URL: https://github.com/apache/beam/pull/9374#issuecomment-527809966 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 307093) Time Spent: 16h 40m (was: 16.5h) > Implement cross-bundle state caching. > - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Rakesh Kumar >Priority: Major > Time Spent: 16h 40m > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=306889=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-306889 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 05/Sep/19 04:33 Start Date: 05/Sep/19 04:33 Worklog Time Spent: 10m Work Description: tweise commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r321070140 ## File path: sdks/python/apache_beam/runners/worker/sdk_worker.py ## @@ -51,7 +53,7 @@ class SdkHarness(object): def __init__( self, control_address, worker_count, credentials=None, worker_id=None, - profiler_factory=None): + state_cache_size=100, profiler_factory=None): Review comment: The cache size setting would need to be passed to the entry point (`apache_beam.runners.worker.sdk_worker_main`) as pipeline option, so that in the end-to-end scenario the user can specify it. https://github.com/apache/beam/blob/5b43602109648830522159221a283754ed959afa/sdks/python/apache_beam/runners/worker/sdk_worker_main.py#L121 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 306889) > Implement cross-bundle state caching. 
> - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Rakesh Kumar >Priority: Major > Time Spent: 16.5h > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.2#803003)
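The comment above asks for the cache size to reach the worker entry point as a pipeline option. How the option is encoded is not shown in this thread, so the following is only a sketch under an assumption: the size arrives as a `state_cache_size=N` entry in a list of experiment-style strings. The function name `get_state_cache_size` and that encoding are hypothetical here.

```python
def get_state_cache_size(experiments):
    """Return N from the first 'state_cache_size=N' entry, else 0.

    Assumption: options arrive as a list of 'key=value' strings. A missing,
    malformed, or negative value falls back to 0, i.e. caching disabled.
    """
    prefix = 'state_cache_size='
    for experiment in experiments or []:
        if experiment.startswith(prefix):
            try:
                return max(0, int(experiment[len(prefix):]))
            except ValueError:
                return 0
    return 0
```

Falling back to 0 keeps the worker on the uncached path when the user does not set the option, which is consistent with the default discussed elsewhere in the review.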
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=306887=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-306887 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 05/Sep/19 04:33 Start Date: 05/Sep/19 04:33 Worklog Time Spent: 10m Work Description: tweise commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r321066586 ## File path: sdks/python/apache_beam/runners/worker/sdk_worker.py ## @@ -620,6 +628,106 @@ def _next_id(self): return str(self._last_id) +class CachingMaterializingStateHandler(object): + """ A State handler which retrieves and caches state. """ + + def __init__(self, global_state_cache, underlying_state): +self._underlying = underlying_state +self._state_cache = global_state_cache +self._context = threading.local() + + @contextlib.contextmanager + def process_instruction_id(self, bundle_id, cache_tokens): +if getattr(self._context, 'cache_token', None) is not None: + raise RuntimeError( + 'Cache tokens already set to %s' % self._context.cache_token) +# TODO Also handle cache tokens for side input, if present Review comment: Create JIRA for it? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 306887) Time Spent: 16.5h (was: 16h 20m) > Implement cross-bundle state caching. 
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=306888=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-306888 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 05/Sep/19 04:33 Start Date: 05/Sep/19 04:33 Worklog Time Spent: 10m Work Description: tweise commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r321066649 ## File path: sdks/python/apache_beam/runners/worker/sdk_worker.py ## @@ -107,7 +110,7 @@ def run(self): # SdkHarness manage function registration and share self._fns with all # the workers. This is needed because function registration (register) # and exceution(process_bundle) are send over different request and we Review comment: "execution" This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 306888) > Implement cross-bundle state caching. > - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Rakesh Kumar >Priority: Major > Time Spent: 16.5h > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=306885=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-306885 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 05/Sep/19 04:33 Start Date: 05/Sep/19 04:33 Worklog Time Spent: 10m Work Description: tweise commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r321066612 ## File path: sdks/python/apache_beam/runners/worker/sdk_worker.py ## @@ -331,7 +334,8 @@ def shutdown(self): class SdkWorker(object): - def __init__(self, bundle_processor_cache, profiler_factory=None): + def __init__(self, bundle_processor_cache, + profiler_factory=None): Review comment: unintended change? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 306885) Time Spent: 16h 20m (was: 16h 10m) > Implement cross-bundle state caching. > - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Rakesh Kumar >Priority: Major > Time Spent: 16h 20m > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=306886=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-306886 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 05/Sep/19 04:33 Start Date: 05/Sep/19 04:33 Worklog Time Spent: 10m Work Description: tweise commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r321068479 ## File path: sdks/python/apache_beam/runners/worker/sdk_worker.py ## @@ -51,7 +53,7 @@ class SdkHarness(object): def __init__( self, control_address, worker_count, credentials=None, worker_id=None, - profiler_factory=None): + state_cache_size=100, profiler_factory=None): Review comment: Does this mean we can only cache 100 keys? That may be a bit little, but probably it is best left to the user. How about making the default 0 and have that turn the cross bundle caching off? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 306886) Time Spent: 16h 20m (was: 16h 10m) > Implement cross-bundle state caching. 
> - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Rakesh Kumar >Priority: Major > Time Spent: 16h 20m > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.2#803003)
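The two review comments on `state_cache_size` (expose it to the user, and let a default of 0 switch cross-bundle caching off) can be sketched as follows. This is only an illustration: the flag name `--state_cache_size` and both helper functions are hypothetical and are not taken from the PR or from `sdk_worker_main`.

```python
import argparse


def parse_state_cache_size(argv):
  """Reads a hypothetical --state_cache_size flag from worker arguments.

  The flag name is an assumption for illustration; a value of 0 (the
  default here) is meant to turn cross-bundle caching off.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument('--state_cache_size', type=int, default=0)
  # Ignore every other argument so unrelated options do not cause errors.
  known_args, _ = parser.parse_known_args(argv)
  if known_args.state_cache_size < 0:
    raise ValueError('state_cache_size must be >= 0')
  return known_args.state_cache_size


def is_cache_enabled(state_cache_size):
  """Caching is considered enabled only for a positive size."""
  return state_cache_size > 0
```

With this shape, a user who passes nothing gets the old uncached behavior, and the harness only needs one check before wrapping the state handler in a caching layer.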
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=306698=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-306698 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 04/Sep/19 20:23 Start Date: 04/Sep/19 20:23 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r320957050 ## File path: sdks/python/apache_beam/runners/worker/sdk_worker.py ## @@ -620,6 +628,106 @@ def _next_id(self): return str(self._last_id) +class CachingMaterializingStateHandler(object): + """ A State handler which retrieves and caches state. """ + + def __init__(self, global_state_cache, underlying_state): +self._underlying = underlying_state +self._state_cache = global_state_cache +self._context = threading.local() + + @contextlib.contextmanager + def process_instruction_id(self, bundle_id, cache_tokens): +if getattr(self._context, 'cache_token', None) is not None: + raise RuntimeError( + 'Cache tokens already set to %s' % self._context.cache_token) +# TODO Also handle cache tokens for side input, if present +user_state_cache_token = None +for cache_token_struct in cache_tokens: + if cache_token_struct.HasField("user_state"): +# There should only be one user state token present +assert not user_state_cache_token +user_state_cache_token = cache_token_struct.token +try: + self._context.cache_token = user_state_cache_token + with self._underlying.process_instruction_id(bundle_id): +yield +finally: + self._context.cache_token = None + + def blocking_get(self, state_key, coder, is_cached=False): +if not self._should_be_cached(is_cached): + # no cache / tokens, can't do a lookup/store in the cache + return self._materialize_iter(state_key, coder) +# Cache lookup +cache_state_key = self._convert_to_cache_key(state_key) +cached_value = self._state_cache.get(cache_state_key, + 
self._context.cache_token) +if cached_value is None: + # Cache miss, need to retrieve from the Runner + materialized = cached_value = [] Review comment: The pagination is intended for when the value is too big to send, which probably means it's too big to cache (especially many of them given the current number-of-items bound). I don't think one would want to turn off all caching to avoid caching just these. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 306698) Time Spent: 16h 10m (was: 16h) > Implement cross-bundle state caching. > - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Rakesh Kumar >Priority: Major > Time Spent: 16h 10m > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=306694=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-306694 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 04/Sep/19 20:20 Start Date: 04/Sep/19 20:20 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r320956091 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py ## @@ -308,7 +311,8 @@ def __init__( default_environment=None, bundle_repeat=0, use_state_iterables=False, - provision_info=None): + provision_info=None, + state_cache_size=100): Review comment: Yeah, 100mb sounds like a reasonable value. It also makes more sense as a good default (who knows what 100 items is) and something possibly a user could specify. I don't think the memory model would have to be that sophisticated; the serialized size should be a reasonable approximation. It would require tracking it though. I would be OK with a JIRA and a TODO here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 306694) Time Spent: 16h (was: 15h 50m) > Implement cross-bundle state caching. 
> - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Rakesh Kumar >Priority: Major > Time Spent: 16h > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.2#803003)
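A size-based bound of the kind discussed above could look roughly like the sketch below. It assumes the caller already knows the serialized size of each value (for example the length of the encoded bytes) and passes it in. The class name `ByteBoundedLRUCache` and its methods are hypothetical and do not appear in the PR, which bounds the cache by number of entries.

```python
import collections


class ByteBoundedLRUCache(object):
  """Hypothetical LRU cache bounded by approximate bytes, not entry count."""

  def __init__(self, max_bytes):
    self._max_bytes = max_bytes
    self.current_bytes = 0
    # Maps key -> (value, serialized_size); insertion order tracks recency.
    self._entries = collections.OrderedDict()

  def get(self, key):
    entry = self._entries.pop(key, None)
    if entry is None:
      return None
    # Re-insert so the key becomes the most recently used one.
    self._entries[key] = entry
    return entry[0]

  def put(self, key, value, serialized_size):
    # Drop any previous entry for this key before accounting for the new one.
    old = self._entries.pop(key, None)
    if old is not None:
      self.current_bytes -= old[1]
    if serialized_size > self._max_bytes:
      # Too big to cache at all; the caller keeps reading it page by page.
      return
    self._entries[key] = (value, serialized_size)
    self.current_bytes += serialized_size
    # Evict least recently used entries until the byte budget is respected.
    while self.current_bytes > self._max_bytes:
      _, (_, evicted_size) = self._entries.popitem(last=False)
      self.current_bytes -= evicted_size
```

Refusing oversized values in `put` also covers the earlier point about paginated state: a value too large to send in one response is simply never cached, without turning caching off for everything else.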
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=306405=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-306405 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 04/Sep/19 16:08 Start Date: 04/Sep/19 16:08 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r320655683 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py ## @@ -1660,6 +1671,12 @@ def process_bundle(self, inputs, expected_outputs): return result, split_results + @staticmethod + def _generate_cache_token(): +return beam_fn_api_pb2.ProcessBundleRequest.CacheToken( +user_state=beam_fn_api_pb2.ProcessBundleRequest.CacheToken.UserState(), +token=bytes(bytearray(random.getrandbits(8) for _ in range(16 Review comment: Have replaced this with a generator with a counter and a human-readable name. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 306405) Time Spent: 15h 50m (was: 15h 40m) > Implement cross-bundle state caching. 
> - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Rakesh Kumar >Priority: Major > Time Spent: 15h 50m > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.2#803003)
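The replacement described in this comment (a counter plus a human-readable name instead of 16 random bytes) is not quoted in the message, so the following is only a guess at its general shape. The class name, the `prefix` parameter, and the token format are assumptions.

```python
import itertools
import threading


class CacheTokenGenerator(object):
  """Hypothetical generator of readable, monotonically numbered tokens."""

  def __init__(self, prefix='cache_token'):
    self._prefix = prefix
    self._counter = itertools.count(1)
    self._lock = threading.Lock()

  def next_token(self):
    # The lock keeps numbering consistent if several threads request tokens.
    with self._lock:
      number = next(self._counter)
    # Tokens are bytes because the CacheToken.token field is set from bytes
    # in the quoted diff.
    return ('%s_%d' % (self._prefix, number)).encode('utf-8')
```

Tokens such as `cache_token_1` are easier to follow in logs and tests than random bytes, and a counter guarantees that two tokens from the same generator never collide.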
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=306403=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-306403 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 04/Sep/19 16:07 Start Date: 04/Sep/19 16:07 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r320846325 ## File path: sdks/python/apache_beam/runners/worker/statecache.py ## @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A module for caching state reads/writes in Beam applications.""" +from __future__ import absolute_import + +import collections +from threading import Lock + + +class StateCache(object): + """ Cache for Beam state access, scoped by state key and cache_token. 
+ + For a given state_key, caches a (cache_token, value) tuple and allows to +a) read from the cache, + if the currently stored cache_token matches the provided +a) write to the cache, + storing the new value alongside with a cache token +c) append to the cache cache, + if the currently stored cache_token matches the provided + + The operations on the cache are thread-safe for use by multiple workers. + """ + + def __init__(self, max_entries): +self._cache = self.LRUCache(max_entries, (None, None)) +self._lock = Lock() + + def get(self, state_key, cache_token): +assert cache_token and self.is_cache_enabled() +with self._lock: + token, value = self._cache.get(state_key) +return value if token == cache_token else None + + def put(self, state_key, cache_token, value): +assert cache_token and self.is_cache_enabled() +with self._lock: + return self._cache.put(state_key, (cache_token, value)) + + def append(self, state_key, cache_token, elements): +assert cache_token and self.is_cache_enabled() +with self._lock: + token, value = self._cache.get(state_key) + if token != cache_token: +# Discard cached state if tokens do not match +value = [] + for element in elements: +value.append(element) Review comment: Thanks, that's better. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 306403) Time Spent: 15.5h (was: 15h 20m) > Implement cross-bundle state caching. 
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=306404=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-306404 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 04/Sep/19 16:07 Start Date: 04/Sep/19 16:07 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r320846356 ## File path: sdks/python/apache_beam/runners/worker/statecache.py ## @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A module for caching state reads/writes in Beam applications.""" +from __future__ import absolute_import + +import collections +from threading import Lock + + +class StateCache(object): + """ Cache for Beam state access, scoped by state key and cache_token. 
+ + For a given state_key, caches a (cache_token, value) tuple and allows to +a) read from the cache, + if the currently stored cache_token matches the provided +a) write to the cache, + storing the new value alongside with a cache token +c) append to the cache cache, + if the currently stored cache_token matches the provided + + The operations on the cache are thread-safe for use by multiple workers. + """ + + def __init__(self, max_entries): +self._cache = self.LRUCache(max_entries, (None, None)) +self._lock = Lock() + + def get(self, state_key, cache_token): +assert cache_token and self.is_cache_enabled() +with self._lock: + token, value = self._cache.get(state_key) +return value if token == cache_token else None + + def put(self, state_key, cache_token, value): +assert cache_token and self.is_cache_enabled() +with self._lock: + return self._cache.put(state_key, (cache_token, value)) + + def append(self, state_key, cache_token, elements): +assert cache_token and self.is_cache_enabled() +with self._lock: + token, value = self._cache.get(state_key) + if token != cache_token: +# Discard cached state if tokens do not match +value = [] + for element in elements: +value.append(element) + self._cache.put(state_key, (cache_token, value)) + + def clear(self, state_key): +assert self.is_cache_enabled() +with self._lock: + self._cache.clear(state_key) Review comment: Correct, this would invalidate the cache which results in an unnecessary retrieval of the state from the Runner. We should instead just clear the value. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 306404) Time Spent: 15h 40m (was: 15.5h) > Implement cross-bundle state caching. 
> - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Rakesh Kumar >Priority: Major > Time Spent: 15h 40m > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.2#803003)
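The difference between invalidating an entry and clearing only its value can be shown with a small stand-in for the cache. This sketch leaves out LRU eviction, and its `clear` takes a `cache_token` argument, which is an assumption: the `clear` in the quoted diff takes only `state_key`.

```python
import threading


class TokenScopedCache(object):
  """Minimal stand-in for the StateCache in the diff, without eviction."""

  def __init__(self):
    self._entries = {}  # state_key -> (cache_token, value)
    self._lock = threading.Lock()

  def get(self, state_key, cache_token):
    with self._lock:
      token, value = self._entries.get(state_key, (None, None))
    # A mismatching token is treated as a miss.
    return value if token == cache_token else None

  def put(self, state_key, cache_token, value):
    with self._lock:
      self._entries[state_key] = (cache_token, value)

  def clear(self, state_key, cache_token):
    # Keep the entry and store an empty value: the state is now known to be
    # empty, so a later get is a hit and needs no request to the Runner.
    with self._lock:
      self._entries[state_key] = (cache_token, [])
```

After `clear`, a `get` with the same token returns `[]` (a hit on empty state) rather than `None` (a miss), which is what avoids the unnecessary retrieval mentioned in the comment.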
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=306400=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-306400 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 04/Sep/19 16:07 Start Date: 04/Sep/19 16:07 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r320845993 ## File path: sdks/python/apache_beam/runners/worker/sdk_worker.py ## @@ -620,6 +628,106 @@ def _next_id(self): return str(self._last_id) +class CachingMaterializingStateHandler(object): + """ A State handler which retrieves and caches state. """ + + def __init__(self, global_state_cache, underlying_state): +self._underlying = underlying_state +self._state_cache = global_state_cache +self._context = threading.local() + + @contextlib.contextmanager + def process_instruction_id(self, bundle_id, cache_tokens): +if getattr(self._context, 'cache_token', None) is not None: + raise RuntimeError( + 'Cache tokens already set to %s' % self._context.cache_token) +# TODO Also handle cache tokens for side input, if present +user_state_cache_token = None +for cache_token_struct in cache_tokens: + if cache_token_struct.HasField("user_state"): +# There should only be one user state token present +assert not user_state_cache_token +user_state_cache_token = cache_token_struct.token +try: + self._context.cache_token = user_state_cache_token + with self._underlying.process_instruction_id(bundle_id): +yield +finally: + self._context.cache_token = None + + def blocking_get(self, state_key, coder, is_cached=False): +if not self._should_be_cached(is_cached): + # no cache / tokens, can't do a lookup/store in the cache + return self._materialize_iter(state_key, coder) +# Cache lookup +cache_state_key = self._convert_to_cache_key(state_key) +cached_value = self._state_cache.get(cache_state_key, + 
self._context.cache_token) +if cached_value is None: + # Cache miss, need to retrieve from the Runner + materialized = cached_value = [] Review comment: For the cache to work effectively, we want to cache the whole value. Note that if the cache is disabled, we still retain the old behavior of only materializing a page at a time. Yes, this has the drawback that the whole value has to fit into memory. The design currently doesn't do memory estimation but keeps a fixed upper limit of items. If we move to memory estimation, it would make sense to reflect that here as well. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 306400) Time Spent: 15h (was: 14h 50m) > Implement cross-bundle state caching. > - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Rakesh Kumar >Priority: Major > Time Spent: 15h > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=306402=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-306402 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 04/Sep/19 16:07 Start Date: 04/Sep/19 16:07 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r320846304 ## File path: sdks/python/apache_beam/runners/worker/statecache.py ## @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A module for caching state reads/writes in Beam applications.""" +from __future__ import absolute_import + +import collections +from threading import Lock + + +class StateCache(object): + """ Cache for Beam state access, scoped by state key and cache_token. 
+ + For a given state_key, caches a (cache_token, value) tuple and allows to +a) read from the cache, + if the currently stored cache_token matches the provided +a) write to the cache, + storing the new value alongside with a cache token +c) append to the cache cache, + if the currently stored cache_token matches the provided + + The operations on the cache are thread-safe for use by multiple workers. + """ + + def __init__(self, max_entries): +self._cache = self.LRUCache(max_entries, (None, None)) +self._lock = Lock() + + def get(self, state_key, cache_token): +assert cache_token and self.is_cache_enabled() +with self._lock: + token, value = self._cache.get(state_key) +return value if token == cache_token else None + + def put(self, state_key, cache_token, value): +assert cache_token and self.is_cache_enabled() +with self._lock: + return self._cache.put(state_key, (cache_token, value)) + + def append(self, state_key, cache_token, elements): +assert cache_token and self.is_cache_enabled() +with self._lock: + token, value = self._cache.get(state_key) + if token != cache_token: +# Discard cached state if tokens do not match +value = [] Review comment: Good catch. I have changed it to expunge the cache in this case. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 306402) Time Spent: 15h 20m (was: 15h 10m) > Implement cross-bundle state caching. 
> - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Rakesh Kumar >Priority: Major > Time Spent: 15h 20m > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.2#803003)
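The reply above says that a token mismatch in `append` now expunges the cached entry instead of starting a fresh list. A minimal sketch of that behaviour follows. `TokenScopedCache` is a hypothetical stand-in written for illustration, not the `StateCache` from the pull request, and it assumes that entries are populated through `put` rather than `append`.

```python
import collections
import threading


class TokenScopedCache(object):
  """Hypothetical stand-in for the StateCache under review."""

  def __init__(self, max_entries):
    self._max_entries = max_entries
    # Maps state_key -> (cache_token, list of values), oldest entry first.
    self._entries = collections.OrderedDict()
    self._lock = threading.Lock()

  def get(self, state_key, cache_token):
    with self._lock:
      token, value = self._entries.get(state_key, (None, None))
      return value if token == cache_token else None

  def put(self, state_key, cache_token, value):
    with self._lock:
      self._entries.pop(state_key, None)
      self._entries[state_key] = (cache_token, value)
      # Evict the oldest entries once the entry limit is exceeded.
      while len(self._entries) > self._max_entries:
        self._entries.popitem(last=False)

  def append(self, state_key, cache_token, elements):
    with self._lock:
      token, value = self._entries.get(state_key, (None, None))
      if token != cache_token:
        # Stale or missing entry: expunge it rather than caching a list
        # that would only hold the newly appended elements.
        self._entries.pop(state_key, None)
        return
      value.extend(elements)


cache = TokenScopedCache(max_entries=2)
cache.put('key', 'token-1', [1])
cache.append('key', 'token-1', [2, 3])   # tokens match: list grows
matching = cache.get('key', 'token-1')
cache.append('key', 'token-2', [4])      # tokens differ: entry is expunged
after_mismatch = cache.get('key', 'token-2')
```

Expunging matters because a list seeded only from the appended elements would later be served as if it were the complete state for that key.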
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=306401=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-306401 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 04/Sep/19 16:07 Start Date: 04/Sep/19 16:07 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r320846279 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py ## @@ -308,7 +311,8 @@ def __init__( default_environment=None, bundle_repeat=0, use_state_iterables=False, - provision_info=None): + provision_info=None, + state_cache_size=100): Review comment: This is the maximum number of items to hold in the cache. I agree it would be nice to make this size-based, though that would also require a more sophisticated memory model to determine the correct memory size. We could go with something reasonable, e.g. 100 MB or so. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 306401) Time Spent: 15h 10m (was: 15h) > Implement cross-bundle state caching. 
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=306398=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-306398 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 04/Sep/19 16:07 Start Date: 04/Sep/19 16:07 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r320845947 ## File path: sdks/python/apache_beam/runners/worker/sdk_worker.py ## @@ -620,6 +628,106 @@ def _next_id(self): return str(self._last_id) +class CachingMaterializingStateHandler(object): + """ A State handler which retrieves and caches state. """ + + def __init__(self, global_state_cache, underlying_state): +self._underlying = underlying_state +self._state_cache = global_state_cache +self._context = threading.local() + + @contextlib.contextmanager + def process_instruction_id(self, bundle_id, cache_tokens): +if getattr(self._context, 'cache_token', None) is not None: + raise RuntimeError( + 'Cache tokens already set to %s' % self._context.cache_token) +# TODO Also handle cache tokens for side input, if present +user_state_cache_token = None +for cache_token_struct in cache_tokens: + if cache_token_struct.HasField("user_state"): +# There should only be one user state token present +assert not user_state_cache_token +user_state_cache_token = cache_token_struct.token +try: + self._context.cache_token = user_state_cache_token + with self._underlying.process_instruction_id(bundle_id): +yield +finally: + self._context.cache_token = None + + def blocking_get(self, state_key, coder, is_cached=False): +if not self._should_be_cached(is_cached): + # no cache / tokens, can't do a lookup/store in the cache + return self._materialize_iter(state_key, coder) +# Cache lookup +cache_state_key = self._convert_to_cache_key(state_key) +cached_value = self._state_cache.get(cache_state_key, + 
self._context.cache_token) +if cached_value is None: + # Cache miss, need to retrieve from the Runner + materialized = cached_value = [] + for val in self._materialize_iter(state_key, coder): +materialized.append(val) + self._state_cache.append( + cache_state_key, + self._context.cache_token, + materialized) +return iter(cached_value) + + def append(self, state_key, coder, elements, is_cached=False): +if self._should_be_cached(is_cached): + # Update the cache + cache_key = self._convert_to_cache_key(state_key) + self._state_cache.append(cache_key, self._context.cache_token, elements) +# Write to state handler +out = coder_impl.create_OutputStream() +for element in elements: + coder.encode_to_stream(element, out, True) +return self._underlying.append_raw(state_key, out.get()) + + def clear(self, state_key, is_cached=False): +if self._should_be_cached(is_cached): + self._state_cache.clear(self._convert_to_cache_key(state_key)) +return self._underlying.clear(state_key) + + # The following methods are for interaction with the FnApiRunner: + + def get_raw(self, state_key, continuation_token=None): +return self._underlying.get_raw(state_key, continuation_token) + + def append_raw(self, state_key, data): +return self._underlying.append_raw(state_key, data) + + def restore(self): +self._underlying.restore() + + def checkpoint(self): +self._underlying.checkpoint() + + def done(self): +self._underlying.done() + + def _materialize_iter(self, state_key, coder): +continuation_token = None +while True: + data, continuation_token = \ + self._underlying.get_raw(state_key, continuation_token) + input_stream = coder_impl.create_InputStream(data) + while input_stream.size() > 0: +yield coder.decode_from_stream(input_stream, True) + if not continuation_token: +break + + def _should_be_cached(self, request_is_cached): +return self._state_cache.is_cache_enabled() and \ + request_is_cached and \ Review comment: Changed, but kept it for ```python data, continuation_token = \ 
self._underlying.get_raw(state_key, continuation_token) ``` because ```python data, continuation_token = ( self._underlying.get_raw(state_key, continuation_token)) ``` looks harder to understand. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
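For context on the statement being discussed, here is a minimal sketch of the continuation-token loop it belongs to. It uses a third layout that needs neither a backslash nor an extra pair of parentheses: the line break goes inside the call's own argument list. `FakeStateHandler` and its page layout are hypothetical, and the pages are plain lists instead of encoded bytes so the sketch runs without a coder.

```python
class FakeStateHandler(object):
  """Hypothetical paged backend returning (chunk, continuation_token)."""

  def __init__(self, pages):
    self._pages = pages

  def get_raw(self, state_key, continuation_token=None):
    # The token is the index of the next page; None means "start at 0".
    index = continuation_token or 0
    next_index = index + 1
    next_token = next_index if next_index < len(self._pages) else None
    return self._pages[index], next_token


def materialize_iter(handler, state_key):
  """Yields all elements for state_key, following continuation tokens."""
  continuation_token = None
  while True:
    # Breaking inside the call's parentheses avoids the backslash.
    data, continuation_token = handler.get_raw(
        state_key, continuation_token)
    for element in data:
      yield element
    if not continuation_token:
      break


handler = FakeStateHandler([[1, 2], [3], [4, 5]])
elements = list(materialize_iter(handler, 'some_key'))
```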
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=306399=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-306399 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 04/Sep/19 16:07 Start Date: 04/Sep/19 16:07 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r320845968 ## File path: sdks/python/apache_beam/runners/worker/sdk_worker.py ## @@ -620,6 +628,106 @@ def _next_id(self): return str(self._last_id) +class CachingMaterializingStateHandler(object): + """ A State handler which retrieves and caches state. """ + + def __init__(self, global_state_cache, underlying_state): +self._underlying = underlying_state +self._state_cache = global_state_cache +self._context = threading.local() + + @contextlib.contextmanager + def process_instruction_id(self, bundle_id, cache_tokens): +if getattr(self._context, 'cache_token', None) is not None: + raise RuntimeError( + 'Cache tokens already set to %s' % self._context.cache_token) +# TODO Also handle cache tokens for side input, if present +user_state_cache_token = None +for cache_token_struct in cache_tokens: + if cache_token_struct.HasField("user_state"): +# There should only be one user state token present +assert not user_state_cache_token +user_state_cache_token = cache_token_struct.token +try: + self._context.cache_token = user_state_cache_token + with self._underlying.process_instruction_id(bundle_id): +yield +finally: + self._context.cache_token = None + + def blocking_get(self, state_key, coder, is_cached=False): +if not self._should_be_cached(is_cached): + # no cache / tokens, can't do a lookup/store in the cache + return self._materialize_iter(state_key, coder) +# Cache lookup +cache_state_key = self._convert_to_cache_key(state_key) +cached_value = self._state_cache.get(cache_state_key, + 
self._context.cache_token) +if cached_value is None: + # Cache miss, need to retrieve from the Runner + materialized = cached_value = [] + for val in self._materialize_iter(state_key, coder): Review comment: Thanks, much more concise. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 306399) Time Spent: 14h 50m (was: 14h 40m) > Implement cross-bundle state caching. > - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Rakesh Kumar >Priority: Major > Time Spent: 14h 50m > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=306396=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-306396 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 04/Sep/19 16:06 Start Date: 04/Sep/19 16:06 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r320845900 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py ## @@ -1660,6 +1671,12 @@ def process_bundle(self, inputs, expected_outputs): return result, split_results + @staticmethod + def _generate_cache_token(): +return beam_fn_api_pb2.ProcessBundleRequest.CacheToken( +user_state=beam_fn_api_pb2.ProcessBundleRequest.CacheToken.UserState(), +token=bytes(bytearray(random.getrandbits(8) for _ in range(16 Review comment: Have replaced this with a generator with a counter and a human-readable name. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 306396) Time Spent: 14h 20m (was: 14h 10m) > Implement cross-bundle state caching. 
> - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Rakesh Kumar >Priority: Major > Time Spent: 14h 20m > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.2#803003)
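The reply above mentions replacing the random bytes with a generator that uses a counter and a human-readable name. One way this could look is sketched below. The class name, the `cache_token` prefix and the token format are assumptions made for illustration and are not taken from the pull request.

```python
import itertools
import threading


class CacheTokenGenerator(object):
  """Hypothetical generator of readable, monotonically numbered tokens."""

  def __init__(self, prefix='cache_token'):
    self._prefix = prefix
    self._counter = itertools.count(1)
    self._lock = threading.Lock()

  def next_token(self):
    # The lock keeps the numbering gap-free if several threads ask at once.
    with self._lock:
      number = next(self._counter)
    # The CacheToken message carries bytes, so encode the readable name.
    return ('%s_%d' % (self._prefix, number)).encode('utf-8')


generator = CacheTokenGenerator()
first = generator.next_token()
second = generator.next_token()
```

Compared with 16 random bytes, tokens like these are easy to recognise in logs and make test failures reproducible.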
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=306395=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-306395 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 04/Sep/19 16:06 Start Date: 04/Sep/19 16:06 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r320655683 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py ## @@ -1660,6 +1671,12 @@ def process_bundle(self, inputs, expected_outputs): return result, split_results + @staticmethod + def _generate_cache_token(): +return beam_fn_api_pb2.ProcessBundleRequest.CacheToken( +user_state=beam_fn_api_pb2.ProcessBundleRequest.CacheToken.UserState(), +token=bytes(bytearray(random.getrandbits(8) for _ in range(16 Review comment: Have replaced this with a generator with a counter and a human-readable name. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 306395) Time Spent: 14h 20m (was: 14h 10m) > Implement cross-bundle state caching. 
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=306397=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-306397 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 04/Sep/19 16:06 Start Date: 04/Sep/19 16:06 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r320845931 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py ## @@ -1622,7 +1632,8 @@ def process_bundle(self, inputs, expected_outputs): process_bundle_req = beam_fn_api_pb2.InstructionRequest( instruction_id=process_bundle_id, process_bundle=beam_fn_api_pb2.ProcessBundleRequest( -process_bundle_descriptor_reference=self._bundle_descriptor.id)) +process_bundle_descriptor_reference=self._bundle_descriptor.id, +cache_tokens=[self._generate_cache_token()])) Review comment: Correct. The cache token should be regenerated per bundle repeat only. Have pushed an update. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 306397) Time Spent: 14.5h (was: 14h 20m) > Implement cross-bundle state caching. 
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=306199=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-306199 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 04/Sep/19 08:57 Start Date: 04/Sep/19 08:57 Worklog Time Spent: 10m Work Description: mxm commented on issue #9374: [BEAM-5428] Implement Runner support for cache tokens URL: https://github.com/apache/beam/pull/9374#issuecomment-527809966 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 306199) Time Spent: 14h 10m (was: 14h) > Implement cross-bundle state caching. > - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Rakesh Kumar >Priority: Major > Time Spent: 14h 10m > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=305985=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-305985 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 04/Sep/19 00:20 Start Date: 04/Sep/19 00:20 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r320526521 ## File path: sdks/python/apache_beam/runners/worker/sdk_worker.py ## @@ -620,6 +628,106 @@ def _next_id(self): return str(self._last_id) +class CachingMaterializingStateHandler(object): + """ A State handler which retrieves and caches state. """ + + def __init__(self, global_state_cache, underlying_state): +self._underlying = underlying_state +self._state_cache = global_state_cache +self._context = threading.local() + + @contextlib.contextmanager + def process_instruction_id(self, bundle_id, cache_tokens): +if getattr(self._context, 'cache_token', None) is not None: + raise RuntimeError( + 'Cache tokens already set to %s' % self._context.cache_token) +# TODO Also handle cache tokens for side input, if present +user_state_cache_token = None +for cache_token_struct in cache_tokens: + if cache_token_struct.HasField("user_state"): +# There should only be one user state token present +assert not user_state_cache_token +user_state_cache_token = cache_token_struct.token +try: + self._context.cache_token = user_state_cache_token + with self._underlying.process_instruction_id(bundle_id): +yield +finally: + self._context.cache_token = None + + def blocking_get(self, state_key, coder, is_cached=False): +if not self._should_be_cached(is_cached): + # no cache / tokens, can't do a lookup/store in the cache + return self._materialize_iter(state_key, coder) +# Cache lookup +cache_state_key = self._convert_to_cache_key(state_key) +cached_value = self._state_cache.get(cache_state_key, + 
self._context.cache_token) +if cached_value is None: + # Cache miss, need to retrieve from the Runner + materialized = cached_value = [] + for val in self._materialize_iter(state_key, coder): Review comment: This is equivalent to `materialized = cached_value = list(self._materialize_iter(state_key, coder))` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 305985) > Implement cross-bundle state caching. > - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Rakesh Kumar >Priority: Major > Time Spent: 13h 40m > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.2#803003)
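The equivalence pointed out in the comment above can be checked with a small stand-alone sketch. `fake_materialize_iter` is a hypothetical generator standing in for `_materialize_iter`.

```python
def fake_materialize_iter(count):
  """Hypothetical stand-in for _materialize_iter: yields decoded values."""
  for i in range(count):
    yield i * i


# Original form: build the list with an explicit loop.
looped = []
for val in fake_materialize_iter(4):
  looped.append(val)

# Suggested form: list() drains the generator in one call. The chained
# assignment binds both names to the same list object.
materialized = cached_value = list(fake_materialize_iter(4))
```

Because both names refer to one list, a later in-place change through either name is visible through the other.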
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=305983=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-305983 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 04/Sep/19 00:20 Start Date: 04/Sep/19 00:20 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r320528334 ## File path: sdks/python/apache_beam/runners/worker/statecache.py ## @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A module for caching state reads/writes in Beam applications.""" +from __future__ import absolute_import + +import collections +from threading import Lock + + +class StateCache(object): + """ Cache for Beam state access, scoped by state key and cache_token. 
+ + For a given state_key, caches a (cache_token, value) tuple and allows to +a) read from the cache, + if the currently stored cache_token matches the provided +a) write to the cache, + storing the new value alongside with a cache token +c) append to the cache cache, + if the currently stored cache_token matches the provided + + The operations on the cache are thread-safe for use by multiple workers. + """ + + def __init__(self, max_entries): +self._cache = self.LRUCache(max_entries, (None, None)) +self._lock = Lock() + + def get(self, state_key, cache_token): +assert cache_token and self.is_cache_enabled() +with self._lock: + token, value = self._cache.get(state_key) +return value if token == cache_token else None + + def put(self, state_key, cache_token, value): +assert cache_token and self.is_cache_enabled() +with self._lock: + return self._cache.put(state_key, (cache_token, value)) + + def append(self, state_key, cache_token, elements): +assert cache_token and self.is_cache_enabled() +with self._lock: + token, value = self._cache.get(state_key) + if token != cache_token: +# Discard cached state if tokens do not match +value = [] + for element in elements: +value.append(element) Review comment: lists have an extend method that could be used here rather than a looping append. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 305983) > Implement cross-bundle state caching. 
> - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Rakesh Kumar >Priority: Major > Time Spent: 13.5h > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.2#803003)
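As a short illustration of the suggestion above, `list.extend` accepts any iterable and mutates the list in place, so it can replace the element-by-element loop directly. The variable names below are illustrative only.

```python
cached = [1, 2]
alias = cached  # e.g. the same list object held inside the cache entry

# Element-by-element version, as in the reviewed code.
for element in (3, 4):
  cached.append(element)

# Equivalent single call. extend also accepts generators.
cached.extend(x * 10 for x in (5, 6))
```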
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=305986=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-305986 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 04/Sep/19 00:20 Start Date: 04/Sep/19 00:20 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r320527496 ## File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py ## @@ -308,7 +311,8 @@ def __init__( default_environment=None, bundle_repeat=0, use_state_iterables=False, - provision_info=None): + provision_info=None, + state_cache_size=100): Review comment: Is this the number of items? It'd probably be better for this to be sensitive to the size of the items (e.g. estimated by the serialized size we get when making state requests). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 305986) Time Spent: 13h 50m (was: 13h 40m) > Implement cross-bundle state caching. 
> - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Rakesh Kumar >Priority: Major > Time Spent: 13h 50m > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.2#803003)
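The suggestion above is to bound the cache by item size rather than item count. A minimal sketch of what that could look like is given below. `WeightedLRUCache` is hypothetical, and the weight passed to `put` is assumed to be supplied by the caller, for example the number of serialized bytes seen when the state was fetched.

```python
import collections


class WeightedLRUCache(object):
  """Hypothetical LRU cache bounded by total weight, not entry count."""

  def __init__(self, max_weight):
    self._max_weight = max_weight
    self._current_weight = 0
    # Maps key -> (value, weight), least recently used entry first.
    self._entries = collections.OrderedDict()

  def get(self, key):
    entry = self._entries.pop(key, None)
    if entry is None:
      return None
    self._entries[key] = entry  # re-insert as most recently used
    return entry[0]

  def put(self, key, value, weight):
    old = self._entries.pop(key, None)
    if old is not None:
      self._current_weight -= old[1]
    if weight > self._max_weight:
      return  # larger than the whole budget: do not cache at all
    self._entries[key] = (value, weight)
    self._current_weight += weight
    # Evict least recently used entries until the budget is respected.
    while self._current_weight > self._max_weight:
      _, (_, evicted_weight) = self._entries.popitem(last=False)
      self._current_weight -= evicted_weight

  def __len__(self):
    return len(self._entries)


cache = WeightedLRUCache(max_weight=10)
cache.put('a', 'A', weight=4)
cache.put('b', 'B', weight=4)
cache.get('a')                  # 'a' becomes most recently used
cache.put('c', 'C', weight=4)   # total would be 12, so 'b' is evicted
```

Serialized size only approximates the memory held by the decoded Python objects, which is the "memory model" concern raised in the reply to this comment.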
[jira] [Work logged] (BEAM-5428) Implement cross-bundle state caching.
[ https://issues.apache.org/jira/browse/BEAM-5428?focusedWorklogId=305990=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-305990 ] ASF GitHub Bot logged work on BEAM-5428: Author: ASF GitHub Bot Created on: 04/Sep/19 00:20 Start Date: 04/Sep/19 00:20 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #9418: [BEAM-5428] Implement cross-bundle user state caching in the Python SDK URL: https://github.com/apache/beam/pull/9418#discussion_r320528550 ## File path: sdks/python/apache_beam/runners/worker/statecache.py ## @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A module for caching state reads/writes in Beam applications.""" +from __future__ import absolute_import + +import collections +from threading import Lock + + +class StateCache(object): + """ Cache for Beam state access, scoped by state key and cache_token. 
+ + For a given state_key, caches a (cache_token, value) tuple and allows to +a) read from the cache, + if the currently stored cache_token matches the provided +a) write to the cache, + storing the new value alongside with a cache token +c) append to the cache cache, + if the currently stored cache_token matches the provided + + The operations on the cache are thread-safe for use by multiple workers. + """ + + def __init__(self, max_entries): +self._cache = self.LRUCache(max_entries, (None, None)) +self._lock = Lock() + + def get(self, state_key, cache_token): +assert cache_token and self.is_cache_enabled() +with self._lock: + token, value = self._cache.get(state_key) +return value if token == cache_token else None + + def put(self, state_key, cache_token, value): +assert cache_token and self.is_cache_enabled() +with self._lock: + return self._cache.put(state_key, (cache_token, value)) + + def append(self, state_key, cache_token, elements): +assert cache_token and self.is_cache_enabled() +with self._lock: + token, value = self._cache.get(state_key) + if token != cache_token: +# Discard cached state if tokens do not match +value = [] + for element in elements: +value.append(element) + self._cache.put(state_key, (cache_token, value)) + + def clear(self, state_key): +assert self.is_cache_enabled() +with self._lock: + self._cache.clear(state_key) Review comment: Clearing an entry from the cache is different than setting the value as cleared (but still being able to look it up). We want the latter, right? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 305990) Time Spent: 14h (was: 13h 50m) > Implement cross-bundle state caching. 
> - > > Key: BEAM-5428 > URL: https://issues.apache.org/jira/browse/BEAM-5428 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Rakesh Kumar >Priority: Major > Time Spent: 14h > Remaining Estimate: 0h > > Tech spec: > [https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m] > Relevant document: > [https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit#|https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/edit] > Mailing list link: > [https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E] -- This message was sent by Atlassian Jira (v8.3.2#803003)
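The distinction drawn in the comment above, between removing an entry and recording that the state is now empty, can be sketched as follows. `ClearableCache` and its method names are hypothetical and not taken from the pull request. The sketch assumes that a cleared state is represented by an empty list stored under the current cache token.

```python
class ClearableCache(object):
  """Hypothetical cache separating eviction from a cleared value."""

  def __init__(self):
    self._entries = {}  # key -> (cache_token, list of values)

  def put(self, key, cache_token, value):
    self._entries[key] = (cache_token, value)

  def get(self, key, cache_token):
    token, value = self._entries.get(key, (None, None))
    return value if token == cache_token else None

  def evict(self, key):
    # Forget the key entirely: the next get is a miss and the caller
    # has to ask the runner again.
    self._entries.pop(key, None)

  def clear(self, key, cache_token):
    # Remember that the state is empty: the next get is a hit that
    # returns an empty list without contacting the runner.
    self._entries[key] = (cache_token, [])


cache = ClearableCache()
cache.put('key', 'token', [1, 2])
cache.evict('key')
after_evict = cache.get('key', 'token')

cache.put('key', 'token', [1, 2])
cache.clear('key', 'token')
after_clear = cache.get('key', 'token')
```

With this representation callers have to test for a miss with `is None`, as the handler under review already does, because an empty list is a valid cached value.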