[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-09-07 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=308392=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-308392
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 07/Sep/19 21:51
Start Date: 07/Sep/19 21:51
Worklog Time Spent: 10m 
  Work Description: lostluck commented on issue #9398: [BEAM-7980] Exactly 
once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#issuecomment-529149693
 
 
   LGTM after the fact. Good use of the first class function for the lock 
wrapping.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 308392)
Time Spent: 9h  (was: 8h 50m)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
> Fix For: 2.16.0
>
>  Time Spent: 9h
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-09-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=305727=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-305727
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 03/Sep/19 17:44
Start Date: 03/Sep/19 17:44
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #9398: [BEAM-7980] 
Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398
 
 
   
 



Issue Time Tracking
---

Worklog Id: (was: 305727)
Time Spent: 8h 50m  (was: 8h 40m)



[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-09-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=305666=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-305666
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 03/Sep/19 15:58
Start Date: 03/Sep/19 15:58
Worklog Time Spent: 10m 
  Work Description: aaltay commented on issue #9398: [BEAM-7980] Exactly 
once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#issuecomment-527523050
 
 
   New changes LGTM.
 



Issue Time Tracking
---

Worklog Id: (was: 305666)
Time Spent: 8h 40m  (was: 8.5h)



[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-09-03 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=305487=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-305487
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 03/Sep/19 09:22
Start Date: 03/Sep/19 09:22
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #9398: [BEAM-7980] Exactly once 
artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#issuecomment-527378537
 
 
   I think this can be squashed and merged. @lostluck's comments seem to be 
resolved.
 



Issue Time Tracking
---

Worklog Id: (was: 305487)
Time Spent: 8.5h  (was: 8h 20m)



[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-09-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=305394=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-305394
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 03/Sep/19 04:10
Start Date: 03/Sep/19 04:10
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #9398: [BEAM-7980] Exactly 
once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#issuecomment-527296613
 
 
   @lostluck @aaltay @mxm PTAL since I made a few more changes than those 
requested to improve the synchronization logic. 
 



Issue Time Tracking
---

Worklog Id: (was: 305394)
Time Spent: 8h 20m  (was: 8h 10m)



[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-09-01 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=304999=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-304999
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 01/Sep/19 17:43
Start Date: 01/Sep/19 17:43
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #9398: [BEAM-7980] 
Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r319768879
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -105,18 +107,57 @@ func main() {
 
     // (2) Retrieve and install the staged packages.
 
-    dir := filepath.Join(*semiPersistDir, "staged")
+    func() {
 
-    files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-    if err != nil {
-        log.Fatalf("Failed to retrieve staged files: %v", err)
-    }
+        installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")
 
-    // TODO(herohde): the packages to install should be specified explicitly. It
-    // would also be possible to install the SDK in the Dockerfile.
-    if setupErr := installSetupPackages(files, dir); setupErr != nil {
-        log.Fatalf("Failed to install required packages: %v", setupErr)
-    }
+        // skip if install already complete
+        _, err = os.Stat(installCompleteFile)
+        if err == nil {
+            return
+        }
+
+        // lock to guard from concurrent artifact retrieval and installation,
+        // when called by child processes in a worker pool
+        lock, err := lockfile.New(filepath.Join(os.TempDir(), "beam.install.lck"))
+        if err != nil {
+            log.Fatalf("Cannot init artifact retrieval lock: %v", err)
+        }
+
+        for err = lock.TryLock(); err != nil; err = lock.TryLock() {
+            switch err {
+            case lockfile.ErrBusy, lockfile.ErrNotExist:
+                time.Sleep(5 * time.Second)
+                log.Printf("Worker %v waiting for artifact retrieval lock: %v", *id, lock)
+            default:
+                log.Fatalf("Worker %v could not obtain artifact retrieval lock: %v", *id, err)
+            }
+        }
+        defer lock.Unlock()
+
+        // skip if install already complete
+        _, err = os.Stat(installCompleteFile)
+        if err == nil {
+            return
+        }
+
+        dir := filepath.Join(*semiPersistDir, "staged")
+
+        files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
+        if err != nil {
+            log.Fatalf("Failed to retrieve staged files: %v", err)
+        }
+
+        // TODO(herohde): the packages to install should be specified explicitly. It
+        // would also be possible to install the SDK in the Dockerfile.
+        if setupErr := installSetupPackages(files, dir); setupErr != nil {
+            log.Fatalf("Failed to install required packages: %v", setupErr)
+        }
+
+        // mark install complete
+        os.OpenFile(installCompleteFile, os.O_RDONLY|os.O_CREATE, 0666)
+
+    }()
 
     // (3) Invoke python
 
 
 Review comment:
   They don't have to be the same on Flink, but the staged artifacts have to be 
identical.
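
   Side note on the "mark install complete" step in the quoted diff: the call to os.OpenFile discards both the returned file and the error. Below is a minimal sketch of a variant that reports a failure and closes the handle. The names markComplete and markerRoundTrip are hypothetical helpers for illustration, not part of boot.go, and the scratch directory is an assumption made so the example can run anywhere.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// markComplete creates the marker file if it does not exist yet.
// Hypothetical helper: same flags as the quoted diff, but the error is
// returned and the file handle is closed instead of being dropped.
func markComplete(path string) error {
	f, err := os.OpenFile(path, os.O_RDONLY|os.O_CREATE, 0666)
	if err != nil {
		return err
	}
	return f.Close()
}

// markerRoundTrip writes a marker into a scratch directory and reports
// whether a later os.Stat sees it, which is the check the boot code uses.
func markerRoundTrip() bool {
	dir, err := os.MkdirTemp("", "beam-marker-sketch")
	if err != nil {
		return false
	}
	defer os.RemoveAll(dir)

	marker := filepath.Join(dir, "beam.install.complete")
	if err := markComplete(marker); err != nil {
		return false
	}
	// A second call must also succeed: O_CREATE without O_EXCL is idempotent.
	if err := markComplete(marker); err != nil {
		return false
	}
	_, err = os.Stat(marker)
	return err == nil
}

func main() {
	fmt.Println("marker visible:", markerRoundTrip())
}
```

   Returning the error lets the caller decide whether a missing marker is fatal; without the marker, every later worker would repeat the install.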
 



Issue Time Tracking
---

Worklog Id: (was: 304999)
Time Spent: 8h 10m  (was: 8h)



[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-09-01 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=304988=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-304988
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 01/Sep/19 17:03
Start Date: 01/Sep/19 17:03
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #9398: [BEAM-7980] 
Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r319767728
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -105,18 +107,57 @@ func main() {
 
     // (2) Retrieve and install the staged packages.
 
-    dir := filepath.Join(*semiPersistDir, "staged")
+    func() {
 
-    files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-    if err != nil {
-        log.Fatalf("Failed to retrieve staged files: %v", err)
-    }
+        installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")
 
 Review comment:
   Done! Can this be accomplished through the gradle plugin also (w/o extra go 
install)? Would be nice to have this integrated like Spotless for Java.
 



Issue Time Tracking
---

Worklog Id: (was: 304988)
Time Spent: 8h  (was: 7h 50m)



[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=301119=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-301119
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 26/Aug/19 09:26
Start Date: 26/Aug/19 09:26
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9398: [BEAM-7980] 
Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r317522264
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -105,18 +107,57 @@ func main() {
 
     // (2) Retrieve and install the staged packages.
 
-    dir := filepath.Join(*semiPersistDir, "staged")
+    func() {
 
-    files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-    if err != nil {
-        log.Fatalf("Failed to retrieve staged files: %v", err)
-    }
+        installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")
 
-    // TODO(herohde): the packages to install should be specified explicitly. It
-    // would also be possible to install the SDK in the Dockerfile.
-    if setupErr := installSetupPackages(files, dir); setupErr != nil {
-        log.Fatalf("Failed to install required packages: %v", setupErr)
-    }
+        // skip if install already complete
+        _, err = os.Stat(installCompleteFile)
+        if err == nil {
+            return
 
 Review comment:
   Is this check really necessary? In case of a race condition when multiple 
workers complete this section before the install file has been created, only 
the later check will catch that. For simplicity, I'd remove this check 
entirely, unless the locking is very expensive.
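
   To make the trade-off in this comment concrete, here is a small sketch that counts lock acquisitions with and without the unlocked check. It is an illustration under stated assumptions, not the boot.go code: an in-memory flag stands in for the marker file, a sync.Mutex stands in for the lock file, the workers start one after another, and countLocks is a hypothetical name.

```go
package main

import (
	"fmt"
	"sync"
)

// countLocks simulates `calls` workers that start one after another and
// returns how many times the lock was taken and how often the install ran.
// fastPath toggles the unlocked check that the review comment questions.
func countLocks(fastPath bool, calls int) (locks, installs int) {
	var mu sync.Mutex
	done := false // stands in for the marker file on disk

	ensure := func() {
		if fastPath && done {
			return // skip the lock once the install is known to be complete
		}
		mu.Lock()
		defer mu.Unlock()
		locks++
		if done {
			return // the check under the lock, required for correctness
		}
		installs++
		done = true
	}

	for i := 0; i < calls; i++ {
		ensure()
	}
	return locks, installs
}

func main() {
	for _, fastPath := range []bool{true, false} {
		locks, installs := countLocks(fastPath, 5)
		fmt.Printf("fastPath=%v locks=%d installs=%d\n", fastPath, locks, installs)
	}
}
```

   Both variants install exactly once; they differ only in how often a late-starting worker has to take the lock, which is the cost the comment asks about.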
 



Issue Time Tracking
---

Worklog Id: (was: 301119)
Time Spent: 7h 50m  (was: 7h 40m)



[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=301120=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-301120
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 26/Aug/19 09:26
Start Date: 26/Aug/19 09:26
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9398: [BEAM-7980] 
Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r317522603
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -105,18 +107,57 @@ func main() {
 
     // (2) Retrieve and install the staged packages.
 
-    dir := filepath.Join(*semiPersistDir, "staged")
+    func() {
 
-    files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-    if err != nil {
-        log.Fatalf("Failed to retrieve staged files: %v", err)
-    }
+        installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")
 
-    // TODO(herohde): the packages to install should be specified explicitly. It
-    // would also be possible to install the SDK in the Dockerfile.
-    if setupErr := installSetupPackages(files, dir); setupErr != nil {
-        log.Fatalf("Failed to install required packages: %v", setupErr)
-    }
+        // skip if install already complete
+        _, err = os.Stat(installCompleteFile)
+        if err == nil {
+            return
+        }
+
+        // lock to guard from concurrent artifact retrieval and installation,
+        // when called by child processes in a worker pool
+        lock, err := lockfile.New(filepath.Join(os.TempDir(), "beam.install.lck"))
+        if err != nil {
+            log.Fatalf("Cannot init artifact retrieval lock: %v", err)
+        }
+
+        for err = lock.TryLock(); err != nil; err = lock.TryLock() {
+            switch err {
+            case lockfile.ErrBusy, lockfile.ErrNotExist:
+                time.Sleep(5 * time.Second)
+                log.Printf("Worker %v waiting for artifact retrieval lock: %v", *id, lock)
+            default:
+                log.Fatalf("Worker %v could not obtain artifact retrieval lock: %v", *id, err)
+            }
+        }
+        defer lock.Unlock()
+
+        // skip if install already complete
+        _, err = os.Stat(installCompleteFile)
+        if err == nil {
+            return
 
 Review comment:
   Makes sense. I'm new to Go. However, if you look at the above similar check 
(line 115), there is a possible race condition which can only be caught here 
where the lock is in place.
 



Issue Time Tracking
---

Worklog Id: (was: 301120)
Time Spent: 7h 50m  (was: 7h 40m)



[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=300511=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-300511
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 23/Aug/19 21:07
Start Date: 23/Aug/19 21:07
Worklog Time Spent: 10m 
  Work Description: lostluck commented on issue #9398: [BEAM-7980] Exactly 
once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#issuecomment-524462613
 
 
   FYI I'm gone until Sept 3rd, please don't block unnecessarily on further 
feedback from me.
 



Issue Time Tracking
---

Worklog Id: (was: 300511)
Time Spent: 7h 40m  (was: 7.5h)



[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=300500=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-300500
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 23/Aug/19 20:46
Start Date: 23/Aug/19 20:46
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #9398: [BEAM-7980] 
Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r317295950
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -105,18 +107,57 @@ func main() {
 
     // (2) Retrieve and install the staged packages.
 
-    dir := filepath.Join(*semiPersistDir, "staged")
+    func() {
 
-    files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-    if err != nil {
-        log.Fatalf("Failed to retrieve staged files: %v", err)
-    }
+        installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")
 
-    // TODO(herohde): the packages to install should be specified explicitly. It
-    // would also be possible to install the SDK in the Dockerfile.
-    if setupErr := installSetupPackages(files, dir); setupErr != nil {
-        log.Fatalf("Failed to install required packages: %v", setupErr)
-    }
+        // skip if install already complete
+        _, err = os.Stat(installCompleteFile)
+        if err == nil {
+            return
 
 Review comment:
   There is no need to enter the locked section when the install was already 
completed (hence the comment above).
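
   The two checks together form a double-checked pattern: an unlocked check to skip the locked section, then a second check under the lock. The sketch below runs several concurrent workers against one marker file and counts how often the install step executes. It is a simplified illustration, not the boot.go code: a sync.Mutex stands in for the cross-process lock file, goroutines stand in for worker processes, and installer, ensureInstalled and runWorkers are hypothetical names.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"sync"
)

// installer is a hypothetical stand-in for the boot logic of one container.
type installer struct {
	mu       sync.Mutex // stands in for the cross-process lock file
	marker   string     // path of the "install complete" marker
	installs int        // number of times the expensive step ran
}

// done reports whether the marker file exists.
func (in *installer) done() bool {
	_, err := os.Stat(in.marker)
	return err == nil
}

func (in *installer) ensureInstalled() error {
	// Fast path: skip the lock entirely once the marker exists.
	if in.done() {
		return nil
	}
	in.mu.Lock()
	defer in.mu.Unlock()
	// Re-check under the lock: another worker may have finished meanwhile.
	if in.done() {
		return nil
	}
	in.installs++ // placeholder for artifact retrieval and installation
	f, err := os.Create(in.marker)
	if err != nil {
		return err
	}
	return f.Close()
}

// runWorkers starts n concurrent workers and returns how often the install ran.
func runWorkers(n int) (int, error) {
	dir, err := os.MkdirTemp("", "beam-install-sketch")
	if err != nil {
		return 0, err
	}
	defer os.RemoveAll(dir)

	in := &installer{marker: filepath.Join(dir, "beam.install.complete")}
	errs := make([]error, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			errs[i] = in.ensureInstalled()
		}(i)
	}
	wg.Wait()
	for _, e := range errs {
		if e != nil {
			return in.installs, e
		}
	}
	return in.installs, nil
}

func main() {
	installs, err := runWorkers(8)
	fmt.Println("installs:", installs, "err:", err)
}
```

   Workers that start after the marker exists return from the first check without touching the lock; workers that raced past it are stopped by the second check.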
 



Issue Time Tracking
---

Worklog Id: (was: 300500)
Time Spent: 7.5h  (was: 7h 20m)



[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=300487=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-300487
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 23/Aug/19 20:22
Start Date: 23/Aug/19 20:22
Worklog Time Spent: 10m 
  Work Description: aaltay commented on pull request #9398: [BEAM-7980] 
Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r31726
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -105,18 +107,57 @@ func main() {
 
     // (2) Retrieve and install the staged packages.
 
-    dir := filepath.Join(*semiPersistDir, "staged")
+    func() {
 
-    files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-    if err != nil {
-        log.Fatalf("Failed to retrieve staged files: %v", err)
-    }
+        installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")
 
-    // TODO(herohde): the packages to install should be specified explicitly. It
-    // would also be possible to install the SDK in the Dockerfile.
-    if setupErr := installSetupPackages(files, dir); setupErr != nil {
-        log.Fatalf("Failed to install required packages: %v", setupErr)
-    }
+        // skip if install already complete
+        _, err = os.Stat(installCompleteFile)
+        if err == nil {
+            return
+        }
+
+        // lock to guard from concurrent artifact retrieval and installation,
+        // when called by child processes in a worker pool
+        lock, err := lockfile.New(filepath.Join(os.TempDir(), "beam.install.lck"))
+        if err != nil {
+            log.Fatalf("Cannot init artifact retrieval lock: %v", err)
+        }
+
+        for err = lock.TryLock(); err != nil; err = lock.TryLock() {
+            switch err {
+            case lockfile.ErrBusy, lockfile.ErrNotExist:
+                time.Sleep(5 * time.Second)
+                log.Printf("Worker %v waiting for artifact retrieval lock: %v", *id, lock)
+            default:
+                log.Fatalf("Worker %v could not obtain artifact retrieval lock: %v", *id, err)
+            }
+        }
+        defer lock.Unlock()
+
+        // skip if install already complete
+        _, err = os.Stat(installCompleteFile)
+        if err == nil {
+            return
 
 Review comment:
   This is within the lock section. defer ... above executes after the 
completion of this function.
   
   I think we should block all workers as intended here, because workers will 
need the environment to be set up first.
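
   The defer scoping described here can be seen in isolation in the sketch below. It assumes a sync.Mutex in place of the lock file and records the order of steps; bootSteps is a hypothetical name used only for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// bootSteps records the order in which the steps run. The deferred unlock
// belongs to the anonymous function, so it fires when that function returns
// (including on the early return), before the outer code continues.
func bootSteps(alreadyInstalled bool) []string {
	var mu sync.Mutex
	var steps []string

	func() {
		mu.Lock()
		defer func() {
			steps = append(steps, "unlock")
			mu.Unlock()
		}()

		if alreadyInstalled {
			return // early return still releases the lock via defer
		}
		steps = append(steps, "install")
	}()

	// Runs only after the locked section above has fully completed.
	steps = append(steps, "invoke python")
	return steps
}

func main() {
	fmt.Println(bootSteps(false))
	fmt.Println(bootSteps(true))
}
```

   Had the defer been placed directly in the enclosing function instead, the unlock would be postponed until that function returned, so the lock would be held for everything that follows the install step.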
 



Issue Time Tracking
---

Worklog Id: (was: 300487)
Time Spent: 7h 20m  (was: 7h 10m)



[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=300154=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-300154
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 23/Aug/19 08:23
Start Date: 23/Aug/19 08:23
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9398: [BEAM-7980] 
Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r317022783
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -105,18 +107,57 @@ func main() {
 
     // (2) Retrieve and install the staged packages.
 
-    dir := filepath.Join(*semiPersistDir, "staged")
+    func() {
 
-    files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-    if err != nil {
-        log.Fatalf("Failed to retrieve staged files: %v", err)
-    }
+        installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")
 
-    // TODO(herohde): the packages to install should be specified explicitly. It
-    // would also be possible to install the SDK in the Dockerfile.
-    if setupErr := installSetupPackages(files, dir); setupErr != nil {
-        log.Fatalf("Failed to install required packages: %v", setupErr)
-    }
+        // skip if install already complete
+        _, err = os.Stat(installCompleteFile)
+        if err == nil {
+            return
+        }
+
+        // lock to guard from concurrent artifact retrieval and installation,
+        // when called by child processes in a worker pool
+        lock, err := lockfile.New(filepath.Join(os.TempDir(), "beam.install.lck"))
+        if err != nil {
+            log.Fatalf("Cannot init artifact retrieval lock: %v", err)
+        }
+
+        for err = lock.TryLock(); err != nil; err = lock.TryLock() {
+            switch err {
+            case lockfile.ErrBusy, lockfile.ErrNotExist:
+                time.Sleep(5 * time.Second)
+                log.Printf("Worker %v waiting for artifact retrieval lock: %v", *id, lock)
+            default:
+                log.Fatalf("Worker %v could not obtain artifact retrieval lock: %v", *id, err)
+            }
+        }
+        defer lock.Unlock()
+
+        // skip if install already complete
+        _, err = os.Stat(installCompleteFile)
+        if err == nil {
+            return
 
 Review comment:
   Do you want to avoid blocking the workers?
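
   For context on the blocking behaviour being asked about: the retry loop in the quoted diff polls a try-lock call and sleeps while the lock is busy. A minimal sketch of that polling shape follows. It is not the lockfile library API: errBusy, acquire and the injected tryLock and wait functions are hypothetical, and the attempt limit is an addition for the example (the loop in the diff retries without a limit).

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errBusy is a hypothetical sentinel meaning "someone else holds the lock".
var errBusy = errors.New("lock busy")

// acquire polls tryLock until it succeeds, fails with a non-busy error, or
// maxAttempts is reached. It returns the number of attempts made.
func acquire(tryLock func() error, wait func(), maxAttempts int) (int, error) {
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		err := tryLock()
		switch {
		case err == nil:
			return attempt, nil
		case errors.Is(err, errBusy):
			wait() // lock is held elsewhere: pause, then retry
		default:
			return attempt, err // unexpected failure: stop retrying
		}
	}
	return maxAttempts, fmt.Errorf("gave up after %d attempts: %w", maxAttempts, errBusy)
}

func main() {
	calls := 0
	// Simulated lock that is busy for the first two attempts.
	tryLock := func() error {
		calls++
		if calls < 3 {
			return errBusy
		}
		return nil
	}
	attempts, err := acquire(tryLock, func() { time.Sleep(10 * time.Millisecond) }, 10)
	fmt.Println("attempts:", attempts, "err:", err)
}
```

   Every worker that does not hold the lock waits in this loop, which is the blocking that keeps workers from starting before the environment is ready.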
 



Issue Time Tracking
---

Worklog Id: (was: 300154)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 7h 10m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=300153=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-300153
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 23/Aug/19 08:23
Start Date: 23/Aug/19 08:23
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9398: [BEAM-7980] 
Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r317022471
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -105,18 +107,57 @@ func main() {
 
// (2) Retrieve and install the staged packages.
 
-   dir := filepath.Join(*semiPersistDir, "staged")
+   func() {
 
-   files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-   if err != nil {
-   log.Fatalf("Failed to retrieve staged files: %v", err)
-   }
+installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")
 
-   // TODO(herohde): the packages to install should be specified explicitly. It
-   // would also be possible to install the SDK in the Dockerfile.
-   if setupErr := installSetupPackages(files, dir); setupErr != nil {
-   log.Fatalf("Failed to install required packages: %v", setupErr)
-   }
+   // skip if install already complete
+   _, err = os.Stat(installCompleteFile)
+   if err == nil {
+   return
+   }
+
+   // lock to guard from concurrent artifact retrieval and installation,
+   // when called by child processes in a worker pool
+   lock, err := lockfile.New(filepath.Join(os.TempDir(), "beam.install.lck"))
+   if err != nil {
+   log.Fatalf("Cannot init artifact retrieval lock: %v", err)
+   }
+
+   for err = lock.TryLock(); err != nil; err = lock.TryLock() {
+   switch err {
+   case lockfile.ErrBusy, lockfile.ErrNotExist:
+   time.Sleep(5 * time.Second)
+   log.Printf("Worker %v waiting for artifact retrieval lock: %v", *id, lock)
+   default:
+   log.Fatalf("Worker %v could not obtain artifact retrieval lock: %v", *id, err)
+   }
+   }
+   defer lock.Unlock()
+
+   // skip if install already complete
+   _, err = os.Stat(installCompleteFile)
+   if err == nil {
+   return
 
 Review comment:
   Why is this not within the locked section?
 



Issue Time Tracking
---

Worklog Id: (was: 300153)
Time Spent: 7h 10m  (was: 7h)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 7h 10m
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=300152=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-300152
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 23/Aug/19 08:23
Start Date: 23/Aug/19 08:23
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9398: [BEAM-7980] 
Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r317022289
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -105,18 +107,57 @@ func main() {
 
// (2) Retrieve and install the staged packages.
 
-   dir := filepath.Join(*semiPersistDir, "staged")
+   func() {
 
-   files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-   if err != nil {
-   log.Fatalf("Failed to retrieve staged files: %v", err)
-   }
+installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")
 
-   // TODO(herohde): the packages to install should be specified explicitly. It
-   // would also be possible to install the SDK in the Dockerfile.
-   if setupErr := installSetupPackages(files, dir); setupErr != nil {
-   log.Fatalf("Failed to install required packages: %v", setupErr)
-   }
+   // skip if install already complete
+   _, err = os.Stat(installCompleteFile)
+   if err == nil {
+   return
 
 Review comment:
   I think this should live within the locked section.
 



Issue Time Tracking
---

Worklog Id: (was: 300152)
Time Spent: 7h 10m  (was: 7h)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 7h 10m
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299639=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299639
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 22/Aug/19 18:37
Start Date: 22/Aug/19 18:37
Worklog Time Spent: 10m 
  Work Description: lostluck commented on pull request #9398: [BEAM-7980] 
Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r316827877
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -105,18 +107,57 @@ func main() {
 
// (2) Retrieve and install the staged packages.
 
-   dir := filepath.Join(*semiPersistDir, "staged")
+   func() {
 
-   files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-   if err != nil {
-   log.Fatalf("Failed to retrieve staged files: %v", err)
-   }
+installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")
 
-   // TODO(herohde): the packages to install should be specified explicitly. It
-   // would also be possible to install the SDK in the Dockerfile.
-   if setupErr := installSetupPackages(files, dir); setupErr != nil {
-   log.Fatalf("Failed to install required packages: %v", setupErr)
-   }
+   // skip if install already complete
+   _, err = os.Stat(installCompleteFile)
+   if err == nil {
+   return
+   }
+
+   // lock to guard from concurrent artifact retrieval and installation,
+   // when called by child processes in a worker pool
+   lock, err := lockfile.New(filepath.Join(os.TempDir(), "beam.install.lck"))
+   if err != nil {
+   log.Fatalf("Cannot init artifact retrieval lock: %v", err)
+   }
+
+   for err = lock.TryLock(); err != nil; err = lock.TryLock() {
+   switch err {
+   case lockfile.ErrBusy, lockfile.ErrNotExist:
+   time.Sleep(5 * time.Second)
+   log.Printf("Worker %v waiting for artifact retrieval lock: %v", *id, lock)
+   default:
+   log.Fatalf("Worker %v could not obtain artifact retrieval lock: %v", *id, err)
+   }
+   }
+   defer lock.Unlock()
+
+   // skip if install already complete
+   _, err = os.Stat(installCompleteFile)
+   if err == nil {
+   return
+   }
+
+   dir := filepath.Join(*semiPersistDir, "staged")
+
+   files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
+   if err != nil {
+   log.Fatalf("Failed to retrieve staged files: %v", err)
+   }
+
+   // TODO(herohde): the packages to install should be specified explicitly. It
+   // would also be possible to install the SDK in the Dockerfile.
+   if setupErr := installSetupPackages(files, dir); setupErr != nil {
+   log.Fatalf("Failed to install required packages: %v", setupErr)
+   }
+
+   // mark install complete
+   os.OpenFile(installCompleteFile, os.O_RDONLY|os.O_CREATE, 0666)
+
+}()
 
// (3) Invoke python
 
 
 Review comment:
   Does that mean that the loggingEndpoint and controlEndpoints are the same?
 



Issue Time Tracking
---

Worklog Id: (was: 299639)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 7h
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299638=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299638
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 22/Aug/19 18:37
Start Date: 22/Aug/19 18:37
Worklog Time Spent: 10m 
  Work Description: lostluck commented on pull request #9398: [BEAM-7980] 
Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r316825518
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -105,18 +107,57 @@ func main() {
 
// (2) Retrieve and install the staged packages.
 
-   dir := filepath.Join(*semiPersistDir, "staged")
+   func() {
 
-   files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-   if err != nil {
-   log.Fatalf("Failed to retrieve staged files: %v", err)
-   }
+installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")
 
-   // TODO(herohde): the packages to install should be specified explicitly. It
-   // would also be possible to install the SDK in the Dockerfile.
-   if setupErr := installSetupPackages(files, dir); setupErr != nil {
-   log.Fatalf("Failed to install required packages: %v", setupErr)
-   }
+   // skip if install already complete
+   _, err = os.Stat(installCompleteFile)
+   if err == nil {
+   return
+   }
+
+   // lock to guard from concurrent artifact retrieval and installation,
+   // when called by child processes in a worker pool
+   lock, err := lockfile.New(filepath.Join(os.TempDir(), "beam.install.lck"))
+   if err != nil {
+   log.Fatalf("Cannot init artifact retrieval lock: %v", err)
+   }
+
+   for err = lock.TryLock(); err != nil; err = lock.TryLock() {
+   switch err {
+   case lockfile.ErrBusy, lockfile.ErrNotExist:
 
 Review comment:
   Nit: This is fine as is, but here's an optional suggestion.
   In this instance, the lockfile package author provides a type, TemporaryError, to 
indicate that the error can be retried, which we can check for instead of listing all 
temporary errors.
   
   if _, ok := err.(lockfile.TemporaryError); ok {
       // sleep etc
   } else {
       log.Fatalf(...
   }
   
   Or if you're keen on a switch, how about a type switch?
   switch err.(type) {
   case lockfile.TemporaryError:
       // sleep etc
   default:
       log.Fatalf(...
   }
 



Issue Time Tracking
---

Worklog Id: (was: 299638)
Time Spent: 7h  (was: 6h 50m)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 7h
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299625=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299625
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 22/Aug/19 18:20
Start Date: 22/Aug/19 18:20
Worklog Time Spent: 10m 
  Work Description: lostluck commented on pull request #9398: [BEAM-7980] 
Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r316818691
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -33,6 +34,7 @@ import (
"github.com/apache/beam/sdks/go/pkg/beam/util/execx"
"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
"github.com/golang/protobuf/proto"
+   "github.com/nightlyone/lockfile"
 
 Review comment:
   About the same way as any other package: by looking at the code. There's no 
good third-party vetting system for Go at the moment.
   
   As for this one, it's a small package (~200 lines) that imports only from 
the standard library, making it relatively safe and easy to inspect.
   https://github.com/nightlyone/lockfile/blob/master/lockfile.go
   
   You can also see which other open-source projects import it on godoc.org:
   https://godoc.org/github.com/nightlyone/lockfile?importers
 



Issue Time Tracking
---

Worklog Id: (was: 299625)
Time Spent: 6h 50m  (was: 6h 40m)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 6h 50m
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299624=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299624
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 22/Aug/19 18:18
Start Date: 22/Aug/19 18:18
Worklog Time Spent: 10m 
  Work Description: lostluck commented on pull request #9398: [BEAM-7980] 
Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r316820565
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -105,18 +107,57 @@ func main() {
 
// (2) Retrieve and install the staged packages.
 
-   dir := filepath.Join(*semiPersistDir, "staged")
+   func() {
 
-   files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-   if err != nil {
-   log.Fatalf("Failed to retrieve staged files: %v", err)
-   }
+installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")
 
 Review comment:
   The indentation here is wonky (it's using spaces instead of tabs). Please 
run go fmt on this file.
   https://blog.golang.org/go-fmt-your-code
 



Issue Time Tracking
---

Worklog Id: (was: 299624)
Time Spent: 6h 40m  (was: 6.5h)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 6h 40m
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299622=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299622
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 22/Aug/19 18:17
Start Date: 22/Aug/19 18:17
Worklog Time Spent: 10m 
  Work Description: lostluck commented on pull request #9398: [BEAM-7980] 
Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r316819996
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -105,18 +107,57 @@ func main() {
 
// (2) Retrieve and install the staged packages.
 
-   dir := filepath.Join(*semiPersistDir, "staged")
+   func() {
 
-   files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-   if err != nil {
-   log.Fatalf("Failed to retrieve staged files: %v", err)
-   }
+installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")
 
-   // TODO(herohde): the packages to install should be specified explicitly. It
-   // would also be possible to install the SDK in the Dockerfile.
-   if setupErr := installSetupPackages(files, dir); setupErr != nil {
-   log.Fatalf("Failed to install required packages: %v", setupErr)
-   }
+   // skip if install already complete
+   _, err = os.Stat(installCompleteFile)
+   if err == nil {
+   return
+   }
+
+   // lock to guard from concurrent artifact retrieval and installation,
+   // when called by child processes in a worker pool
+   lock, err := lockfile.New(filepath.Join(os.TempDir(), "beam.install.lck"))
 
 Review comment:
   If it's only used in one place, this is fine as is. It would not be wrong to 
move the path definition elsewhere, but it would add indirection.
 



Issue Time Tracking
---

Worklog Id: (was: 299622)
Time Spent: 6.5h  (was: 6h 20m)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 6.5h
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299621=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299621
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 22/Aug/19 18:14
Start Date: 22/Aug/19 18:14
Worklog Time Spent: 10m 
  Work Description: lostluck commented on pull request #9398: [BEAM-7980] 
Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r316818691
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -33,6 +34,7 @@ import (
"github.com/apache/beam/sdks/go/pkg/beam/util/execx"
"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
"github.com/golang/protobuf/proto"
+   "github.com/nightlyone/lockfile"
 
 Review comment:
   About the same way as any other package: by looking at the code. There's no 
good third-party vetting system for Go at the moment.
   
   As for this one, it's a small package (~200 lines) that imports only from 
the standard library, making it relatively safe and easy to inspect.
   https://github.com/nightlyone/lockfile/blob/master/lockfile.go
 



Issue Time Tracking
---

Worklog Id: (was: 299621)
Time Spent: 6h 20m  (was: 6h 10m)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 6h 20m
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299615=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299615
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 22/Aug/19 18:02
Start Date: 22/Aug/19 18:02
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #9398: [BEAM-7980] Exactly 
once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#issuecomment-524013766
 
 
   @aaltay thanks for taking a look. Is there any other Go expert who can be 
tagged here?
 



Issue Time Tracking
---

Worklog Id: (was: 299615)
Time Spent: 6h 10m  (was: 6h)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299572=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299572
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 22/Aug/19 16:59
Start Date: 22/Aug/19 16:59
Worklog Time Spent: 10m 
  Work Description: aaltay commented on pull request #9398: [BEAM-7980] 
Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r316781201
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -105,18 +107,57 @@ func main() {
 
// (2) Retrieve and install the staged packages.
 
-   dir := filepath.Join(*semiPersistDir, "staged")
+   func() {
 
-   files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-   if err != nil {
-   log.Fatalf("Failed to retrieve staged files: %v", err)
-   }
+installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")
 
-   // TODO(herohde): the packages to install should be specified explicitly. It
-   // would also be possible to install the SDK in the Dockerfile.
-   if setupErr := installSetupPackages(files, dir); setupErr != nil {
-   log.Fatalf("Failed to install required packages: %v", setupErr)
-   }
+   // skip if install already complete
+   _, err = os.Stat(installCompleteFile)
+   if err == nil {
+   return
+   }
+
+   // lock to guard from concurrent artifact retrieval and installation,
+   // when called by child processes in a worker pool
+   lock, err := lockfile.New(filepath.Join(os.TempDir(), "beam.install.lck"))
+   if err != nil {
+   log.Fatalf("Cannot init artifact retrieval lock: %v", err)
+   }
+
+   for err = lock.TryLock(); err != nil; err = lock.TryLock() {
+   switch err {
+   case lockfile.ErrBusy, lockfile.ErrNotExist:
+   time.Sleep(5 * time.Second)
+   log.Printf("Worker %v waiting for artifact retrieval lock: %v", *id, lock)
+   default:
+   log.Fatalf("Worker %v could not obtain artifact retrieval lock: %v", *id, err)
+   }
+   }
+   defer lock.Unlock()
+
+   // skip if install already complete
+   _, err = os.Stat(installCompleteFile)
+   if err == nil {
+   return
+   }
+
+   dir := filepath.Join(*semiPersistDir, "staged")
+
+   files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
+   if err != nil {
+   log.Fatalf("Failed to retrieve staged files: %v", err)
+   }
+
+   // TODO(herohde): the packages to install should be specified explicitly. It
+   // would also be possible to install the SDK in the Dockerfile.
+   if setupErr := installSetupPackages(files, dir); setupErr != nil {
 
 Review comment:
   Could we separate the artifact/install logic (L144-L155) from the 
locking-related logic by moving it into its own function?
 



Issue Time Tracking
---

Worklog Id: (was: 299572)
Time Spent: 5h 50m  (was: 5h 40m)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 5h 50m
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299573=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299573
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 22/Aug/19 16:59
Start Date: 22/Aug/19 16:59
Worklog Time Spent: 10m 
  Work Description: aaltay commented on pull request #9398: [BEAM-7980] 
Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r316786467
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -33,6 +34,7 @@ import (
"github.com/apache/beam/sdks/go/pkg/beam/util/execx"
"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
"github.com/golang/protobuf/proto"
+   "github.com/nightlyone/lockfile"
 
 Review comment:
   For my own learning, how does one pick and vet the right Go packages?
 



Issue Time Tracking
---

Worklog Id: (was: 299573)
Time Spent: 6h  (was: 5h 50m)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 6h
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299574=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299574
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 22/Aug/19 16:59
Start Date: 22/Aug/19 16:59
Worklog Time Spent: 10m 
  Work Description: aaltay commented on pull request #9398: [BEAM-7980] 
Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r316782950
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -105,18 +107,57 @@ func main() {
 
// (2) Retrieve and install the staged packages.
 
-   dir := filepath.Join(*semiPersistDir, "staged")
+   func() {
 
-   files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-   if err != nil {
-   log.Fatalf("Failed to retrieve staged files: %v", err)
-   }
+installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")
 
-   // TODO(herohde): the packages to install should be specified explicitly. It
-   // would also be possible to install the SDK in the Dockerfile.
-   if setupErr := installSetupPackages(files, dir); setupErr != nil {
-   log.Fatalf("Failed to install required packages: %v", setupErr)
-   }
+   // skip if install already complete
+   _, err = os.Stat(installCompleteFile)
+   if err == nil {
+   return
+   }
+
+   // lock to guard from concurrent artifact retrieval and installation,
+   // when called by child processes in a worker pool
+   lock, err := lockfile.New(filepath.Join(os.TempDir(), "beam.install.lck"))
 
 Review comment:
   Should we extract `filepath.Join(os.TempDir(), "beam.install.lck")` into a 
variable, similar to `installCompleteFile`? I'm not sure what the Go convention 
is when a value is used in only one place.
 



Issue Time Tracking
---

Worklog Id: (was: 299574)
Time Spent: 6h  (was: 5h 50m)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 6h
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299575=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299575
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 22/Aug/19 16:59
Start Date: 22/Aug/19 16:59
Worklog Time Spent: 10m 
  Work Description: aaltay commented on pull request #9398: [BEAM-7980] 
Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r316783735
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -105,18 +107,57 @@ func main() {
 
    // (2) Retrieve and install the staged packages.
 
-   dir := filepath.Join(*semiPersistDir, "staged")
+   func() {
 
-   files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-   if err != nil {
-       log.Fatalf("Failed to retrieve staged files: %v", err)
-   }
+       installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")
 
-   // TODO(herohde): the packages to install should be specified explicitly. It
-   // would also be possible to install the SDK in the Dockerfile.
-   if setupErr := installSetupPackages(files, dir); setupErr != nil {
-       log.Fatalf("Failed to install required packages: %v", setupErr)
-   }
+       // skip if install already complete
+       _, err = os.Stat(installCompleteFile)
+       if err == nil {
+           return
+       }
+
+       // lock to guard from concurrent artifact retrieval and installation,
+       // when called by child processes in a worker pool
+       lock, err := lockfile.New(filepath.Join(os.TempDir(), "beam.install.lck"))
+       if err != nil {
+           log.Fatalf("Cannot init artifact retrieval lock: %v", err)
+       }
+
+       for err = lock.TryLock(); err != nil; err = lock.TryLock() {
+           switch err {
+           case lockfile.ErrBusy, lockfile.ErrNotExist:
+               time.Sleep(5 * time.Second)
+               log.Printf("Worker %v waiting for artifact retrieval lock: %v", *id, lock)
+           default:
+               log.Fatalf("Worker %v could not obtain artifact retrieval lock: %v", *id, err)
+           }
+       }
+       defer lock.Unlock()
+
+       // skip if install already complete
+       _, err = os.Stat(installCompleteFile)
+       if err == nil {
+           return
+       }
+
+       dir := filepath.Join(*semiPersistDir, "staged")
+
+       files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
+       if err != nil {
+           log.Fatalf("Failed to retrieve staged files: %v", err)
+       }
+
+       // TODO(herohde): the packages to install should be specified explicitly. It
+       // would also be possible to install the SDK in the Dockerfile.
+       if setupErr := installSetupPackages(files, dir); setupErr != nil {
+           log.Fatalf("Failed to install required packages: %v", setupErr)
+       }
+
+       // mark install complete
+       os.OpenFile(installCompleteFile, os.O_RDONLY|os.O_CREATE, 0666)
+
+   }()
 
    // (3) Invoke python
 
 
 Review comment:
   This relates to the env variables below. We need to make sure that no worker 
receives a different endpoint. (This is a non-issue today.)
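
   One way to picture this concern: the first worker records the endpoint it 
used, and later workers compare their own endpoint against that record. The 
following is a minimal Python sketch of that idea, not code from the PR 
(boot.go is Go). `check_endpoint` and the record file path are hypothetical 
names, and the function is assumed to be called while holding the install lock.

```python
import os


def check_endpoint(record_path, endpoint):
    """Record the endpoint on first use; fail if a later caller differs.

    Hypothetical helper, assumed to run under the install lock so that the
    record file is never read while another process is still writing it.
    """
    try:
        # Mode "x" creates the file and fails if it already exists.
        with open(record_path, "x") as f:
            f.write(endpoint)
        return
    except FileExistsError:
        pass

    with open(record_path) as f:
        recorded = f.read()
    if recorded != endpoint:
        raise ValueError(
            "endpoint %r differs from recorded endpoint %r"
            % (endpoint, recorded))
```

   A worker that receives the same endpoint passes silently; a worker that 
receives a different one fails fast instead of reusing artifacts staged for 
another endpoint.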
 



Issue Time Tracking
---

Worklog Id: (was: 299575)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 6h
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299132=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299132
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 22/Aug/19 03:28
Start Date: 22/Aug/19 03:28
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #9398: [BEAM-7980] 
Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r316483610
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -105,18 +107,57 @@ func main() {
 
    // (2) Retrieve and install the staged packages.
 
-   dir := filepath.Join(*semiPersistDir, "staged")
+   func() {
 
-   files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-   if err != nil {
-       log.Fatalf("Failed to retrieve staged files: %v", err)
-   }
+       installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")
 
-   // TODO(herohde): the packages to install should be specified explicitly. It
-   // would also be possible to install the SDK in the Dockerfile.
-   if setupErr := installSetupPackages(files, dir); setupErr != nil {
-       log.Fatalf("Failed to install required packages: %v", setupErr)
-   }
+       // skip if install already complete
+       _, err = os.Stat(installCompleteFile)
+       if err == nil {
+           return
+       }
+
+       // lock to guard from concurrent artifact retrieval and installation,
+       // when called by child processes in a worker pool
+       lock, err := lockfile.New(filepath.Join(os.TempDir(), "beam.install.lck"))
+       if err != nil {
+           log.Fatalf("Cannot init artifact retrieval lock: %v", err)
+       }
+
+       for err = lock.TryLock(); err != nil; err = lock.TryLock() {
+           switch err {
+           case lockfile.ErrBusy, lockfile.ErrNotExist:
+               time.Sleep(5 * time.Second)
+               log.Printf("Worker %v waiting for artifact retrieval lock: %v", *id, lock)
+           default:
+               log.Fatalf("Worker %v could not obtain artifact retrieval lock: %v", *id, err)
+           }
+       }
+       defer lock.Unlock()
+
+       // skip if install already complete
+       _, err = os.Stat(installCompleteFile)
+       if err == nil {
+           return
+       }
+
+       dir := filepath.Join(*semiPersistDir, "staged")
+
+       files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
+       if err != nil {
+           log.Fatalf("Failed to retrieve staged files: %v", err)
+       }
+
+       // TODO(herohde): the packages to install should be specified explicitly. It
+       // would also be possible to install the SDK in the Dockerfile.
+       if setupErr := installSetupPackages(files, dir); setupErr != nil {
 
 Review comment:
   In the future we could look into inverting this logic by extracting the 
exactly-once logic into a separate function that can be shared between the SDK 
containers.
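
   As an illustration of what such a shared helper could look like, here is a 
minimal sketch written in Python for brevity (boot.go itself is Go). 
`run_exactly_once` and its parameters are hypothetical names. The lock is a 
plain exclusive-create lock file rather than the `lockfile` package used in 
the diff, and stale locks left behind by a crashed process are not handled.

```python
import os
import time


def run_exactly_once(marker_path, lock_path, action, poll_seconds=0.1):
    """Run action() at most once across processes that share marker_path.

    Returns True if this call ran the action, False if it was skipped.
    """
    # Fast path: a previous caller already completed the work.
    if os.path.exists(marker_path):
        return False

    # Acquire the lock by atomically creating the lock file; O_EXCL makes
    # the call fail if another process created it first.
    while True:
        try:
            fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            break
        except FileExistsError:
            time.sleep(poll_seconds)

    try:
        # Re-check under the lock: another process may have finished the
        # work while this one was waiting.
        if os.path.exists(marker_path):
            return False
        action()
        # Write the marker only after the action succeeded.
        with open(marker_path, "w"):
            pass
        return True
    finally:
        # Release the lock whether or not the action succeeded.
        os.close(fd)
        os.remove(lock_path)
```

   The structure mirrors the diff: check the completion marker, take the lock, 
check the marker again, do the work, then write the marker. With the retrieval 
and installation passed in as `action`, the same helper could serve other SDK 
containers.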
 



Issue Time Tracking
---

Worklog Id: (was: 299132)
Time Spent: 5.5h  (was: 5h 20m)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 5.5h
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299134=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299134
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 22/Aug/19 03:28
Start Date: 22/Aug/19 03:28
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #9398: [BEAM-7980] 
Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r316483610
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -105,18 +107,57 @@ func main() {
 
    // (2) Retrieve and install the staged packages.
 
-   dir := filepath.Join(*semiPersistDir, "staged")
+   func() {
 
-   files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-   if err != nil {
-       log.Fatalf("Failed to retrieve staged files: %v", err)
-   }
+       installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")
 
-   // TODO(herohde): the packages to install should be specified explicitly. It
-   // would also be possible to install the SDK in the Dockerfile.
-   if setupErr := installSetupPackages(files, dir); setupErr != nil {
-       log.Fatalf("Failed to install required packages: %v", setupErr)
-   }
+       // skip if install already complete
+       _, err = os.Stat(installCompleteFile)
+       if err == nil {
+           return
+       }
+
+       // lock to guard from concurrent artifact retrieval and installation,
+       // when called by child processes in a worker pool
+       lock, err := lockfile.New(filepath.Join(os.TempDir(), "beam.install.lck"))
+       if err != nil {
+           log.Fatalf("Cannot init artifact retrieval lock: %v", err)
+       }
+
+       for err = lock.TryLock(); err != nil; err = lock.TryLock() {
+           switch err {
+           case lockfile.ErrBusy, lockfile.ErrNotExist:
+               time.Sleep(5 * time.Second)
+               log.Printf("Worker %v waiting for artifact retrieval lock: %v", *id, lock)
+           default:
+               log.Fatalf("Worker %v could not obtain artifact retrieval lock: %v", *id, err)
+           }
+       }
+       defer lock.Unlock()
+
+       // skip if install already complete
+       _, err = os.Stat(installCompleteFile)
+       if err == nil {
+           return
+       }
+
+       dir := filepath.Join(*semiPersistDir, "staged")
+
+       files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
+       if err != nil {
+           log.Fatalf("Failed to retrieve staged files: %v", err)
+       }
+
+       // TODO(herohde): the packages to install should be specified explicitly. It
+       // would also be possible to install the SDK in the Dockerfile.
+       if setupErr := installSetupPackages(files, dir); setupErr != nil {
 
 Review comment:
   In the future we could look into inverting this logic by extracting the 
exactly-once part into a separate function that can be shared between the SDK 
containers.
 



Issue Time Tracking
---

Worklog Id: (was: 299134)
Time Spent: 5h 40m  (was: 5.5h)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 5h 40m
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299131=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299131
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 22/Aug/19 03:26
Start Date: 22/Aug/19 03:26
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #9398: [BEAM-7980] 
Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398
 
 
   This is the follow-up for exactly once artifact retrieval. 
   
   
   

[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299048=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299048
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 21/Aug/19 22:31
Start Date: 21/Aug/19 22:31
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #9371: [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371
 
 
   
 



Issue Time Tracking
---

Worklog Id: (was: 299048)
Time Spent: 5h 10m  (was: 5h)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 5h 10m
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299036=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299036
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 21/Aug/19 21:46
Start Date: 21/Aug/19 21:46
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #9371: [BEAM-7980] External 
environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#issuecomment-523662128
 
 
   Run Java PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 299036)
Time Spent: 5h  (was: 4h 50m)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 5h
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299034=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299034
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 21/Aug/19 21:40
Start Date: 21/Aug/19 21:40
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #9371: [BEAM-7980] External 
environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#issuecomment-523660441
 
 
   Run Portable_Python PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 299034)
Time Spent: 4h 50m  (was: 4h 40m)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299032=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299032
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 21/Aug/19 21:32
Start Date: 21/Aug/19 21:32
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #9371: [BEAM-7980] External 
environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#issuecomment-523657737
 
 
   Run Portable_Python PreCommit
   
 



Issue Time Tracking
---

Worklog Id: (was: 299032)
Time Spent: 4h 40m  (was: 4.5h)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299002=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299002
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:50
Start Date: 21/Aug/19 20:50
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #9371: [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r316396047
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py
 ##
 @@ -0,0 +1,177 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""
+Worker pool entry point.
+
+The worker pool exposes an RPC service that is used with EXTERNAL
+environment to start and stop the SDK workers.
+
+The worker pool uses child processes for parallelism; threads are
 
 Review comment:
   The documentation applies to the entry point, which always uses processes. 
It's OK if tests use the inner class separately; that class should have its own 
documentation (I did not modify that part, just moved it).
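
   To illustrate what "always uses processes" means in practice, here is a 
minimal sketch of tracking one child process per worker and killing any 
leftovers at interpreter exit. `ProcessWorkerRegistry` and its methods are 
hypothetical names, not part of the PR; only `subprocess` and `atexit` from 
the standard library are used.

```python
import atexit
import subprocess
import sys


class ProcessWorkerRegistry(object):
    """Tracks one child process per worker id (hypothetical helper)."""

    def __init__(self):
        self._processes = {}
        # Kill any workers that are still running when the interpreter exits.
        atexit.register(self.kill_all)

    def start(self, worker_id, command):
        # Launch the worker as a separate OS process, not as a thread.
        self._processes[worker_id] = subprocess.Popen(command)

    def stop(self, worker_id):
        process = self._processes.pop(worker_id, None)
        if process is not None:
            process.kill()
            # Reap the child so that it does not linger as a zombie.
            process.wait()

    def kill_all(self):
        for worker_id in list(self._processes):
            self.stop(worker_id)
```

   A caller would start a worker with something like 
`registry.start("w1", [sys.executable, "-c", "..."])` and stop it with 
`registry.stop("w1")`. Keeping the process table on the instance means the 
exit hook only touches workers that this registry started.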
 



Issue Time Tracking
---

Worklog Id: (was: 299002)
Time Spent: 4.5h  (was: 4h 20m)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298982=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298982
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:22
Start Date: 21/Aug/19 20:22
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9371: [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r316384687
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py
 ##
 @@ -0,0 +1,177 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""
+Worker pool entry point.
+
+The worker pool exposes an RPC service that is used with EXTERNAL
+environment to start and stop the SDK workers.
+
+The worker pool uses child processes for parallelism; threads are
 
 Review comment:
   It is a bit odd to read this here because `BeamFnExternalWorkerPoolServicer` 
supports threads internally and tests make use of this feature. If this module 
is the entry point for the worker pool, maybe the pooling code itself should be 
located in a different module and we should only handle its configuration here.
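
   A minimal sketch of the suggested split, where the entry point only parses 
configuration and hands it to pooling code that lives elsewhere. The flag 
names and the `start_pool` callable are hypothetical; the keyword names passed 
to it follow the `start` classmethod shown in this PR's diff.

```python
import argparse


def parse_args(argv):
    """Parse entry-point configuration (flag names are assumptions)."""
    parser = argparse.ArgumentParser(
        description="Worker pool entry point (sketch)")
    parser.add_argument("--service_port", type=int, default=0)
    parser.add_argument("--threads_per_worker", type=int, default=1)
    parser.add_argument("--container_executable", default=None)
    return parser.parse_args(argv)


def main(argv, start_pool):
    """Translate flags into pool configuration and delegate to start_pool."""
    args = parse_args(argv)
    # The entry point always requests process-based workers; thread-based
    # workers remain available to callers that use the pool directly.
    return start_pool(
        worker_threads=args.threads_per_worker,
        use_process=True,
        port=args.service_port,
        container_executable=args.container_executable)
```

   With this shape, tests can keep constructing the pool with threads, while 
the module-level documentation of the entry point only needs to describe the 
process-based mode.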
 



Issue Time Tracking
---

Worklog Id: (was: 298982)
Time Spent: 4h 20m  (was: 4h 10m)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298973=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298973
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:12
Start Date: 21/Aug/19 20:12
Worklog Time Spent: 10m 
  Work Description: aaltay commented on pull request #9371: [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r316380746
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py
 ##
 @@ -0,0 +1,170 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""
+Worker pool entry point.
+
+The worker pool exposes an RPC service that is used with EXTERNAL
+environment to start and stop the SDK workers.
+
+This entry point is used by the Python SDK container in worker pool mode.
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import atexit
+import logging
+import subprocess
+import sys
+import threading
+import time
+from concurrent import futures
+
+import grpc
+
+from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import beam_fn_api_pb2_grpc
+from apache_beam.runners.worker import sdk_worker
+
+
+class BeamFnExternalWorkerPoolServicer(
+beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer):
+
+  def __init__(self, worker_threads, use_process=False,
+   container_executable=None):
+self._worker_threads = worker_threads
+self._use_process = use_process
+self._container_executable = container_executable
+self._worker_processes = {}
+
+  @classmethod
+  def start(cls, worker_threads=1, use_process=False, port=0,
+container_executable=None):
+worker_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+worker_address = 'localhost:%s' % worker_server.add_insecure_port(
+'[::]:%s' % port)
+beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server(
+cls(worker_threads,
+use_process=use_process,
+container_executable=container_executable),
+worker_server)
+worker_server.start()
+
+# Register to kill the subprocesses on exit.
+def kill_worker_processes():
+  for worker_process in cls._worker_processes.values():
+worker_process.kill()
+atexit.register(kill_worker_processes)
+
+return worker_address, worker_server
+
+  def StartWorker(self, start_worker_request, unused_context):
+try:
+  if self._use_process:
+command = ['python', '-c',
+   'from apache_beam.runners.worker.sdk_worker '
+   'import SdkHarness; '
+   'SdkHarness("%s",worker_count=%d,worker_id="%s").run()' % (
+   start_worker_request.control_endpoint.url,
+   self._worker_threads,
+   start_worker_request.worker_id)]
+if self._container_executable:
+  # command as per container spec
+  command = [self._container_executable,
+ '--id=%s' % start_worker_request.worker_id,
+ '--logging_endpoint=%s'
+ % start_worker_request.logging_endpoint.url,
 
 Review comment:
   Makes sense. Thank you for documenting.
 



Issue Time Tracking
---

Worklog Id: (was: 298973)
Time Spent: 4h 10m  (was: 4h)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise

[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298974=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298974
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 21/Aug/19 20:12
Start Date: 21/Aug/19 20:12
Worklog Time Spent: 10m 
  Work Description: aaltay commented on pull request #9371: [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r316380301
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -59,6 +60,18 @@ const (
 
 func main() {
flag.Parse()
+
+   if *workerPool == true {
+       args := []string{
+           "-m",
+           "apache_beam.runners.worker.worker_pool_main",
+           "--service_port=5",
+           "--container_executable=/opt/apache/beam/boot",
+       }
+       log.Printf("Starting Python SDK worker pool: python %v", strings.Join(args, " "))
+       log.Fatalf("Python SDK worker pool exited: %v", execx.Execute("python", args...))
 
 Review comment:
   Sounds good. Do you mind adding a TODO here for that follow-up PR?
 



Issue Time Tracking
---

Worklog Id: (was: 298974)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298909=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298909
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 21/Aug/19 19:03
Start Date: 21/Aug/19 19:03
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #9371: [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r316353394
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py
 ##
 @@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Worker pool entry point."""
+
+from __future__ import absolute_import
+
+import argparse
+import atexit
+import logging
+import subprocess
+import sys
+import threading
+import time
+from concurrent import futures
+
+import grpc
+
+from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import beam_fn_api_pb2_grpc
+from apache_beam.runners.worker import sdk_worker
+
+
+class BeamFnExternalWorkerPoolServicer(
+beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer):
+
+  def __init__(self, worker_threads, use_process=False,
+   container_executable=None):
+self._worker_threads = worker_threads
+self._use_process = use_process
+self._container_executable = container_executable
+self._worker_processes = {}
+
+  @classmethod
+  def start(cls, worker_threads=1, use_process=False, port=0,
+container_executable=None):
+worker_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+worker_address = 'localhost:%s' % worker_server.add_insecure_port(
+'[::]:%s' % port)
+beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server(
+cls(worker_threads,
+use_process=use_process,
+container_executable=container_executable),
+worker_server)
+worker_server.start()
+return worker_address, worker_server
+
+  def StartWorker(self, start_worker_request, unused_context):
+try:
+  if self._use_process:
+command = ['python', '-c',
+   'from apache_beam.runners.worker.sdk_worker '
+   'import SdkHarness; '
+   'SdkHarness("%s",worker_count=%d,worker_id="%s").run()' % (
+   start_worker_request.control_endpoint.url,
+   self._worker_threads,
+   start_worker_request.worker_id)]
+if self._container_executable:
+  # command as per container spec
+  command = [self._container_executable,
+ '--id=%s' % start_worker_request.worker_id,
+ '--logging_endpoint=%s'
+ % start_worker_request.logging_endpoint.url,
+ '--artifact_endpoint=%s'
+ % start_worker_request.artifact_endpoint.url,
+ '--provision_endpoint=%s'
+ % start_worker_request.provision_endpoint.url,
+ '--control_endpoint=%s'
+ % start_worker_request.control_endpoint.url,
+]
+
+logging.warn("Starting worker with command %s" % command)
+worker_process = subprocess.Popen(command, stdout=subprocess.PIPE)
+self._worker_processes[start_worker_request.worker_id] = worker_process
+
+# Register to kill the subprocess on exit.
+atexit.register(worker_process.kill)
+  else:
+worker = sdk_worker.SdkHarness(
+start_worker_request.control_endpoint.url,
+worker_count=self._worker_threads,
+worker_id=start_worker_request.worker_id)
+worker_thread = threading.Thread(
+name='run_worker_%s' % start_worker_request.worker_id,
+target=worker.run)
+worker_thread.daemon = True
+worker_thread.start()
+
+  return beam_fn_api_pb2.StartWorkerResponse()
+except Exception as exn:
+  return beam_fn_api_pb2.StartWorkerResponse(error=str(exn))
+
+  def StopWorker(self, 

[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298900=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298900
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 21/Aug/19 19:00
Start Date: 21/Aug/19 19:00
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #9371: [BEAM-7980] External 
environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#issuecomment-523604353
 
 
   @mxm @aaltay PTAL
 



Issue Time Tracking
---

Worklog Id: (was: 298900)
Time Spent: 3h 40m  (was: 3.5h)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298902=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298902
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 21/Aug/19 19:00
Start Date: 21/Aug/19 19:00
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #9371: [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r316352545
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py
 ##
 @@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Worker pool entry point."""
+
+from __future__ import absolute_import
+
+import argparse
+import atexit
+import logging
+import subprocess
+import sys
+import threading
+import time
+from concurrent import futures
+
+import grpc
+
+from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import beam_fn_api_pb2_grpc
+from apache_beam.runners.worker import sdk_worker
+
+
+class BeamFnExternalWorkerPoolServicer(
+beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer):
+
+  def __init__(self, worker_threads, use_process=False,
+   container_executable=None):
+self._worker_threads = worker_threads
+self._use_process = use_process
+self._container_executable = container_executable
+self._worker_processes = {}
+
+  @classmethod
+  def start(cls, worker_threads=1, use_process=False, port=0,
+container_executable=None):
+worker_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+worker_address = 'localhost:%s' % worker_server.add_insecure_port(
+'[::]:%s' % port)
+beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server(
+cls(worker_threads,
+use_process=use_process,
+container_executable=container_executable),
+worker_server)
+worker_server.start()
+return worker_address, worker_server
+
+  def StartWorker(self, start_worker_request, unused_context):
+try:
+  if self._use_process:
+command = ['python', '-c',
+   'from apache_beam.runners.worker.sdk_worker '
+   'import SdkHarness; '
+   'SdkHarness("%s",worker_count=%d,worker_id="%s").run()' % (
+   start_worker_request.control_endpoint.url,
+   self._worker_threads,
+   start_worker_request.worker_id)]
+if self._container_executable:
+  # command as per container spec
+  command = [self._container_executable,
+ '--id=%s' % start_worker_request.worker_id,
+ '--logging_endpoint=%s'
+ % start_worker_request.logging_endpoint.url,
+ '--artifact_endpoint=%s'
+ % start_worker_request.artifact_endpoint.url,
+ '--provision_endpoint=%s'
+ % start_worker_request.provision_endpoint.url,
+ '--control_endpoint=%s'
+ % start_worker_request.control_endpoint.url,
+]
+
+logging.warn("Starting worker with command %s" % command)
+worker_process = subprocess.Popen(command, stdout=subprocess.PIPE)
+self._worker_processes[start_worker_request.worker_id] = worker_process
+
+# Register to kill the subprocess on exit.
+atexit.register(worker_process.kill)
+  else:
+worker = sdk_worker.SdkHarness(
+start_worker_request.control_endpoint.url,
+worker_count=self._worker_threads,
+worker_id=start_worker_request.worker_id)
+worker_thread = threading.Thread(
+name='run_worker_%s' % start_worker_request.worker_id,
+target=worker.run)
+worker_thread.daemon = True
+worker_thread.start()
+
+  return beam_fn_api_pb2.StartWorkerResponse()
+except Exception as exn:
+  return beam_fn_api_pb2.StartWorkerResponse(error=str(exn))
+
+  def StopWorker(self, 

[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298899=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298899
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 21/Aug/19 19:00
Start Date: 21/Aug/19 19:00
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #9371: [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r316351902
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -59,6 +60,18 @@ const (
 
 func main() {
flag.Parse()
+
+   if *workerPool == true {
+       args := []string{
+           "-m",
+           "apache_beam.runners.worker.worker_pool_main",
+           "--service_port=5",
+           "--container_executable=/opt/apache/beam/boot",
+       }
+       log.Printf("Starting Python SDK worker pool: python %v", strings.Join(args, " "))
+       log.Fatalf("Python SDK worker pool exited: %v", execx.Execute("python", args...))
 
 Review comment:
   I'm working on the locking that prevents multiple processes from downloading 
artifacts and running installation concurrently, since this needs to be done 
only once. I would prefer to defer this to a follow-up PR though.
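
   The kind of locking described above could look roughly like the following. This is only a minimal sketch, not the code planned for the follow-up PR: `run_once`, the lock file, and the marker file are hypothetical names, and it assumes a POSIX system where `fcntl.flock` is available.

```python
import fcntl
import os
import tempfile


def run_once(lock_path, marker_path, setup):
  """Runs setup() at most once across processes that share lock_path.

  Hypothetical helper for illustration. Returns True if this caller ran
  setup(), False if an earlier caller had already completed it.
  """
  with open(lock_path, 'w') as lock_file:
    # Blocks until no other process holds the exclusive lock.
    fcntl.flock(lock_file, fcntl.LOCK_EX)
    try:
      if os.path.exists(marker_path):
        return False
      setup()
      # Record completion so that later callers skip the work.
      with open(marker_path, 'w') as marker:
        marker.write('done')
      return True
    finally:
      fcntl.flock(lock_file, fcntl.LOCK_UN)


# Usage: two calls stand in for two worker processes starting up.
tmp_dir = tempfile.mkdtemp()
lock = os.path.join(tmp_dir, 'setup.lock')
marker = os.path.join(tmp_dir, 'setup.done')
installs = []
first = run_once(lock, marker, lambda: installs.append('install'))
second = run_once(lock, marker, lambda: installs.append('install'))
```

   The marker file is what makes the second caller skip the work; the lock alone would only serialize the two installations.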
 



Issue Time Tracking
---

Worklog Id: (was: 298899)
Time Spent: 3.5h  (was: 3h 20m)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 3.5h
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298861=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298861
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 21/Aug/19 17:03
Start Date: 21/Aug/19 17:03
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #9371: [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r316296507
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py
 ##
 @@ -0,0 +1,170 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""
+Worker pool entry point.
+
+The worker pool exposes an RPC service that is used with EXTERNAL
+environment to start and stop the SDK workers.
+
+This entry point is used by the Python SDK container in worker pool mode.
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import atexit
+import logging
+import subprocess
+import sys
+import threading
+import time
+from concurrent import futures
+
+import grpc
+
+from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import beam_fn_api_pb2_grpc
+from apache_beam.runners.worker import sdk_worker
+
+
+class BeamFnExternalWorkerPoolServicer(
+beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer):
+
+  def __init__(self, worker_threads, use_process=False,
+   container_executable=None):
+self._worker_threads = worker_threads
+self._use_process = use_process
+self._container_executable = container_executable
+self._worker_processes = {}
+
+  @classmethod
+  def start(cls, worker_threads=1, use_process=False, port=0,
+container_executable=None):
+worker_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+worker_address = 'localhost:%s' % worker_server.add_insecure_port(
+'[::]:%s' % port)
+beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server(
+cls(worker_threads,
+use_process=use_process,
+container_executable=container_executable),
+worker_server)
+worker_server.start()
+
+# Register to kill the subprocesses on exit.
+def kill_worker_processes():
+  for worker_process in cls._worker_processes.values():
+worker_process.kill()
+atexit.register(kill_worker_processes)
+
+return worker_address, worker_server
+
+  def StartWorker(self, start_worker_request, unused_context):
+try:
+  if self._use_process:
+command = ['python', '-c',
+   'from apache_beam.runners.worker.sdk_worker '
+   'import SdkHarness; '
+   'SdkHarness("%s",worker_count=%d,worker_id="%s").run()' % (
+   start_worker_request.control_endpoint.url,
+   self._worker_threads,
+   start_worker_request.worker_id)]
+if self._container_executable:
+  # command as per container spec
+  command = [self._container_executable,
+ '--id=%s' % start_worker_request.worker_id,
+ '--logging_endpoint=%s'
+ % start_worker_request.logging_endpoint.url,
 
 Review comment:
   Example of how the worker pool is started: `docker run --rm -p=5:5 tweise-docker-apache.bintray.io/beam/python:latest --worker_pool=true`
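
   Inside the container, boot.go in this PR then launches `apache_beam.runners.worker.worker_pool_main` with `--service_port` and `--container_executable`. How the Python side parses those flags is not shown in this hunk, so the following is only a sketch under that assumption; `parse_worker_pool_args` is a hypothetical name and the port value is a placeholder.

```python
import argparse


def parse_worker_pool_args(argv):
  """Parses the two flags that boot.go passes in worker pool mode.

  Hypothetical helper: only the flag names come from the boot.go change.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument('--service_port', type=int, required=True,
                      help='Port the worker pool service listens on.')
  parser.add_argument('--container_executable', type=str, default=None,
                      help='Executable used to launch each SDK worker.')
  # parse_known_args tolerates extra flags this sketch does not know about.
  args, _ = parser.parse_known_args(argv)
  return args


# Usage with placeholder values.
example_args = parse_worker_pool_args([
    '--service_port=12345',
    '--container_executable=/opt/apache/beam/boot',
])
```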
 



Issue Time Tracking
---

Worklog Id: (was: 298861)
Time Spent: 3h 20m  (was: 3h 10m)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  

[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298851=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298851
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 21/Aug/19 16:52
Start Date: 21/Aug/19 16:52
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #9371: [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r316291805
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py
 ##
 @@ -0,0 +1,170 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""
+Worker pool entry point.
+
+The worker pool exposes an RPC service that is used with EXTERNAL
+environment to start and stop the SDK workers.
+
+This entry point is used by the Python SDK container in worker pool mode.
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import atexit
+import logging
+import subprocess
+import sys
+import threading
+import time
+from concurrent import futures
+
+import grpc
+
+from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import beam_fn_api_pb2_grpc
+from apache_beam.runners.worker import sdk_worker
+
+
+class BeamFnExternalWorkerPoolServicer(
+beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer):
+
+  def __init__(self, worker_threads, use_process=False,
+   container_executable=None):
+self._worker_threads = worker_threads
+self._use_process = use_process
+self._container_executable = container_executable
+self._worker_processes = {}
+
+  @classmethod
+  def start(cls, worker_threads=1, use_process=False, port=0,
+container_executable=None):
+worker_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+worker_address = 'localhost:%s' % worker_server.add_insecure_port(
+'[::]:%s' % port)
+beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server(
+cls(worker_threads,
+use_process=use_process,
+container_executable=container_executable),
+worker_server)
+worker_server.start()
+
+# Register to kill the subprocesses on exit.
+def kill_worker_processes():
+  for worker_process in cls._worker_processes.values():
+worker_process.kill()
+atexit.register(kill_worker_processes)
+
+return worker_address, worker_server
+
+  def StartWorker(self, start_worker_request, unused_context):
+try:
+  if self._use_process:
+command = ['python', '-c',
+   'from apache_beam.runners.worker.sdk_worker '
+   'import SdkHarness; '
+   'SdkHarness("%s",worker_count=%d,worker_id="%s").run()' % (
+   start_worker_request.control_endpoint.url,
+   self._worker_threads,
+   start_worker_request.worker_id)]
+if self._container_executable:
+  # command as per container spec
+  command = [self._container_executable,
+ '--id=%s' % start_worker_request.worker_id,
+ '--logging_endpoint=%s'
+ % start_worker_request.logging_endpoint.url,
 
 Review comment:
   Endpoint URLs are not known at container startup. That's also why artifact 
staging has to happen lazily. I will document this. 
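
   To illustrate: the boot command can only be assembled once a StartWorker request arrives, because that request carries the endpoint URLs. A minimal sketch, assuming namedtuple stand-ins for the real `beam_fn_api_pb2` messages (`Endpoint`, `StartWorkerRequest`, and `build_boot_command` here are hypothetical; the field and flag names mirror the diff above):

```python
import collections

# Stand-ins for the protobuf messages, for illustration only.
Endpoint = collections.namedtuple('Endpoint', ['url'])
StartWorkerRequest = collections.namedtuple(
    'StartWorkerRequest',
    ['worker_id', 'control_endpoint', 'logging_endpoint',
     'artifact_endpoint', 'provision_endpoint'])


def build_boot_command(container_executable, request):
  """Builds the per-worker boot command from a StartWorker request."""
  return [
      container_executable,
      '--id=%s' % request.worker_id,
      '--logging_endpoint=%s' % request.logging_endpoint.url,
      '--artifact_endpoint=%s' % request.artifact_endpoint.url,
      '--provision_endpoint=%s' % request.provision_endpoint.url,
      '--control_endpoint=%s' % request.control_endpoint.url,
  ]


# Usage with placeholder endpoint URLs.
example_request = StartWorkerRequest(
    worker_id='worker-1',
    control_endpoint=Endpoint('localhost:1001'),
    logging_endpoint=Endpoint('localhost:1002'),
    artifact_endpoint=Endpoint('localhost:1003'),
    provision_endpoint=Endpoint('localhost:1004'))
example_command = build_boot_command('/opt/apache/beam/boot', example_request)
```

   Nothing in the command depends on state available at container startup, which is why the artifact retrieval it triggers has to happen per request rather than up front.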
 



Issue Time Tracking
---

Worklog Id: (was: 298851)
Time Spent: 3h 10m  (was: 3h)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  

[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298842=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298842
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 21/Aug/19 16:40
Start Date: 21/Aug/19 16:40
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #9371: [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r316286663
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py
 ##
 @@ -0,0 +1,170 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""
+Worker pool entry point.
+
+The worker pool exposes an RPC service that is used with EXTERNAL
+environment to start and stop the SDK workers.
+
+This entry point is used by the Python SDK container in worker pool mode.
+"""
+
+from __future__ import absolute_import
+
+import argparse
+import atexit
+import logging
+import subprocess
+import sys
+import threading
+import time
+from concurrent import futures
+
+import grpc
+
+from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import beam_fn_api_pb2_grpc
+from apache_beam.runners.worker import sdk_worker
+
+
+class BeamFnExternalWorkerPoolServicer(
+beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer):
+
+  def __init__(self, worker_threads, use_process=False,
+   container_executable=None):
+self._worker_threads = worker_threads
+self._use_process = use_process
+self._container_executable = container_executable
+self._worker_processes = {}
+
+  @classmethod
+  def start(cls, worker_threads=1, use_process=False, port=0,
+container_executable=None):
+worker_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+worker_address = 'localhost:%s' % worker_server.add_insecure_port(
+'[::]:%s' % port)
+beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server(
+cls(worker_threads,
+use_process=use_process,
+container_executable=container_executable),
+worker_server)
+worker_server.start()
+
+# Register to kill the subprocesses on exit.
+def kill_worker_processes():
+  for worker_process in cls._worker_processes.values():
+worker_process.kill()
+atexit.register(kill_worker_processes)
+
+return worker_address, worker_server
+
+  def StartWorker(self, start_worker_request, unused_context):
+try:
+  if self._use_process:
+command = ['python', '-c',
+   'from apache_beam.runners.worker.sdk_worker '
+   'import SdkHarness; '
+   'SdkHarness("%s",worker_count=%d,worker_id="%s").run()' % (
+   start_worker_request.control_endpoint.url,
+   self._worker_threads,
+   start_worker_request.worker_id)]
+if self._container_executable:
+  # command as per container spec
+  command = [self._container_executable,
+ '--id=%s' % start_worker_request.worker_id,
+ '--logging_endpoint=%s'
+ % start_worker_request.logging_endpoint.url,
+ '--artifact_endpoint=%s'
+ % start_worker_request.artifact_endpoint.url,
+ '--provision_endpoint=%s'
+ % start_worker_request.provision_endpoint.url,
+ '--control_endpoint=%s'
+ % start_worker_request.control_endpoint.url,
+]
+
+logging.warn("Starting worker with command %s" % command)
+worker_process = subprocess.Popen(command, stdout=subprocess.PIPE,
+  close_fds=True)
+self._worker_processes[start_worker_request.worker_id] = worker_process
+  else:
+worker = sdk_worker.SdkHarness(
+start_worker_request.control_endpoint.url,
+worker_count=self._worker_threads,
+worker_id=start_worker_request.worker_id)
+

[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298361=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298361
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 21/Aug/19 01:39
Start Date: 21/Aug/19 01:39
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #9371: [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r315968878
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py
 ##
 @@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Worker pool entry point."""
+
+from __future__ import absolute_import
+
+import argparse
+import atexit
+import logging
+import subprocess
+import sys
+import threading
+import time
+from concurrent import futures
+
+import grpc
+
+from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import beam_fn_api_pb2_grpc
+from apache_beam.runners.worker import sdk_worker
+
+
+class BeamFnExternalWorkerPoolServicer(
+    beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer):
+
+  def __init__(self, worker_threads, use_process=False,
+               container_executable=None):
+    self._worker_threads = worker_threads
+    self._use_process = use_process
+    self._container_executable = container_executable
+    self._worker_processes = {}
+
+  @classmethod
+  def start(cls, worker_threads=1, use_process=False, port=0,
+            container_executable=None):
+    worker_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+    worker_address = 'localhost:%s' % worker_server.add_insecure_port(
+        '[::]:%s' % port)
+    beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server(
+        cls(worker_threads,
+            use_process=use_process,
+            container_executable=container_executable),
+        worker_server)
+    worker_server.start()
+    return worker_address, worker_server
+
+  def StartWorker(self, start_worker_request, unused_context):
+    try:
+      if self._use_process:
+        command = ['python', '-c',
+                   'from apache_beam.runners.worker.sdk_worker '
+                   'import SdkHarness; '
+                   'SdkHarness("%s",worker_count=%d,worker_id="%s").run()' % (
+                       start_worker_request.control_endpoint.url,
+                       self._worker_threads,
+                       start_worker_request.worker_id)]
+        if self._container_executable:
+          # command as per container spec
+          command = [self._container_executable,
+                     '--id=%s' % start_worker_request.worker_id,
+                     '--logging_endpoint=%s'
+                     % start_worker_request.logging_endpoint.url,
+                     '--artifact_endpoint=%s'
+                     % start_worker_request.artifact_endpoint.url,
+                     '--provision_endpoint=%s'
+                     % start_worker_request.provision_endpoint.url,
+                     '--control_endpoint=%s'
+                     % start_worker_request.control_endpoint.url,
+                    ]
+
+        logging.warn("Starting worker with command %s" % command)
+        worker_process = subprocess.Popen(command, stdout=subprocess.PIPE)
+        self._worker_processes[start_worker_request.worker_id] = worker_process
+
+        # Register to kill the subprocess on exit.
+        atexit.register(worker_process.kill)
+      else:
+        worker = sdk_worker.SdkHarness(
+            start_worker_request.control_endpoint.url,
+            worker_count=self._worker_threads,
+            worker_id=start_worker_request.worker_id)
+        worker_thread = threading.Thread(
+            name='run_worker_%s' % start_worker_request.worker_id,
+            target=worker.run)
+        worker_thread.daemon = True
+        worker_thread.start()
+
+      return beam_fn_api_pb2.StartWorkerResponse()
+    except Exception as exn:
+      return beam_fn_api_pb2.StartWorkerResponse(error=str(exn))
+
+  def StopWorker(self, 
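
For reference, the container-spec command assembled in StartWorker above can be sketched as a standalone function. This is a minimal sketch and not the PR's code: `build_container_command` and the `WorkerEndpoints` tuple are hypothetical names introduced for illustration, and the endpoint addresses are made up. Only the flag names and the `/opt/apache/beam/boot` path come from the diffs in this thread.

```python
from collections import namedtuple

# Hypothetical stand-in for the endpoint URLs carried by the StartWorker
# request (logging, artifact, provision and control endpoints).
WorkerEndpoints = namedtuple(
    'WorkerEndpoints', ['logging', 'artifact', 'provision', 'control'])


def build_container_command(container_executable, worker_id, endpoints):
  """Returns the argv list for launching one SDK worker via the boot binary."""
  return [
      container_executable,
      '--id=%s' % worker_id,
      '--logging_endpoint=%s' % endpoints.logging,
      '--artifact_endpoint=%s' % endpoints.artifact,
      '--provision_endpoint=%s' % endpoints.provision,
      '--control_endpoint=%s' % endpoints.control,
  ]


# Example addresses are assumptions, chosen only to make the sketch runnable.
example_command = build_container_command(
    '/opt/apache/beam/boot',
    'worker-1',
    WorkerEndpoints(
        logging='localhost:50001',
        artifact='localhost:50002',
        provision='localhost:50003',
        control='localhost:50004'))
```

Passing the command as a list, as the diff does, avoids shell interpretation of the endpoint URLs when the list is handed to `subprocess.Popen`.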

[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298300&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298300
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 20/Aug/19 23:10
Start Date: 20/Aug/19 23:10
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #9371: [WIP] 
[BEAM-7980] External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r315940832
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py
 ##

[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298214&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298214
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 20/Aug/19 21:47
Start Date: 20/Aug/19 21:47
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #9371: [WIP] 
[BEAM-7980] External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r315918276
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py
 ##

[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298213&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298213
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 20/Aug/19 21:46
Start Date: 20/Aug/19 21:46
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #9371: [WIP] 
[BEAM-7980] External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r315918147
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py
 ##

[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297756&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297756
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 20/Aug/19 10:49
Start Date: 20/Aug/19 10:49
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9371: [WIP] [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r315622434
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py
 ##

[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297764&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297764
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 20/Aug/19 10:49
Start Date: 20/Aug/19 10:49
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9371: [WIP] [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r315619665
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py
 ##

[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297763&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297763
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 20/Aug/19 10:49
Start Date: 20/Aug/19 10:49
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9371: [WIP] [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r315621663
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py
 ##

[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297765&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297765
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 20/Aug/19 10:49
Start Date: 20/Aug/19 10:49
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9371: [WIP] [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r315620971
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -59,6 +60,18 @@ const (
 
 func main() {
 	flag.Parse()
+
+	if *workerPool == true {
+		args := []string{
+			"-m",
+			"apache_beam.runners.worker.worker_pool_main",
+			"--servicer_port=5",
+			"--container_executable=/opt/apache/beam/boot",
+		}
+		log.Printf("Executing: python %v", strings.Join(args, " "))
+		log.Fatalf("Python exited: %v", execx.Execute("python", args...))
 
 Review comment:
   ```suggestion
   log.Fatalf("Python SDK worker pool exited: %v", execx.Execute("python", args...))
   ```
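
   For context, the flags that boot.go passes above would be consumed by the Python worker pool entry point. The following is a minimal sketch of such parsing, assuming an argparse-based entry point (the module imports `argparse`, but its parser is not shown in this thread). `parse_worker_pool_args` is a hypothetical name, and the port value 50000 is an example rather than the value in the diff.

```python
import argparse


def parse_worker_pool_args(argv):
  """Parses the two flags that boot.go passes to the worker pool module."""
  parser = argparse.ArgumentParser(
      description='Worker pool entry point (illustrative sketch).')
  parser.add_argument(
      '--servicer_port', type=int, required=True,
      help='Port on which the worker pool servicer listens.')
  parser.add_argument(
      '--container_executable', type=str, default=None,
      help='Executable used to launch each worker, e.g. the boot binary.')
  return parser.parse_args(argv)


# Example invocation; the port number is an assumption for illustration.
args = parse_worker_pool_args([
    '--servicer_port=50000',
    '--container_executable=/opt/apache/beam/boot',
])
```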
 



Issue Time Tracking
---

Worklog Id: (was: 297765)
Time Spent: 2h  (was: 1h 50m)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-harness
> Reporter: Thomas Weise
> Assignee: Thomas Weise
> Priority: Major
> Time Spent: 2h
> Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297759=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297759
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 20/Aug/19 10:49
Start Date: 20/Aug/19 10:49
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9371: [WIP] [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r315616892
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py
 ##
 @@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Worker pool entry point."""
+
+from __future__ import absolute_import
+
+import argparse
+import atexit
+import logging
+import subprocess
+import sys
+import threading
+import time
+from concurrent import futures
+
+import grpc
+
+from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import beam_fn_api_pb2_grpc
+from apache_beam.runners.worker import sdk_worker
+
+
+class BeamFnExternalWorkerPoolServicer(
+beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer):
+
+  def __init__(self, worker_threads, use_process=False,
+   container_executable=None):
+self._worker_threads = worker_threads
+self._use_process = use_process
+self._container_executable = container_executable
+self._worker_processes = {}
+
+  @classmethod
+  def start(cls, worker_threads=1, use_process=False, port=0,
+container_executable=None):
+worker_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+worker_address = 'localhost:%s' % worker_server.add_insecure_port(
+'[::]:%s' % port)
+beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server(
+cls(worker_threads,
+use_process=use_process,
+container_executable=container_executable),
+worker_server)
+worker_server.start()
+return worker_address, worker_server
+
+  def StartWorker(self, start_worker_request, unused_context):
+try:
+  if self._use_process:
+command = ['python', '-c',
+   'from apache_beam.runners.worker.sdk_worker '
+   'import SdkHarness; '
+   'SdkHarness("%s",worker_count=%d,worker_id="%s").run()' % (
+   start_worker_request.control_endpoint.url,
+   self._worker_threads,
+   start_worker_request.worker_id)]
+if self._container_executable:
+  # command as per container spec
+  command = [self._container_executable,
+ '--id=%s' % start_worker_request.worker_id,
+ '--logging_endpoint=%s'
+ % start_worker_request.logging_endpoint.url,
+ '--artifact_endpoint=%s'
+ % start_worker_request.artifact_endpoint.url,
+ '--provision_endpoint=%s'
+ % start_worker_request.provision_endpoint.url,
+ '--control_endpoint=%s'
+ % start_worker_request.control_endpoint.url,
+]
+
+logging.warn("Starting worker with command %s" % command)
+worker_process = subprocess.Popen(command, stdout=subprocess.PIPE)
+self._worker_processes[start_worker_request.worker_id] = worker_process
+
+# Register to kill the subprocess on exit.
+atexit.register(worker_process.kill)
+  else:
+worker = sdk_worker.SdkHarness(
+start_worker_request.control_endpoint.url,
+worker_count=self._worker_threads,
+worker_id=start_worker_request.worker_id)
+worker_thread = threading.Thread(
+name='run_worker_%s' % start_worker_request.worker_id,
+target=worker.run)
+worker_thread.daemon = True
+worker_thread.start()
+
+  return beam_fn_api_pb2.StartWorkerResponse()
+except Exception as exn:
+  return beam_fn_api_pb2.StartWorkerResponse(error=str(exn))
+
+  def StopWorker(self, 
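
The container branch of `StartWorker` in the diff above assembles the boot command from the fields of the start-worker request. A minimal, self-contained sketch of that step follows. `Endpoint`, `StartWorkerRequestStub`, and `build_container_command` are hypothetical stand-ins introduced only for illustration (the PR reads the generated `beam_fn_api_pb2` messages and builds the list inline), and the endpoint values are made up.

```python
from dataclasses import dataclass, field


@dataclass
class Endpoint:
  """Hypothetical stand-in for an endpoint message with a `url` field."""
  url: str = ''


@dataclass
class StartWorkerRequestStub:
  """Hypothetical stand-in for the request fields StartWorker reads."""
  worker_id: str = ''
  control_endpoint: Endpoint = field(default_factory=Endpoint)
  logging_endpoint: Endpoint = field(default_factory=Endpoint)
  artifact_endpoint: Endpoint = field(default_factory=Endpoint)
  provision_endpoint: Endpoint = field(default_factory=Endpoint)


def build_container_command(container_executable, request):
  """Returns the argv list, using the same flags and order as the diff."""
  return [
      container_executable,
      '--id=%s' % request.worker_id,
      '--logging_endpoint=%s' % request.logging_endpoint.url,
      '--artifact_endpoint=%s' % request.artifact_endpoint.url,
      '--provision_endpoint=%s' % request.provision_endpoint.url,
      '--control_endpoint=%s' % request.control_endpoint.url,
  ]


# Made-up endpoints, purely for illustration.
request = StartWorkerRequestStub(
    worker_id='worker-1',
    control_endpoint=Endpoint('localhost:1001'),
    logging_endpoint=Endpoint('localhost:1002'),
    artifact_endpoint=Endpoint('localhost:1003'),
    provision_endpoint=Endpoint('localhost:1004'))
command = build_container_command('/opt/apache/beam/boot', request)
print(' '.join(command))
```

Keeping the command as a list rather than a single shell string means each flag reaches the executable as one argument, with no shell quoting involved.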

[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297760=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297760
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 20/Aug/19 10:49
Start Date: 20/Aug/19 10:49
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9371: [WIP] [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r315621982
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py
 ##
 @@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Worker pool entry point."""
+
+from __future__ import absolute_import
+
+import argparse
+import atexit
+import logging
+import subprocess
+import sys
+import threading
+import time
+from concurrent import futures
+
+import grpc
+
+from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import beam_fn_api_pb2_grpc
+from apache_beam.runners.worker import sdk_worker
+
+
+class BeamFnExternalWorkerPoolServicer(
+beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer):
+
+  def __init__(self, worker_threads, use_process=False,
+   container_executable=None):
+self._worker_threads = worker_threads
+self._use_process = use_process
+self._container_executable = container_executable
+self._worker_processes = {}
+
+  @classmethod
+  def start(cls, worker_threads=1, use_process=False, port=0,
+container_executable=None):
+worker_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+worker_address = 'localhost:%s' % worker_server.add_insecure_port(
+'[::]:%s' % port)
+beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server(
+cls(worker_threads,
+use_process=use_process,
+container_executable=container_executable),
+worker_server)
+worker_server.start()
+return worker_address, worker_server
+
+  def StartWorker(self, start_worker_request, unused_context):
+try:
+  if self._use_process:
+command = ['python', '-c',
+   'from apache_beam.runners.worker.sdk_worker '
+   'import SdkHarness; '
+   'SdkHarness("%s",worker_count=%d,worker_id="%s").run()' % (
+   start_worker_request.control_endpoint.url,
+   self._worker_threads,
+   start_worker_request.worker_id)]
+if self._container_executable:
+  # command as per container spec
+  command = [self._container_executable,
+ '--id=%s' % start_worker_request.worker_id,
+ '--logging_endpoint=%s'
+ % start_worker_request.logging_endpoint.url,
+ '--artifact_endpoint=%s'
+ % start_worker_request.artifact_endpoint.url,
+ '--provision_endpoint=%s'
+ % start_worker_request.provision_endpoint.url,
+ '--control_endpoint=%s'
+ % start_worker_request.control_endpoint.url,
+]
+
+logging.warn("Starting worker with command %s" % command)
+worker_process = subprocess.Popen(command, stdout=subprocess.PIPE)
+self._worker_processes[start_worker_request.worker_id] = worker_process
+
+# Register to kill the subprocess on exit.
+atexit.register(worker_process.kill)
+  else:
+worker = sdk_worker.SdkHarness(
+start_worker_request.control_endpoint.url,
+worker_count=self._worker_threads,
+worker_id=start_worker_request.worker_id)
+worker_thread = threading.Thread(
+name='run_worker_%s' % start_worker_request.worker_id,
+target=worker.run)
+worker_thread.daemon = True
+worker_thread.start()
+
+  return beam_fn_api_pb2.StartWorkerResponse()
+except Exception as exn:
+  return beam_fn_api_pb2.StartWorkerResponse(error=str(exn))
+
+  def StopWorker(self, 
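
The process branch in the diff above starts the worker with `subprocess.Popen`, records the handle by worker id, and registers `kill` with `atexit`. The sketch below isolates that lifecycle under simplifying assumptions: `start_worker_process` and the module-level `worker_processes` dict are hypothetical names, and the child is a placeholder that only sleeps rather than an SDK harness.

```python
import atexit
import subprocess
import sys

# Maps worker id to its Popen handle, like `_worker_processes` in the diff.
worker_processes = {}


def start_worker_process(worker_id, command):
  """Starts `command` as a child process and registers cleanup on exit."""
  process = subprocess.Popen(command, stdout=subprocess.PIPE)
  worker_processes[worker_id] = process
  # Kill the child when the interpreter exits normally.
  atexit.register(process.kill)
  return process


# Placeholder child that just sleeps; a real pool would run the SDK harness.
child = start_worker_process(
    'worker-1', [sys.executable, '-c', 'import time; time.sleep(30)'])
print('running:', child.poll() is None)

# Explicit shutdown for this demo, so the sketch does not wait for atexit.
child.kill()
child.wait()
child.stdout.close()
```

Holding the handles in a dict keyed by worker id is what lets a later stop request find the process to terminate.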

[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297766=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297766
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 20/Aug/19 10:49
Start Date: 20/Aug/19 10:49
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9371: [WIP] [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r315623518
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py
 ##
 @@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Worker pool entry point."""
+
+from __future__ import absolute_import
+
+import argparse
+import atexit
+import logging
+import subprocess
+import sys
+import threading
+import time
+from concurrent import futures
+
+import grpc
+
+from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import beam_fn_api_pb2_grpc
+from apache_beam.runners.worker import sdk_worker
+
+
+class BeamFnExternalWorkerPoolServicer(
+beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer):
+
+  def __init__(self, worker_threads, use_process=False,
+   container_executable=None):
+self._worker_threads = worker_threads
+self._use_process = use_process
+self._container_executable = container_executable
+self._worker_processes = {}
+
+  @classmethod
+  def start(cls, worker_threads=1, use_process=False, port=0,
+container_executable=None):
+worker_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+worker_address = 'localhost:%s' % worker_server.add_insecure_port(
+'[::]:%s' % port)
+beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server(
+cls(worker_threads,
+use_process=use_process,
+container_executable=container_executable),
+worker_server)
+worker_server.start()
+return worker_address, worker_server
+
+  def StartWorker(self, start_worker_request, unused_context):
+try:
+  if self._use_process:
+command = ['python', '-c',
+   'from apache_beam.runners.worker.sdk_worker '
+   'import SdkHarness; '
+   'SdkHarness("%s",worker_count=%d,worker_id="%s").run()' % (
+   start_worker_request.control_endpoint.url,
+   self._worker_threads,
+   start_worker_request.worker_id)]
+if self._container_executable:
+  # command as per container spec
+  command = [self._container_executable,
+ '--id=%s' % start_worker_request.worker_id,
+ '--logging_endpoint=%s'
+ % start_worker_request.logging_endpoint.url,
+ '--artifact_endpoint=%s'
+ % start_worker_request.artifact_endpoint.url,
+ '--provision_endpoint=%s'
+ % start_worker_request.provision_endpoint.url,
+ '--control_endpoint=%s'
+ % start_worker_request.control_endpoint.url,
+]
+
+logging.warn("Starting worker with command %s" % command)
+worker_process = subprocess.Popen(command, stdout=subprocess.PIPE)
+self._worker_processes[start_worker_request.worker_id] = worker_process
+
+# Register to kill the subprocess on exit.
+atexit.register(worker_process.kill)
+  else:
+worker = sdk_worker.SdkHarness(
+start_worker_request.control_endpoint.url,
+worker_count=self._worker_threads,
+worker_id=start_worker_request.worker_id)
+worker_thread = threading.Thread(
+name='run_worker_%s' % start_worker_request.worker_id,
+target=worker.run)
+worker_thread.daemon = True
+worker_thread.start()
+
+  return beam_fn_api_pb2.StartWorkerResponse()
+except Exception as exn:
+  return beam_fn_api_pb2.StartWorkerResponse(error=str(exn))
+
+  def StopWorker(self, 
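
`StartWorker` in the diff above wraps its body in `try`/`except` and returns the failure text in the `error` field of the response instead of letting the exception propagate. A small sketch of that pattern follows; `StartWorkerResponseStub`, `start_worker`, and `failing_launch` are hypothetical names used in place of the generated protobuf response and the real launch logic.

```python
from dataclasses import dataclass


@dataclass
class StartWorkerResponseStub:
  """Hypothetical stand-in for a response message with an `error` field."""
  error: str = ''


def start_worker(launch):
  """Runs `launch` and reports any failure through the response."""
  try:
    launch()
    return StartWorkerResponseStub()
  except Exception as exn:  # Broad catch, mirroring the diff.
    return StartWorkerResponseStub(error=str(exn))


def failing_launch():
  raise RuntimeError('container executable not found')


ok = start_worker(lambda: None)
failed = start_worker(failing_launch)
print(repr(ok.error), repr(failed.error))
```

With this shape the caller distinguishes success from failure by checking whether `error` is empty.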

[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297754=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297754
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 20/Aug/19 10:49
Start Date: 20/Aug/19 10:49
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9371: [WIP] [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r315618046
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -40,6 +40,7 @@ var (
 
// Contract: https://s.apache.org/beam-fn-api-container-contract.
 
+workerPool= flag.Bool("worker_pool", false, "Run as worker pool (optional).")
id= flag.String("id", "", "Local identifier (required).")
 
 Review comment:
   Tab/space alignment is inconsistent here.
 



Issue Time Tracking
---

Worklog Id: (was: 297754)
Time Spent: 40m  (was: 0.5h)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297755=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297755
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 20/Aug/19 10:49
Start Date: 20/Aug/19 10:49
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9371: [WIP] [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r315620091
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py
 ##
 @@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Worker pool entry point."""
 
 Review comment:
   Since there is no documentation of this, perhaps expand the comment here?
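
   One possible shape for an expanded docstring, offered only as a sketch: the wording below is hypothetical and is derived from what the diff shows (`BeamFnExternalWorkerPoolServicer.start` and the three launch paths in `StartWorker`), not taken from the PR. It is held in a variable so the snippet runs on its own.

```python
# Hypothetical wording for an expanded module docstring; the variable name
# EXPANDED_DOCSTRING exists only so this sketch is runnable by itself.
EXPANDED_DOCSTRING = """Worker pool entry point.

Starts a gRPC server hosting BeamFnExternalWorkerPoolServicer. For each
StartWorker request the servicer launches an SDK harness, either as a
thread in this process, as a Python subprocess, or (when a container
executable is configured) through that executable, with the endpoints
from the request passed as flags.
"""

print(EXPANDED_DOCSTRING)
```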
 



Issue Time Tracking
---

Worklog Id: (was: 297755)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297762=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297762
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 20/Aug/19 10:49
Start Date: 20/Aug/19 10:49
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9371: [WIP] [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r315621096
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -59,6 +60,18 @@ const (
 
 func main() {
flag.Parse()
+
+   if *workerPool == true {
+args := []string{
+"-m",
+"apache_beam.runners.worker.worker_pool_main",
+"--servicer_port=5",
+"--container_executable=/opt/apache/beam/boot",
+}
+log.Printf("Executing: python %v", strings.Join(args, " "))
+log.Fatalf("Python exited: %v", execx.Execute("python", args...))
 
 Review comment:
   Also, the indentation is off here.
 



Issue Time Tracking
---

Worklog Id: (was: 297762)
Time Spent: 1h 40m  (was: 1.5h)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297753=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297753
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 20/Aug/19 10:49
Start Date: 20/Aug/19 10:49
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9371: [WIP] [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r315614665
 
 

 ##
 File path: model/fn-execution/src/main/proto/beam_fn_api.proto
 ##
 @@ -804,7 +804,7 @@ service BeamFnLogging {
   stream LogControl) {}
 }
 
-message NotifyRunnerAvailableRequest {
+message StartWorkerRequest {
 
 Review comment:
   +1 for this name. Since this feature hasn't been exposed, renaming should be OK.
 



Issue Time Tracking
---

Worklog Id: (was: 297753)
Time Spent: 0.5h  (was: 20m)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297757=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297757
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 20/Aug/19 10:49
Start Date: 20/Aug/19 10:49
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9371: [WIP] [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r315619517
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py
 ##
 @@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Worker pool entry point."""
+
+from __future__ import absolute_import
+
+import argparse
+import atexit
+import logging
+import subprocess
+import sys
+import threading
+import time
+from concurrent import futures
+
+import grpc
+
+from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import beam_fn_api_pb2_grpc
+from apache_beam.runners.worker import sdk_worker
+
+
+class BeamFnExternalWorkerPoolServicer(
+beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer):
+
+  def __init__(self, worker_threads, use_process=False,
+   container_executable=None):
+self._worker_threads = worker_threads
+self._use_process = use_process
+self._container_executable = container_executable
+self._worker_processes = {}
+
+  @classmethod
+  def start(cls, worker_threads=1, use_process=False, port=0,
+container_executable=None):
+worker_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+worker_address = 'localhost:%s' % worker_server.add_insecure_port(
+'[::]:%s' % port)
+beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server(
+cls(worker_threads,
+use_process=use_process,
+container_executable=container_executable),
+worker_server)
+worker_server.start()
+return worker_address, worker_server
+
+  def StartWorker(self, start_worker_request, unused_context):
+try:
+  if self._use_process:
+command = ['python', '-c',
+   'from apache_beam.runners.worker.sdk_worker '
+   'import SdkHarness; '
+   'SdkHarness("%s",worker_count=%d,worker_id="%s").run()' % (
+   start_worker_request.control_endpoint.url,
+   self._worker_threads,
+   start_worker_request.worker_id)]
+if self._container_executable:
+  # command as per container spec
+  command = [self._container_executable,
+ '--id=%s' % start_worker_request.worker_id,
+ '--logging_endpoint=%s'
+ % start_worker_request.logging_endpoint.url,
+ '--artifact_endpoint=%s'
+ % start_worker_request.artifact_endpoint.url,
+ '--provision_endpoint=%s'
+ % start_worker_request.provision_endpoint.url,
+ '--control_endpoint=%s'
+ % start_worker_request.control_endpoint.url,
+]
+
+logging.warn("Starting worker with command %s" % command)
+worker_process = subprocess.Popen(command, stdout=subprocess.PIPE)
+self._worker_processes[start_worker_request.worker_id] = worker_process
+
+# Register to kill the subprocess on exit.
+atexit.register(worker_process.kill)
+  else:
+worker = sdk_worker.SdkHarness(
+start_worker_request.control_endpoint.url,
+worker_count=self._worker_threads,
+worker_id=start_worker_request.worker_id)
+worker_thread = threading.Thread(
+name='run_worker_%s' % start_worker_request.worker_id,
+target=worker.run)
+worker_thread.daemon = True
+worker_thread.start()
+
+  return beam_fn_api_pb2.StartWorkerResponse()
+except Exception as exn:
+  return beam_fn_api_pb2.StartWorkerResponse(error=str(exn))
+
+  def StopWorker(self, 

[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297758=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297758
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 20/Aug/19 10:49
Start Date: 20/Aug/19 10:49
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9371: [WIP] [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r315622299
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py
 ##
 @@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Worker pool entry point."""
+
+from __future__ import absolute_import
+
+import argparse
+import atexit
+import logging
+import subprocess
+import sys
+import threading
+import time
+from concurrent import futures
+
+import grpc
+
+from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import beam_fn_api_pb2_grpc
+from apache_beam.runners.worker import sdk_worker
+
+
+class BeamFnExternalWorkerPoolServicer(
+beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer):
+
+  def __init__(self, worker_threads, use_process=False,
+   container_executable=None):
+self._worker_threads = worker_threads
+self._use_process = use_process
+self._container_executable = container_executable
+self._worker_processes = {}
+
+  @classmethod
+  def start(cls, worker_threads=1, use_process=False, port=0,
+container_executable=None):
+worker_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+worker_address = 'localhost:%s' % worker_server.add_insecure_port(
+'[::]:%s' % port)
+beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server(
+cls(worker_threads,
+use_process=use_process,
+container_executable=container_executable),
+worker_server)
+worker_server.start()
+return worker_address, worker_server
+
+  def StartWorker(self, start_worker_request, unused_context):
+try:
+  if self._use_process:
+command = ['python', '-c',
+   'from apache_beam.runners.worker.sdk_worker '
+   'import SdkHarness; '
+   'SdkHarness("%s",worker_count=%d,worker_id="%s").run()' % (
+   start_worker_request.control_endpoint.url,
+   self._worker_threads,
+   start_worker_request.worker_id)]
+if self._container_executable:
+  # command as per container spec
+  command = [self._container_executable,
+ '--id=%s' % start_worker_request.worker_id,
+ '--logging_endpoint=%s'
+ % start_worker_request.logging_endpoint.url,
+ '--artifact_endpoint=%s'
+ % start_worker_request.artifact_endpoint.url,
+ '--provision_endpoint=%s'
+ % start_worker_request.provision_endpoint.url,
+ '--control_endpoint=%s'
+ % start_worker_request.control_endpoint.url,
+]
+
+logging.warn("Starting worker with command %s" % command)
+worker_process = subprocess.Popen(command, stdout=subprocess.PIPE)
+self._worker_processes[start_worker_request.worker_id] = worker_process
+
+# Register to kill the subprocess on exit.
+atexit.register(worker_process.kill)
+  else:
+worker = sdk_worker.SdkHarness(
+start_worker_request.control_endpoint.url,
+worker_count=self._worker_threads,
+worker_id=start_worker_request.worker_id)
+worker_thread = threading.Thread(
+name='run_worker_%s' % start_worker_request.worker_id,
+target=worker.run)
+worker_thread.daemon = True
+worker_thread.start()
+
+  return beam_fn_api_pb2.StartWorkerResponse()
+except Exception as exn:
+  return beam_fn_api_pb2.StartWorkerResponse(error=str(exn))
+
+  def StopWorker(self, 

[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297767=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297767
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 20/Aug/19 10:49
Start Date: 20/Aug/19 10:49
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9371: [WIP] [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r315621360
 
 

 ##
 File path: sdks/python/container/boot.go
 ##
 @@ -59,6 +60,18 @@ const (
 
 func main() {
flag.Parse()
+
+   if *workerPool == true {
+args := []string{
+"-m",
+"apache_beam.runners.worker.worker_pool_main",
+"--servicer_port=5",
+"--container_executable=/opt/apache/beam/boot",
+}
+log.Printf("Executing: python %v", strings.Join(args, " "))
 
 Review comment:
   ```suggestion
   log.Printf("Starting Python SDK worker pool: python %v", strings.Join(args, " "))
   ```
 



Issue Time Tracking
---

Worklog Id: (was: 297767)
Time Spent: 2h 10m  (was: 2h)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297348=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297348
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 19/Aug/19 18:26
Start Date: 19/Aug/19 18:26
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #9371: [WIP] [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#issuecomment-522698334
 
 
   CC: @rakeshcusat 
 



Issue Time Tracking
---

Worklog Id: (was: 297348)
Time Spent: 20m  (was: 10m)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch 
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>  





[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool

2019-08-18 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=296953=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-296953
 ]

ASF GitHub Bot logged work on BEAM-7980:


Author: ASF GitHub Bot
Created on: 18/Aug/19 19:48
Start Date: 18/Aug/19 19:48
Worklog Time Spent: 10m 
  Work Description: tweise commented on pull request #9371: [BEAM-7980] 
External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371
 
 
   Add an option to use the Python SDK Docker image for execution as a worker 
pool, launching individual workers through boot.go as per the container spec.
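
   As a rough illustration of launching workers through the boot executable, the sketch below builds a per-worker command line and hands it to `subprocess.Popen`. The flag names (`--id`, `--logging_endpoint`, `--artifact_endpoint`, `--provision_endpoint`, `--control_endpoint`), the `endpoints` dict, the example addresses and both function names are assumptions made for illustration; none of them is confirmed by this thread.

```python
import subprocess


def build_boot_command(container_executable, worker_id, endpoints):
  """Builds the per-worker command line for the boot executable.

  The flag names are assumed for illustration. `endpoints` is a
  hypothetical dict mapping 'logging', 'artifact', 'provision' and
  'control' to address strings.
  """
  command = [container_executable, '--id=%s' % worker_id]
  for name in ('logging', 'artifact', 'provision', 'control'):
    command.append('--%s_endpoint=%s' % (name, endpoints[name]))
  return command


def launch_worker(command, popen=subprocess.Popen):
  """Starts the worker as a child process and returns the process handle.

  `popen` is injectable so the function can be exercised without the
  real executable being present.
  """
  return popen(command)


# Example: only builds the command; nothing is executed here.
example_command = build_boot_command(
    '/opt/apache/beam/boot',
    'worker-1',
    {
        'logging': 'localhost:10001',
        'artifact': 'localhost:10002',
        'provision': 'localhost:10003',
        'control': 'localhost:10004',
    })
```

   Running each worker as its own process, instead of a thread inside the pool, lets the boot executable perform its usual per-worker setup before the SDK harness starts.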
   
   
   
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)
   Python | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)[![Build