[beam] branch master updated: [BEAM-10321] retain environments in flatten for preventing it from being fused into stages running in foreign language SDKs

2020-06-30 Thread pabloem
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
 new 09258f2  [BEAM-10321] retain environments in flatten for preventing it 
from being fused into stages running in foreign language SDKs
 new 1805c66  Merge pull request #12087 from [BEAM-10321] retain 
environments in flatten for preventing it from be…
09258f2 is described below

commit 09258f27b83f7034856a0231de3d9188ffa8bd6d
Author: Heejong Lee 
AuthorDate: Wed Jun 24 18:31:41 2020 -0700

[BEAM-10321] retain environments in flatten for preventing it from being 
fused into stages running in foreign language SDKs
---
 .../apache_beam/runners/portability/fn_api_runner/translations.py   | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
index eca8494..34dcf48 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
@@ -1141,7 +1141,8 @@ def sink_flattens(stages, pipeline_context):
 inputs={local_in: pcoll_in},
 spec=beam_runner_api_pb2.FunctionSpec(
 urn=bundle_processor.DATA_OUTPUT_URN,
-payload=buffer_id))
+payload=buffer_id),
+environment_id=transform.environment_id)
 ],
 downstream_side_inputs=frozenset(),
 must_follow=stage.must_follow)
@@ -1155,7 +1156,8 @@ def sink_flattens(stages, pipeline_context):
   unique_name=transform.unique_name + '/Read',
   outputs=transform.outputs,
   spec=beam_runner_api_pb2.FunctionSpec(
-  urn=bundle_processor.DATA_INPUT_URN, payload=buffer_id))
+  urn=bundle_processor.DATA_INPUT_URN, payload=buffer_id),
+  environment_id=transform.environment_id)
   ],
   downstream_side_inputs=stage.downstream_side_inputs,
   must_follow=union(frozenset(flatten_writes), stage.must_follow))
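
The effect described in the commit message can be illustrated with a toy model. This is a minimal sketch, not Beam's actual fusion code: `ToyTransform`, `can_fuse`, and the environment names `java_env` and `python_env` are all hypothetical, and the rule that an unset environment is compatible with any stage is an assumption made for illustration. Under that assumption, a Flatten-derived write with no `environment_id` could be fused into a stage that runs in a foreign SDK, while one that retains the original transform's environment cannot.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ToyTransform:
    # Hypothetical stand-in for a PTransform proto; not a Beam class.
    name: str
    environment_id: Optional[str] = None


def can_fuse(producer: ToyTransform, consumer: ToyTransform) -> bool:
    # Assumed rule for this sketch: an unset environment matches anything,
    # while two set environments must be identical.
    if producer.environment_id is None or consumer.environment_id is None:
        return True
    return producer.environment_id == consumer.environment_id


# A stage executed by a foreign-language SDK (hypothetical environment id).
java_stage = ToyTransform('ExternalJavaParDo', environment_id='java_env')

# Before the change: the Flatten-derived write carries no environment.
write_without_env = ToyTransform('Flatten/Write/0')

# After the change: it keeps the environment of the original Flatten.
write_with_env = ToyTransform('Flatten/Write/0', environment_id='python_env')

print(can_fuse(java_stage, write_without_env))
print(can_fuse(java_stage, write_with_env))
```

In this model the first check allows fusion and the second rejects it, which is the behaviour the added `environment_id=transform.environment_id` arguments are meant to produce.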


