This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 534015c0b8e [Typescript] creating dataflow job template fix (#29928)
534015c0b8e is described below

commit 534015c0b8e6b9817ec054c4fe67b3bab423f4a6
Author: Cheskel Twersky <twerskyches...@gmail.com>
AuthorDate: Wed Jan 17 01:22:36 2024 +0200

    [Typescript] creating dataflow job template fix (#29928)
    
    * set template completion status to DONE
    
    * job_servicer is not used in serve
---
 sdks/python/apache_beam/runners/dataflow/dataflow_job_service.py      | 3 +--
 sdks/python/apache_beam/runners/dataflow/dataflow_runner.py           | 3 ++-
 sdks/python/apache_beam/runners/portability/local_job_service_main.py | 4 ++--
 sdks/python/apache_beam/runners/render.py                             | 3 +--
 4 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_job_service.py 
b/sdks/python/apache_beam/runners/dataflow/dataflow_job_service.py
index 2bd8605c992..710c71273e3 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_job_service.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_job_service.py
@@ -74,8 +74,7 @@ def run(argv, beam_job_type=DataflowBeamJob):
       options.staging_dir, beam_job_type=beam_job_type)
   port = job_servicer.start_grpc_server(options.port)
   try:
-    local_job_service_main.serve(
-        "Listening for beam jobs on port %d." % port, job_servicer)
+    local_job_service_main.serve("Listening for beam jobs on port %d." % port)
   finally:
     job_servicer.stop()
 
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py 
b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index dc315119e48..db6a5235ac9 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -731,7 +731,8 @@ class DataflowPipelineResult(PipelineResult):
       A PipelineState object.
     """
     if not self.has_job:
-      return PipelineState.UNKNOWN
+      # 
https://github.com/apache/beam/blob/8f71dc41b30a978095ca0e0699009e4f4445a618/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py#L867-L870
+      return PipelineState.DONE
 
     self._update_job()
 
diff --git 
a/sdks/python/apache_beam/runners/portability/local_job_service_main.py 
b/sdks/python/apache_beam/runners/portability/local_job_service_main.py
index 6d9d32f5f23..efc198e87db 100644
--- a/sdks/python/apache_beam/runners/portability/local_job_service_main.py
+++ b/sdks/python/apache_beam/runners/portability/local_job_service_main.py
@@ -145,7 +145,7 @@ def run(argv):
         with open(options.port_file + '.tmp', 'w') as fout:
           fout.write(str(port))
         os.rename(options.port_file + '.tmp', options.port_file)
-      serve("Listening for beam jobs on port %d." % port, job_servicer)
+      serve("Listening for beam jobs on port %d." % port)
     finally:
       job_servicer.stop()
   finally:
@@ -155,7 +155,7 @@ def run(argv):
       os.unlink(options.port_file)
 
 
-def serve(msg, job_servicer):
+def serve(msg):
   logging_delay = 30
   while True:
     _LOGGER.info(msg)
diff --git a/sdks/python/apache_beam/runners/render.py 
b/sdks/python/apache_beam/runners/render.py
index da153d25a4b..fccfa8aacd6 100644
--- a/sdks/python/apache_beam/runners/render.py
+++ b/sdks/python/apache_beam/runners/render.py
@@ -559,8 +559,7 @@ def run_server(options):
         staging_dir, beam_job_type=RenderBeamJob)
     port = job_servicer.start_grpc_server(options.job_port)
     try:
-      local_job_service_main.serve(
-          "Listening for beam jobs on port %d." % port, job_servicer)
+      local_job_service_main.serve("Listening for beam jobs on port %d." % 
port)
     finally:
       job_servicer.stop()
 

Reply via email to