This is an automated email from the ASF dual-hosted git repository. robertwb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 534015c0b8e [Typescript] creating dataflow job template fix (#29928) 534015c0b8e is described below commit 534015c0b8e6b9817ec054c4fe67b3bab423f4a6 Author: Cheskel Twersky <twerskyches...@gmail.com> AuthorDate: Wed Jan 17 01:22:36 2024 +0200 [Typescript] creating dataflow job template fix (#29928) * set template completion status to DONE * job_servicer is not used in serve --- sdks/python/apache_beam/runners/dataflow/dataflow_job_service.py | 3 +-- sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 3 ++- sdks/python/apache_beam/runners/portability/local_job_service_main.py | 4 ++-- sdks/python/apache_beam/runners/render.py | 3 +-- 4 files changed, 6 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_job_service.py b/sdks/python/apache_beam/runners/dataflow/dataflow_job_service.py index 2bd8605c992..710c71273e3 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_job_service.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_job_service.py @@ -74,8 +74,7 @@ def run(argv, beam_job_type=DataflowBeamJob): options.staging_dir, beam_job_type=beam_job_type) port = job_servicer.start_grpc_server(options.port) try: - local_job_service_main.serve( - "Listening for beam jobs on port %d." % port, job_servicer) + local_job_service_main.serve("Listening for beam jobs on port %d." % port) finally: job_servicer.stop() diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index dc315119e48..db6a5235ac9 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -731,7 +731,8 @@ class DataflowPipelineResult(PipelineResult): A PipelineState object. """ if not self.has_job: - return PipelineState.UNKNOWN + # https://github.com/apache/beam/blob/8f71dc41b30a978095ca0e0699009e4f4445a618/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py#L867-L870 + return PipelineState.DONE self._update_job() diff --git a/sdks/python/apache_beam/runners/portability/local_job_service_main.py b/sdks/python/apache_beam/runners/portability/local_job_service_main.py index 6d9d32f5f23..efc198e87db 100644 --- a/sdks/python/apache_beam/runners/portability/local_job_service_main.py +++ b/sdks/python/apache_beam/runners/portability/local_job_service_main.py @@ -145,7 +145,7 @@ def run(argv): with open(options.port_file + '.tmp', 'w') as fout: fout.write(str(port)) os.rename(options.port_file + '.tmp', options.port_file) - serve("Listening for beam jobs on port %d." % port, job_servicer) + serve("Listening for beam jobs on port %d." % port) finally: job_servicer.stop() finally: @@ -155,7 +155,7 @@ def run(argv): os.unlink(options.port_file) -def serve(msg, job_servicer): +def serve(msg): logging_delay = 30 while True: _LOGGER.info(msg) diff --git a/sdks/python/apache_beam/runners/render.py b/sdks/python/apache_beam/runners/render.py index da153d25a4b..fccfa8aacd6 100644 --- a/sdks/python/apache_beam/runners/render.py +++ b/sdks/python/apache_beam/runners/render.py @@ -559,8 +559,7 @@ def run_server(options): staging_dir, beam_job_type=RenderBeamJob) port = job_servicer.start_grpc_server(options.job_port) try: - local_job_service_main.serve( - "Listening for beam jobs on port %d." % port, job_servicer) + local_job_service_main.serve("Listening for beam jobs on port %d." % port) finally: job_servicer.stop()