[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=308392=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-308392 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 07/Sep/19 21:51
Start Date: 07/Sep/19 21:51
Worklog Time Spent: 10m
Work Description: lostluck commented on issue #9398: [BEAM-7980] Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#issuecomment-529149693

LGTM after the fact. Good use of the first class function for the lock wrapping.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 308392)
Time Spent: 9h (was: 8h 50m)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-harness
> Reporter: Thomas Weise
> Assignee: Thomas Weise
> Priority: Major
> Fix For: 2.16.0
>
> Time Spent: 9h
> Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>
--
This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=305727=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-305727 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 03/Sep/19 17:44
Start Date: 03/Sep/19 17:44
Worklog Time Spent: 10m
Work Description: tweise commented on pull request #9398: [BEAM-7980] Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398

Issue Time Tracking
---
Worklog Id: (was: 305727)
Time Spent: 8h 50m (was: 8h 40m)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=305666=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-305666 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 03/Sep/19 15:58
Start Date: 03/Sep/19 15:58
Worklog Time Spent: 10m
Work Description: aaltay commented on issue #9398: [BEAM-7980] Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#issuecomment-527523050

New changes LGTM.

Issue Time Tracking
---
Worklog Id: (was: 305666)
Time Spent: 8h 40m (was: 8.5h)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=305487=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-305487 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 03/Sep/19 09:22
Start Date: 03/Sep/19 09:22
Worklog Time Spent: 10m
Work Description: mxm commented on issue #9398: [BEAM-7980] Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#issuecomment-527378537

I think this can be squashed and merged. @lostluck's comments seem to be resolved.

Issue Time Tracking
---
Worklog Id: (was: 305487)
Time Spent: 8.5h (was: 8h 20m)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=305394=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-305394 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 03/Sep/19 04:10
Start Date: 03/Sep/19 04:10
Worklog Time Spent: 10m
Work Description: tweise commented on issue #9398: [BEAM-7980] Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#issuecomment-527296613

@lostluck @aaltay @mxm PTAL since I made a few more changes than those requested to improve the synchronization logic.

Issue Time Tracking
---
Worklog Id: (was: 305394)
Time Spent: 8h 20m (was: 8h 10m)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=304999=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-304999 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 01/Sep/19 17:43
Start Date: 01/Sep/19 17:43
Worklog Time Spent: 10m
Work Description: tweise commented on pull request #9398: [BEAM-7980] Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r319768879

## File path: sdks/python/container/boot.go ##
@@ -105,18 +107,57 @@ func main() {

 	// (2) Retrieve and install the staged packages.

-	dir := filepath.Join(*semiPersistDir, "staged")
+	func() {

-	files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-	if err != nil {
-		log.Fatalf("Failed to retrieve staged files: %v", err)
-	}
+		installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")

-	// TODO(herohde): the packages to install should be specified explicitly. It
-	// would also be possible to install the SDK in the Dockerfile.
-	if setupErr := installSetupPackages(files, dir); setupErr != nil {
-		log.Fatalf("Failed to install required packages: %v", setupErr)
-	}
+		// skip if install already complete
+		_, err = os.Stat(installCompleteFile)
+		if err == nil {
+			return
+		}
+
+		// lock to guard from concurrent artifact retrieval and installation,
+		// when called by child processes in a worker pool
+		lock, err := lockfile.New(filepath.Join(os.TempDir(), "beam.install.lck"))
+		if err != nil {
+			log.Fatalf("Cannot init artifact retrieval lock: %v", err)
+		}
+
+		for err = lock.TryLock(); err != nil; err = lock.TryLock() {
+			switch err {
+			case lockfile.ErrBusy, lockfile.ErrNotExist:
+				time.Sleep(5 * time.Second)
+				log.Printf("Worker %v waiting for artifact retrieval lock: %v", *id, lock)
+			default:
+				log.Fatalf("Worker %v could not obtain artifact retrieval lock: %v", *id, err)
+			}
+		}
+		defer lock.Unlock()
+
+		// skip if install already complete
+		_, err = os.Stat(installCompleteFile)
+		if err == nil {
+			return
+		}
+
+		dir := filepath.Join(*semiPersistDir, "staged")
+
+		files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
+		if err != nil {
+			log.Fatalf("Failed to retrieve staged files: %v", err)
+		}
+
+		// TODO(herohde): the packages to install should be specified explicitly. It
+		// would also be possible to install the SDK in the Dockerfile.
+		if setupErr := installSetupPackages(files, dir); setupErr != nil {
+			log.Fatalf("Failed to install required packages: %v", setupErr)
+		}
+
+		// mark install complete
+		os.OpenFile(installCompleteFile, os.O_RDONLY|os.O_CREATE, 0666)
+
+	}()

 	// (3) Invoke python

Review comment:
They don't have to be the same on Flink, but the staged artifacts have to be identical.

Issue Time Tracking
---
Worklog Id: (was: 304999)
Time Spent: 8h 10m (was: 8h)
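The check, lock, re-check, install, mark sequence in the diff above can be sketched as a standalone program. This is a minimal sketch and not the code from the pull request: `installOnce` and `countInstalls` are hypothetical names, and exclusive file creation (`os.O_EXCL`) from the standard library is swapped in for the `lockfile` package so that the example has no external dependencies. Unlike the diff, the sketch checks the error from writing the marker file and closes the handle.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"sync/atomic"
	"time"
)

// installOnce is a hypothetical helper: it runs install at most once per dir,
// even when several workers call it concurrently.
func installOnce(dir string, install func() error) error {
	marker := filepath.Join(dir, "install.complete")
	lockPath := filepath.Join(dir, "install.lck")

	// Fast path: skip the lock entirely once the marker exists.
	if _, err := os.Stat(marker); err == nil {
		return nil
	}

	// Assumption: O_EXCL file creation stands in for the lockfile package.
	// Unlike a PID-based lock file, a crashed holder leaves a stale lock behind.
	for {
		f, err := os.OpenFile(lockPath, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o600)
		if err == nil {
			f.Close()
			break
		}
		if !os.IsExist(err) {
			return fmt.Errorf("acquire lock: %w", err)
		}
		time.Sleep(10 * time.Millisecond)
	}
	defer os.Remove(lockPath)

	// Second check, now under the lock: another worker may have finished
	// the install while this one was waiting.
	if _, err := os.Stat(marker); err == nil {
		return nil
	}

	if err := install(); err != nil {
		return err
	}

	// Mark the install complete, checking the error and closing the handle.
	f, err := os.OpenFile(marker, os.O_RDONLY|os.O_CREATE, 0o666)
	if err != nil {
		return fmt.Errorf("write marker: %w", err)
	}
	return f.Close()
}

// countInstalls starts the given number of concurrent workers against a fresh
// temporary directory and reports how many of them actually ran the install.
func countInstalls(workers int) (int32, error) {
	dir, err := os.MkdirTemp("", "install-once")
	if err != nil {
		return 0, err
	}
	defer os.RemoveAll(dir)

	var installs int32
	var wg sync.WaitGroup
	errs := make([]error, workers)
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			errs[i] = installOnce(dir, func() error {
				atomic.AddInt32(&installs, 1)
				return nil
			})
		}(i)
	}
	wg.Wait()

	for _, err := range errs {
		if err != nil {
			return installs, err
		}
	}
	return installs, nil
}

func main() {
	n, err := countInstalls(8)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("installs:", n)
}
```

Because the marker is written before the deferred lock removal runs, a worker that acquires the lock later always sees the marker in its second check and returns without installing again.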
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=304988=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-304988 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 01/Sep/19 17:03
Start Date: 01/Sep/19 17:03
Worklog Time Spent: 10m
Work Description: tweise commented on pull request #9398: [BEAM-7980] Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r319767728

## File path: sdks/python/container/boot.go ##
@@ -105,18 +107,57 @@ func main() {

 	// (2) Retrieve and install the staged packages.

-	dir := filepath.Join(*semiPersistDir, "staged")
+	func() {

-	files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-	if err != nil {
-		log.Fatalf("Failed to retrieve staged files: %v", err)
-	}
+		installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")

Review comment:
Done! Can this be accomplished through the gradle plugin also (w/o extra go install)? Would be nice to have this integrated like Spotless for Java.

Issue Time Tracking
---
Worklog Id: (was: 304988)
Time Spent: 8h (was: 7h 50m)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=301119=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-301119 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 26/Aug/19 09:26
Start Date: 26/Aug/19 09:26
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9398: [BEAM-7980] Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r317522264

## File path: sdks/python/container/boot.go ##
@@ -105,18 +107,57 @@ func main() {

 	// (2) Retrieve and install the staged packages.

-	dir := filepath.Join(*semiPersistDir, "staged")
+	func() {

-	files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-	if err != nil {
-		log.Fatalf("Failed to retrieve staged files: %v", err)
-	}
+		installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")

-	// TODO(herohde): the packages to install should be specified explicitly. It
-	// would also be possible to install the SDK in the Dockerfile.
-	if setupErr := installSetupPackages(files, dir); setupErr != nil {
-		log.Fatalf("Failed to install required packages: %v", setupErr)
-	}
+		// skip if install already complete
+		_, err = os.Stat(installCompleteFile)
+		if err == nil {
+			return

Review comment:
Is this check really necessary? In case of a race condition when multiple workers complete this section before the install file has been created, only the later check will catch that. For simplicity, I'd remove this check entirely, unless the locking is very expensive.

Issue Time Tracking
---
Worklog Id: (was: 301119)
Time Spent: 7h 50m (was: 7h 40m)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=301120=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-301120 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 26/Aug/19 09:26
Start Date: 26/Aug/19 09:26
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9398: [BEAM-7980] Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r317522603

## File path: sdks/python/container/boot.go ##
@@ -105,18 +107,57 @@ func main() {

 	// (2) Retrieve and install the staged packages.

-	dir := filepath.Join(*semiPersistDir, "staged")
+	func() {

-	files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-	if err != nil {
-		log.Fatalf("Failed to retrieve staged files: %v", err)
-	}
+		installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")

-	// TODO(herohde): the packages to install should be specified explicitly. It
-	// would also be possible to install the SDK in the Dockerfile.
-	if setupErr := installSetupPackages(files, dir); setupErr != nil {
-		log.Fatalf("Failed to install required packages: %v", setupErr)
-	}
+		// skip if install already complete
+		_, err = os.Stat(installCompleteFile)
+		if err == nil {
+			return
+		}
+
+		// lock to guard from concurrent artifact retrieval and installation,
+		// when called by child processes in a worker pool
+		lock, err := lockfile.New(filepath.Join(os.TempDir(), "beam.install.lck"))
+		if err != nil {
+			log.Fatalf("Cannot init artifact retrieval lock: %v", err)
+		}
+
+		for err = lock.TryLock(); err != nil; err = lock.TryLock() {
+			switch err {
+			case lockfile.ErrBusy, lockfile.ErrNotExist:
+				time.Sleep(5 * time.Second)
+				log.Printf("Worker %v waiting for artifact retrieval lock: %v", *id, lock)
+			default:
+				log.Fatalf("Worker %v could not obtain artifact retrieval lock: %v", *id, err)
+			}
+		}
+		defer lock.Unlock()
+
+		// skip if install already complete
+		_, err = os.Stat(installCompleteFile)
+		if err == nil {
+			return

Review comment:
Makes sense. I'm new to Go. However, if you look at the above similar check (line 115), there is a possible race condition which can only be caught here where the lock is in place.

Issue Time Tracking
---
Worklog Id: (was: 301120)
Time Spent: 7h 50m (was: 7h 40m)
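The race described in the review comment above can be reproduced deterministically in a single process. This is a minimal sketch under stated assumptions: `simulate` is a hypothetical function, an atomic flag stands in for the marker file, a `sync.Mutex` stands in for the lock file, and a `sync.WaitGroup` is used as a barrier to force the worst case in which every worker passes the unlocked check before any install has finished.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// simulate runs the given number of workers and returns how many of them
// performed the install. With recheck=false the second, locked check is
// omitted; with recheck=true it is kept.
func simulate(workers int, recheck bool) int {
	var (
		done     int32          // stands in for the marker file
		mu       sync.Mutex     // stands in for the lock file
		barrier  sync.WaitGroup // forces the worst-case interleaving
		finished sync.WaitGroup
		installs int
	)
	barrier.Add(workers)
	finished.Add(workers)

	for i := 0; i < workers; i++ {
		go func() {
			defer finished.Done()

			// First check, outside the lock: every worker sees "not installed"
			// because no worker can install until all have passed the barrier.
			notInstalled := atomic.LoadInt32(&done) == 0
			barrier.Done()
			barrier.Wait()
			if !notInstalled {
				return
			}

			mu.Lock()
			defer mu.Unlock()

			// Second check, under the lock: only this one can observe an
			// install that completed after the first check.
			if recheck && atomic.LoadInt32(&done) == 1 {
				return
			}
			installs++
			atomic.StoreInt32(&done, 1)
		}()
	}

	finished.Wait()
	return installs
}

func main() {
	fmt.Println("without locked re-check:", simulate(4, false)) // 4
	fmt.Println("with locked re-check:", simulate(4, true))     // 1
}
```

The unlocked check is therefore only an optimization that lets later workers skip the lock; correctness comes from the check made while holding it.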
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=300511=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-300511 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 23/Aug/19 21:07
Start Date: 23/Aug/19 21:07
Worklog Time Spent: 10m
Work Description: lostluck commented on issue #9398: [BEAM-7980] Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#issuecomment-524462613

FYI, I'm gone until Sept 3rd; please don't block unnecessarily on further feedback from me.

Issue Time Tracking
---
Worklog Id: (was: 300511)
Time Spent: 7h 40m (was: 7.5h)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=300500=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-300500 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 23/Aug/19 20:46
Start Date: 23/Aug/19 20:46
Worklog Time Spent: 10m
Work Description: tweise commented on pull request #9398: [BEAM-7980] Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r317295950

## File path: sdks/python/container/boot.go ##
@@ -105,18 +107,57 @@ func main() {

 	// (2) Retrieve and install the staged packages.

-	dir := filepath.Join(*semiPersistDir, "staged")
+	func() {

-	files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-	if err != nil {
-		log.Fatalf("Failed to retrieve staged files: %v", err)
-	}
+		installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")

-	// TODO(herohde): the packages to install should be specified explicitly. It
-	// would also be possible to install the SDK in the Dockerfile.
-	if setupErr := installSetupPackages(files, dir); setupErr != nil {
-		log.Fatalf("Failed to install required packages: %v", setupErr)
-	}
+		// skip if install already complete
+		_, err = os.Stat(installCompleteFile)
+		if err == nil {
+			return

Review comment:
There is no need to enter the locked section when the install was already completed (hence the comment above).

Issue Time Tracking
---
Worklog Id: (was: 300500)
Time Spent: 7.5h (was: 7h 20m)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=300487=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-300487 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 23/Aug/19 20:22
Start Date: 23/Aug/19 20:22
Worklog Time Spent: 10m
Work Description: aaltay commented on pull request #9398: [BEAM-7980] Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r31726

## File path: sdks/python/container/boot.go ##
@@ -105,18 +107,57 @@ func main() {

 	// (2) Retrieve and install the staged packages.

-	dir := filepath.Join(*semiPersistDir, "staged")
+	func() {

-	files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-	if err != nil {
-		log.Fatalf("Failed to retrieve staged files: %v", err)
-	}
+		installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")

-	// TODO(herohde): the packages to install should be specified explicitly. It
-	// would also be possible to install the SDK in the Dockerfile.
-	if setupErr := installSetupPackages(files, dir); setupErr != nil {
-		log.Fatalf("Failed to install required packages: %v", setupErr)
-	}
+		// skip if install already complete
+		_, err = os.Stat(installCompleteFile)
+		if err == nil {
+			return
+		}
+
+		// lock to guard from concurrent artifact retrieval and installation,
+		// when called by child processes in a worker pool
+		lock, err := lockfile.New(filepath.Join(os.TempDir(), "beam.install.lck"))
+		if err != nil {
+			log.Fatalf("Cannot init artifact retrieval lock: %v", err)
+		}
+
+		for err = lock.TryLock(); err != nil; err = lock.TryLock() {
+			switch err {
+			case lockfile.ErrBusy, lockfile.ErrNotExist:
+				time.Sleep(5 * time.Second)
+				log.Printf("Worker %v waiting for artifact retrieval lock: %v", *id, lock)
+			default:
+				log.Fatalf("Worker %v could not obtain artifact retrieval lock: %v", *id, err)
+			}
+		}
+		defer lock.Unlock()
+
+		// skip if install already complete
+		_, err = os.Stat(installCompleteFile)
+		if err == nil {
+			return

Review comment:
This is within the lock section. defer ... above executes after the completion of this function. I think we should block all workers as intended here, because workers will need the environment to be set up first.

Issue Time Tracking
---
Worklog Id: (was: 300487)
Time Spent: 7h 20m (was: 7h 10m)
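The point about `defer` in the review comment above can be illustrated with a small sketch. Go runs a deferred call when the surrounding function returns, so wrapping the guarded section in a function literal releases the lock at the end of that section, including on an early return, rather than at the end of `main`. The function `section` and the event strings are hypothetical and only record the order of operations; no real lock is taken.

```go
package main

import "fmt"

// section records the order of events when the unlock is deferred inside a
// function literal. alreadyInstalled simulates the marker file being present
// at the second check.
func section(alreadyInstalled bool) []string {
	var events []string

	func() {
		events = append(events, "lock")
		// Runs when this function literal returns, not when section returns.
		defer func() { events = append(events, "unlock") }()

		if alreadyInstalled {
			return // the deferred unlock still runs
		}
		events = append(events, "install")
	}()

	// By this point the lock has already been released.
	events = append(events, "invoke python")
	return events
}

func main() {
	fmt.Println(section(false)) // [lock install unlock invoke python]
	fmt.Println(section(true))  // [lock unlock invoke python]
}
```

Had the `defer` been placed directly in the enclosing function, the unlock would run only when that function returned, so the lock would be held for the remainder of the process.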
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=300154=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-300154 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 23/Aug/19 08:23
Start Date: 23/Aug/19 08:23
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9398: [BEAM-7980] Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r317022783

## File path: sdks/python/container/boot.go ##
@@ -105,18 +107,57 @@ func main() {

 	// (2) Retrieve and install the staged packages.

-	dir := filepath.Join(*semiPersistDir, "staged")
+	func() {

-	files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-	if err != nil {
-		log.Fatalf("Failed to retrieve staged files: %v", err)
-	}
+		installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")

-	// TODO(herohde): the packages to install should be specified explicitly. It
-	// would also be possible to install the SDK in the Dockerfile.
-	if setupErr := installSetupPackages(files, dir); setupErr != nil {
-		log.Fatalf("Failed to install required packages: %v", setupErr)
-	}
+		// skip if install already complete
+		_, err = os.Stat(installCompleteFile)
+		if err == nil {
+			return
+		}
+
+		// lock to guard from concurrent artifact retrieval and installation,
+		// when called by child processes in a worker pool
+		lock, err := lockfile.New(filepath.Join(os.TempDir(), "beam.install.lck"))
+		if err != nil {
+			log.Fatalf("Cannot init artifact retrieval lock: %v", err)
+		}
+
+		for err = lock.TryLock(); err != nil; err = lock.TryLock() {
+			switch err {
+			case lockfile.ErrBusy, lockfile.ErrNotExist:
+				time.Sleep(5 * time.Second)
+				log.Printf("Worker %v waiting for artifact retrieval lock: %v", *id, lock)
+			default:
+				log.Fatalf("Worker %v could not obtain artifact retrieval lock: %v", *id, err)
+			}
+		}
+		defer lock.Unlock()
+
+		// skip if install already complete
+		_, err = os.Stat(installCompleteFile)
+		if err == nil {
+			return

Review comment:
Do you want to avoid blocking the workers?

Issue Time Tracking
---
Worklog Id: (was: 300154)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=300153=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-300153 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 23/Aug/19 08:23
Start Date: 23/Aug/19 08:23
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9398: [BEAM-7980] Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r317022471

## File path: sdks/python/container/boot.go ##
@@ -105,18 +107,57 @@ func main() {

 	// (2) Retrieve and install the staged packages.

-	dir := filepath.Join(*semiPersistDir, "staged")
+	func() {

-	files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-	if err != nil {
-		log.Fatalf("Failed to retrieve staged files: %v", err)
-	}
+		installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")

-	// TODO(herohde): the packages to install should be specified explicitly. It
-	// would also be possible to install the SDK in the Dockerfile.
-	if setupErr := installSetupPackages(files, dir); setupErr != nil {
-		log.Fatalf("Failed to install required packages: %v", setupErr)
-	}
+		// skip if install already complete
+		_, err = os.Stat(installCompleteFile)
+		if err == nil {
+			return
+		}
+
+		// lock to guard from concurrent artifact retrieval and installation,
+		// when called by child processes in a worker pool
+		lock, err := lockfile.New(filepath.Join(os.TempDir(), "beam.install.lck"))
+		if err != nil {
+			log.Fatalf("Cannot init artifact retrieval lock: %v", err)
+		}
+
+		for err = lock.TryLock(); err != nil; err = lock.TryLock() {
+			switch err {
+			case lockfile.ErrBusy, lockfile.ErrNotExist:
+				time.Sleep(5 * time.Second)
+				log.Printf("Worker %v waiting for artifact retrieval lock: %v", *id, lock)
+			default:
+				log.Fatalf("Worker %v could not obtain artifact retrieval lock: %v", *id, err)
+			}
+		}
+		defer lock.Unlock()
+
+		// skip if install already complete
+		_, err = os.Stat(installCompleteFile)
+		if err == nil {
+			return

Review comment:
Why is this not within the locked section?

Issue Time Tracking
---
Worklog Id: (was: 300153)
Time Spent: 7h 10m (was: 7h)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=300152=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-300152 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 23/Aug/19 08:23
Start Date: 23/Aug/19 08:23
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9398: [BEAM-7980] Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r317022289

## File path: sdks/python/container/boot.go ##
@@ -105,18 +107,57 @@ func main() {
 	// (2) Retrieve and install the staged packages.
-	dir := filepath.Join(*semiPersistDir, "staged")
+	func() {
-	files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-	if err != nil {
-		log.Fatalf("Failed to retrieve staged files: %v", err)
-	}
+installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")
-	// TODO(herohde): the packages to install should be specified explicitly. It
-	// would also be possible to install the SDK in the Dockerfile.
-	if setupErr := installSetupPackages(files, dir); setupErr != nil {
-		log.Fatalf("Failed to install required packages: %v", setupErr)
-	}
+		// skip if install already complete
+		_, err = os.Stat(installCompleteFile)
+		if err == nil {
+			return

Review comment:
I think this should live within the locked section.

Issue Time Tracking
---
Worklog Id: (was: 300152)
Time Spent: 7h 10m (was: 7h)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299639=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299639 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 22/Aug/19 18:37
Start Date: 22/Aug/19 18:37
Worklog Time Spent: 10m
Work Description: lostluck commented on pull request #9398: [BEAM-7980] Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r316827877

## File path: sdks/python/container/boot.go ##
@@ -105,18 +107,57 @@ func main() {
 	// (2) Retrieve and install the staged packages.
-	dir := filepath.Join(*semiPersistDir, "staged")
+	func() {
-	files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-	if err != nil {
-		log.Fatalf("Failed to retrieve staged files: %v", err)
-	}
+installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")
-	// TODO(herohde): the packages to install should be specified explicitly. It
-	// would also be possible to install the SDK in the Dockerfile.
-	if setupErr := installSetupPackages(files, dir); setupErr != nil {
-		log.Fatalf("Failed to install required packages: %v", setupErr)
-	}
+		// skip if install already complete
+		_, err = os.Stat(installCompleteFile)
+		if err == nil {
+			return
+		}
+
+		// lock to guard from concurrent artifact retrieval and installation,
+		// when called by child processes in a worker pool
+		lock, err := lockfile.New(filepath.Join(os.TempDir(), "beam.install.lck"))
+		if err != nil {
+			log.Fatalf("Cannot init artifact retrieval lock: %v", err)
+		}
+
+		for err = lock.TryLock(); err != nil; err = lock.TryLock() {
+			switch err {
+			case lockfile.ErrBusy, lockfile.ErrNotExist:
+				time.Sleep(5 * time.Second)
+				log.Printf("Worker %v waiting for artifact retrieval lock: %v", *id, lock)
+			default:
+				log.Fatalf("Worker %v could not obtain artifact retrieval lock: %v", *id, err)
+			}
+		}
+		defer lock.Unlock()
+
+		// skip if install already complete
+		_, err = os.Stat(installCompleteFile)
+		if err == nil {
+			return
+		}
+
+		dir := filepath.Join(*semiPersistDir, "staged")
+
+		files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
+		if err != nil {
+			log.Fatalf("Failed to retrieve staged files: %v", err)
+		}
+
+		// TODO(herohde): the packages to install should be specified explicitly. It
+		// would also be possible to install the SDK in the Dockerfile.
+		if setupErr := installSetupPackages(files, dir); setupErr != nil {
+			log.Fatalf("Failed to install required packages: %v", setupErr)
+		}
+
+		// mark install complete
+		os.OpenFile(installCompleteFile, os.O_RDONLY|os.O_CREATE, 0666)
+
+}()
 	// (3) Invoke python

Review comment:
Is that to mean that the loggingEndpoint and controlEndpoints are the same?

Issue Time Tracking
---
Worklog Id: (was: 299639)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299638=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299638 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 22/Aug/19 18:37
Start Date: 22/Aug/19 18:37
Worklog Time Spent: 10m
Work Description: lostluck commented on pull request #9398: [BEAM-7980] Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r316825518

## File path: sdks/python/container/boot.go ##
@@ -105,18 +107,57 @@ func main() {
 	// (2) Retrieve and install the staged packages.
-	dir := filepath.Join(*semiPersistDir, "staged")
+	func() {
-	files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-	if err != nil {
-		log.Fatalf("Failed to retrieve staged files: %v", err)
-	}
+installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")
-	// TODO(herohde): the packages to install should be specified explicitly. It
-	// would also be possible to install the SDK in the Dockerfile.
-	if setupErr := installSetupPackages(files, dir); setupErr != nil {
-		log.Fatalf("Failed to install required packages: %v", setupErr)
-	}
+		// skip if install already complete
+		_, err = os.Stat(installCompleteFile)
+		if err == nil {
+			return
+		}
+
+		// lock to guard from concurrent artifact retrieval and installation,
+		// when called by child processes in a worker pool
+		lock, err := lockfile.New(filepath.Join(os.TempDir(), "beam.install.lck"))
+		if err != nil {
+			log.Fatalf("Cannot init artifact retrieval lock: %v", err)
+		}
+
+		for err = lock.TryLock(); err != nil; err = lock.TryLock() {
+			switch err {
+			case lockfile.ErrBusy, lockfile.ErrNotExist:

Review comment:
Nit: This is fine as is, but here's an optional suggestion. In this instance the lockfile package author has a type TemporaryError to indicate the error can be retried, which we can check for instead of listing all temporary errors.

if _, ok := err.(lockfile.TemporaryError); ok {
	//sleep etc
} else {
	log.Fatalf(...
}

Or if you're keen on a switch, how about a type switch?

switch err.(type) {
case lockfile.TemporaryError:
	//sleep etc
default:
	log.Fatalf(...
}

Issue Time Tracking
---
Worklog Id: (was: 299638)
Time Spent: 7h (was: 6h 50m)
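Editorial note on the suggestion above: the type switch can be tried out in isolation. The sketch below is self-contained and does not import the lockfile package; `temporaryError` is a local stand-in for its TemporaryError type, and `acquire`, `busyThenFree`, `alwaysFail`, and the attempt limit are hypothetical additions for illustration only.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// temporaryError is a local stand-in (assumption) for the lockfile package's
// TemporaryError type: an error kind that signals "retry later".
type temporaryError string

func (e temporaryError) Error() string { return string(e) }

var (
	errBusy       = temporaryError("locked by other process")
	errPermission = errors.New("permission denied")
)

// acquire calls tryLock until it succeeds, retrying only on temporary errors
// and giving up after maxAttempts. It returns the number of attempts made.
func acquire(tryLock func() error, wait time.Duration, maxAttempts int) (int, error) {
	for attempt := 1; ; attempt++ {
		err := tryLock()
		if err == nil {
			return attempt, nil
		}
		switch err.(type) {
		case temporaryError:
			// Retryable: sleep and try again unless the budget is used up.
			if attempt >= maxAttempts {
				return attempt, fmt.Errorf("gave up after %d attempts: %v", attempt, err)
			}
			time.Sleep(wait)
		default:
			// Anything else is treated as permanent.
			return attempt, err
		}
	}
}

// busyThenFree returns a tryLock stub that reports errBusy n times and then
// succeeds.
func busyThenFree(n int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= n {
			return errBusy
		}
		return nil
	}
}

// alwaysFail returns a tryLock stub that always reports err.
func alwaysFail(err error) func() error {
	return func() error { return err }
}

func main() {
	attempts, err := acquire(busyThenFree(2), time.Millisecond, 10)
	fmt.Println("attempts:", attempts, "err:", err)

	attempts, err = acquire(alwaysFail(errPermission), time.Millisecond, 10)
	fmt.Println("attempts:", attempts, "err:", err)
}
```

Compared with listing ErrBusy and ErrNotExist by value, matching on the type keeps the retry loop correct if the package later adds another temporary error value.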
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299625=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299625 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 22/Aug/19 18:20
Start Date: 22/Aug/19 18:20
Worklog Time Spent: 10m
Work Description: lostluck commented on pull request #9398: [BEAM-7980] Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r316818691

## File path: sdks/python/container/boot.go ##
@@ -33,6 +34,7 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/util/execx"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
 	"github.com/golang/protobuf/proto"
+	"github.com/nightlyone/lockfile"

Review comment:
About the same way as any other package: looking at the code. There's no good third party vetting system for Go at the moment. As for this one, it's a small package (~200 lines) and only imports from the standard library, making it relatively safe and easy to inspect.
https://github.com/nightlyone/lockfile/blob/master/lockfile.go

You can also get a view of other open source projects that import it from godoc.org:
https://godoc.org/github.com/nightlyone/lockfile?importers

Issue Time Tracking
---
Worklog Id: (was: 299625)
Time Spent: 6h 50m (was: 6h 40m)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299624=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299624 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 22/Aug/19 18:18
Start Date: 22/Aug/19 18:18
Worklog Time Spent: 10m
Work Description: lostluck commented on pull request #9398: [BEAM-7980] Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r316820565

## File path: sdks/python/container/boot.go ##
@@ -105,18 +107,57 @@ func main() {
 	// (2) Retrieve and install the staged packages.
-	dir := filepath.Join(*semiPersistDir, "staged")
+	func() {
-	files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-	if err != nil {
-		log.Fatalf("Failed to retrieve staged files: %v", err)
-	}
+installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")

Review comment:
The indentation here is wonky (it's using spaces instead of tabs). Please run go fmt on this file.
https://blog.golang.org/go-fmt-your-code

Issue Time Tracking
---
Worklog Id: (was: 299624)
Time Spent: 6h 40m (was: 6.5h)
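Editorial note on the formatting comment above: the effect of go fmt on space-indented code can be seen with the standard library's `go/format` package, which applies the same formatting rules. This is a small sketch; `formatSource` is a hypothetical helper name and the input snippet is made up for the demonstration.

```go
package main

import (
	"fmt"
	"go/format"
)

// formatSource applies gofmt-style formatting to a complete Go source file
// held in a string and returns the result.
func formatSource(src string) (string, error) {
	out, err := format.Source([]byte(src))
	if err != nil {
		return "", err
	}
	return string(out), nil
}

func main() {
	// The function body is indented with four spaces instead of a tab.
	src := "package main\n\nfunc main() {\n    x := 1\n    _ = x\n}\n"

	out, err := formatSource(src)
	if err != nil {
		fmt.Println("format error:", err)
		return
	}
	// %q makes the tab characters visible as \t.
	fmt.Printf("%q\n", out)
}
```

In day-to-day use the same result comes from running go fmt (or gofmt -w) on the file, as the reviewer asks.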
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299622=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299622 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 22/Aug/19 18:17
Start Date: 22/Aug/19 18:17
Worklog Time Spent: 10m
Work Description: lostluck commented on pull request #9398: [BEAM-7980] Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r316819996

## File path: sdks/python/container/boot.go ##
@@ -105,18 +107,57 @@ func main() {
 	// (2) Retrieve and install the staged packages.
-	dir := filepath.Join(*semiPersistDir, "staged")
+	func() {
-	files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-	if err != nil {
-		log.Fatalf("Failed to retrieve staged files: %v", err)
-	}
+installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")
-	// TODO(herohde): the packages to install should be specified explicitly. It
-	// would also be possible to install the SDK in the Dockerfile.
-	if setupErr := installSetupPackages(files, dir); setupErr != nil {
-		log.Fatalf("Failed to install required packages: %v", setupErr)
-	}
+		// skip if install already complete
+		_, err = os.Stat(installCompleteFile)
+		if err == nil {
+			return
+		}
+
+		// lock to guard from concurrent artifact retrieval and installation,
+		// when called by child processes in a worker pool
+		lock, err := lockfile.New(filepath.Join(os.TempDir(), "beam.install.lck"))

Review comment:
If it's only used in one place, this is fine as is. It would not be wrong to move the path definition elsewhere, but it would be indirect.

Issue Time Tracking
---
Worklog Id: (was: 299622)
Time Spent: 6.5h (was: 6h 20m)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299621=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299621 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 22/Aug/19 18:14
Start Date: 22/Aug/19 18:14
Worklog Time Spent: 10m
Work Description: lostluck commented on pull request #9398: [BEAM-7980] Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r316818691

## File path: sdks/python/container/boot.go ##
@@ -33,6 +34,7 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/util/execx"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
 	"github.com/golang/protobuf/proto"
+	"github.com/nightlyone/lockfile"

Review comment:
About the same way as any other package: looking at the code. There's no good third party vetting system for Go at the moment. As for this one, it's a small package (~200 lines) and only imports from the standard library, making it relatively safe and easy to inspect.
https://github.com/nightlyone/lockfile/blob/master/lockfile.go

Issue Time Tracking
---
Worklog Id: (was: 299621)
Time Spent: 6h 20m (was: 6h 10m)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299615=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299615 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 22/Aug/19 18:02
Start Date: 22/Aug/19 18:02
Worklog Time Spent: 10m
Work Description: tweise commented on issue #9398: [BEAM-7980] Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#issuecomment-524013766

@aaltay thanks for taking a look, is there any other Go expert that can be tagged here?

Issue Time Tracking
---
Worklog Id: (was: 299615)
Time Spent: 6h 10m (was: 6h)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299572=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299572 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 22/Aug/19 16:59
Start Date: 22/Aug/19 16:59
Worklog Time Spent: 10m
Work Description: aaltay commented on pull request #9398: [BEAM-7980] Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r316781201

## File path: sdks/python/container/boot.go ##
@@ -105,18 +107,57 @@ func main() {
 	// (2) Retrieve and install the staged packages.
-	dir := filepath.Join(*semiPersistDir, "staged")
+	func() {
-	files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-	if err != nil {
-		log.Fatalf("Failed to retrieve staged files: %v", err)
-	}
+installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")
-	// TODO(herohde): the packages to install should be specified explicitly. It
-	// would also be possible to install the SDK in the Dockerfile.
-	if setupErr := installSetupPackages(files, dir); setupErr != nil {
-		log.Fatalf("Failed to install required packages: %v", setupErr)
-	}
+		// skip if install already complete
+		_, err = os.Stat(installCompleteFile)
+		if err == nil {
+			return
+		}
+
+		// lock to guard from concurrent artifact retrieval and installation,
+		// when called by child processes in a worker pool
+		lock, err := lockfile.New(filepath.Join(os.TempDir(), "beam.install.lck"))
+		if err != nil {
+			log.Fatalf("Cannot init artifact retrieval lock: %v", err)
+		}
+
+		for err = lock.TryLock(); err != nil; err = lock.TryLock() {
+			switch err {
+			case lockfile.ErrBusy, lockfile.ErrNotExist:
+				time.Sleep(5 * time.Second)
+				log.Printf("Worker %v waiting for artifact retrieval lock: %v", *id, lock)
+			default:
+				log.Fatalf("Worker %v could not obtain artifact retrieval lock: %v", *id, err)
+			}
+		}
+		defer lock.Unlock()
+
+		// skip if install already complete
+		_, err = os.Stat(installCompleteFile)
+		if err == nil {
+			return
+		}
+
+		dir := filepath.Join(*semiPersistDir, "staged")
+
+		files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
+		if err != nil {
+			log.Fatalf("Failed to retrieve staged files: %v", err)
+		}
+
+		// TODO(herohde): the packages to install should be specified explicitly. It
+		// would also be possible to install the SDK in the Dockerfile.
+		if setupErr := installSetupPackages(files, dir); setupErr != nil {

Review comment:
Could we separate the artifact/install logic (L144-L155) into its own function, apart from the locking-related logic?

Issue Time Tracking
---
Worklog Id: (was: 299572)
Time Spent: 5h 50m (was: 5h 40m)
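Editorial note on the request above: one way to keep locking apart from the install steps is to pass the install step as a function value to a small locking helper. The sketch below is an illustration under stated assumptions, not the PR's code. It uses an `O_CREATE|O_EXCL` lock file as a simplified stand-in for the lockfile package (it does not detect stale locks left by crashed processes), and `withFileLock`, `installArtifacts`, and `runDemo` are hypothetical names.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"time"
)

// withFileLock runs fn while holding an exclusive lock file at path.
// Assumption: O_CREATE|O_EXCL stands in for the lockfile package here.
func withFileLock(path string, retry time.Duration, fn func() error) error {
	for {
		f, err := os.OpenFile(path, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0600)
		if err == nil {
			f.Close()
			break
		}
		if !os.IsExist(err) {
			return fmt.Errorf("cannot create lock file: %v", err)
		}
		// Another process holds the lock; wait and retry.
		time.Sleep(retry)
	}
	// Release the lock once fn has returned.
	defer os.Remove(path)

	return fn()
}

// installArtifacts is a hypothetical placeholder for the retrieval and
// install steps; here it only creates the "staged" directory.
func installArtifacts(dir string) error {
	return os.MkdirAll(filepath.Join(dir, "staged"), 0755)
}

// runDemo exercises withFileLock in a temporary directory. It reports whether
// the install step ran and whether the lock file was removed afterwards.
func runDemo() (bool, bool, error) {
	dir, err := os.MkdirTemp("", "beam-lock-demo")
	if err != nil {
		return false, false, err
	}
	defer os.RemoveAll(dir)

	lockPath := filepath.Join(dir, "beam.install.lck")
	err = withFileLock(lockPath, 10*time.Millisecond, func() error {
		return installArtifacts(dir)
	})
	if err != nil {
		return false, false, err
	}

	_, stagedErr := os.Stat(filepath.Join(dir, "staged"))
	_, lockErr := os.Stat(lockPath)
	return stagedErr == nil, os.IsNotExist(lockErr), nil
}

func main() {
	installed, released, err := runDemo()
	fmt.Println("installed:", installed, "lock released:", released, "err:", err)
}
```

With this split, the install function knows nothing about locks, and the locking helper could in principle be shared by other callers.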
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299573=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299573 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 22/Aug/19 16:59
Start Date: 22/Aug/19 16:59
Worklog Time Spent: 10m
Work Description: aaltay commented on pull request #9398: [BEAM-7980] Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r316786467

## File path: sdks/python/container/boot.go ##
@@ -33,6 +34,7 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/util/execx"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
 	"github.com/golang/protobuf/proto"
+	"github.com/nightlyone/lockfile"

Review comment:
For my learning, how does one pick/vet the right Go packages?

Issue Time Tracking
---
Worklog Id: (was: 299573)
Time Spent: 6h (was: 5h 50m)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299574=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299574 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 22/Aug/19 16:59
Start Date: 22/Aug/19 16:59
Worklog Time Spent: 10m
Work Description: aaltay commented on pull request #9398: [BEAM-7980] Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r316782950

## File path: sdks/python/container/boot.go ##
@@ -105,18 +107,57 @@ func main() {
 	// (2) Retrieve and install the staged packages.
-	dir := filepath.Join(*semiPersistDir, "staged")
+	func() {
-	files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-	if err != nil {
-		log.Fatalf("Failed to retrieve staged files: %v", err)
-	}
+installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")
-	// TODO(herohde): the packages to install should be specified explicitly. It
-	// would also be possible to install the SDK in the Dockerfile.
-	if setupErr := installSetupPackages(files, dir); setupErr != nil {
-		log.Fatalf("Failed to install required packages: %v", setupErr)
-	}
+		// skip if install already complete
+		_, err = os.Stat(installCompleteFile)
+		if err == nil {
+			return
+		}
+
+		// lock to guard from concurrent artifact retrieval and installation,
+		// when called by child processes in a worker pool
+		lock, err := lockfile.New(filepath.Join(os.TempDir(), "beam.install.lck"))

Review comment:
Should we separate `filepath.Join(os.TempDir(), "beam.install.lck")` into a variable, similar to `installCompleteFile`? Not sure what the Go convention is when it is used in only one place.

Issue Time Tracking
---
Worklog Id: (was: 299574)
Time Spent: 6h (was: 5h 50m)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299575=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299575 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 22/Aug/19 16:59
Start Date: 22/Aug/19 16:59
Worklog Time Spent: 10m
Work Description: aaltay commented on pull request #9398: [BEAM-7980] Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r316783735

## File path: sdks/python/container/boot.go ##
@@ -105,18 +107,57 @@ func main() {
 	// (2) Retrieve and install the staged packages.
-	dir := filepath.Join(*semiPersistDir, "staged")
+	func() {
-	files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-	if err != nil {
-		log.Fatalf("Failed to retrieve staged files: %v", err)
-	}
+installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")
-	// TODO(herohde): the packages to install should be specified explicitly. It
-	// would also be possible to install the SDK in the Dockerfile.
-	if setupErr := installSetupPackages(files, dir); setupErr != nil {
-		log.Fatalf("Failed to install required packages: %v", setupErr)
-	}
+		// skip if install already complete
+		_, err = os.Stat(installCompleteFile)
+		if err == nil {
+			return
+		}
+
+		// lock to guard from concurrent artifact retrieval and installation,
+		// when called by child processes in a worker pool
+		lock, err := lockfile.New(filepath.Join(os.TempDir(), "beam.install.lck"))
+		if err != nil {
+			log.Fatalf("Cannot init artifact retrieval lock: %v", err)
+		}
+
+		for err = lock.TryLock(); err != nil; err = lock.TryLock() {
+			switch err {
+			case lockfile.ErrBusy, lockfile.ErrNotExist:
+				time.Sleep(5 * time.Second)
+				log.Printf("Worker %v waiting for artifact retrieval lock: %v", *id, lock)
+			default:
+				log.Fatalf("Worker %v could not obtain artifact retrieval lock: %v", *id, err)
+			}
+		}
+		defer lock.Unlock()
+
+		// skip if install already complete
+		_, err = os.Stat(installCompleteFile)
+		if err == nil {
+			return
+		}
+
+		dir := filepath.Join(*semiPersistDir, "staged")
+
+		files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
+		if err != nil {
+			log.Fatalf("Failed to retrieve staged files: %v", err)
+		}
+
+		// TODO(herohde): the packages to install should be specified explicitly. It
+		// would also be possible to install the SDK in the Dockerfile.
+		if setupErr := installSetupPackages(files, dir); setupErr != nil {
+			log.Fatalf("Failed to install required packages: %v", setupErr)
+		}
+
+		// mark install complete
+		os.OpenFile(installCompleteFile, os.O_RDONLY|os.O_CREATE, 0666)
+
+}()
 	// (3) Invoke python

Review comment:
Related to the env variables below: we need to make sure that no worker receives a different endpoint. (This is a non-issue today.)

Issue Time Tracking
---
Worklog Id: (was: 299575)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299132=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299132 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 22/Aug/19 03:28
Start Date: 22/Aug/19 03:28
Worklog Time Spent: 10m
Work Description: tweise commented on pull request #9398: [BEAM-7980] Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r316483610

## File path: sdks/python/container/boot.go ##

@@ -105,18 +107,57 @@ func main() {
 
     // (2) Retrieve and install the staged packages.
 
-    dir := filepath.Join(*semiPersistDir, "staged")
+    func() {
 
-    files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-    if err != nil {
-        log.Fatalf("Failed to retrieve staged files: %v", err)
-    }
+        installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")
 
-    // TODO(herohde): the packages to install should be specified explicitly. It
-    // would also be possible to install the SDK in the Dockerfile.
-    if setupErr := installSetupPackages(files, dir); setupErr != nil {
-        log.Fatalf("Failed to install required packages: %v", setupErr)
-    }
+        // skip if install already complete
+        _, err = os.Stat(installCompleteFile)
+        if err == nil {
+            return
+        }
+
+        // lock to guard from concurrent artifact retrieval and installation,
+        // when called by child processes in a worker pool
+        lock, err := lockfile.New(filepath.Join(os.TempDir(), "beam.install.lck"))
+        if err != nil {
+            log.Fatalf("Cannot init artifact retrieval lock: %v", err)
+        }
+
+        for err = lock.TryLock(); err != nil; err = lock.TryLock() {
+            switch err {
+            case lockfile.ErrBusy, lockfile.ErrNotExist:
+                time.Sleep(5 * time.Second)
+                log.Printf("Worker %v waiting for artifact retrieval lock: %v", *id, lock)
+            default:
+                log.Fatalf("Worker %v could not obtain artifact retrieval lock: %v", *id, err)
+            }
+        }
+        defer lock.Unlock()
+
+        // skip if install already complete
+        _, err = os.Stat(installCompleteFile)
+        if err == nil {
+            return
+        }
+
+        dir := filepath.Join(*semiPersistDir, "staged")
+
+        files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
+        if err != nil {
+            log.Fatalf("Failed to retrieve staged files: %v", err)
+        }
+
+        // TODO(herohde): the packages to install should be specified explicitly. It
+        // would also be possible to install the SDK in the Dockerfile.
+        if setupErr := installSetupPackages(files, dir); setupErr != nil {

Review comment:
In the future we could look into inverting this logic by extracting the exactly-once logic into a separate function that can be shared between the SDK containers.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 299132) Time Spent: 5.5h (was: 5h 20m)
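Editorial note on the review comment above: the idea of pulling the exactly-once logic into a reusable function can be sketched as follows. This is a minimal illustration in Python rather than the Go of boot.go, and it is not the code from the pull request. The function name `run_exactly_once` and its parameters are hypothetical. It follows the same three steps as the quoted diff: check a completion marker, take a lock, and check the marker again before doing the work. Unlike the lockfile package used in the diff, this sketch makes no attempt to detect a stale lock left behind by a crashed process.

```python
import os
import tempfile
import time


def run_exactly_once(marker_path, lock_path, action, poll_seconds=0.05):
    """Run action() at most once across cooperating processes.

    Returns True if this caller ran the action, False if the marker file
    showed that an earlier caller had already completed it.
    """
    # Fast path: an earlier caller already finished.
    if os.path.exists(marker_path):
        return False

    # Acquire the lock by creating the lock file exclusively. O_EXCL makes
    # the creation fail while another process holds the lock.
    while True:
        try:
            fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            break
        except FileExistsError:
            time.sleep(poll_seconds)

    try:
        # Re-check under the lock: another caller may have completed the
        # work while this one was waiting.
        if os.path.exists(marker_path):
            return False
        action()
        # Write the marker only after the action succeeded.
        with open(marker_path, "w"):
            pass
        return True
    finally:
        # Release the lock whether or not the action succeeded.
        os.close(fd)
        os.remove(lock_path)


if __name__ == "__main__":
    workdir = tempfile.mkdtemp()
    marker = os.path.join(workdir, "install.complete")
    lock = os.path.join(workdir, "install.lck")
    print(run_exactly_once(marker, lock, lambda: print("installing")))
    print(run_exactly_once(marker, lock, lambda: print("installing")))
```

The second check under the lock is what prevents a waiting worker from repeating the installation once it finally obtains the lock.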
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299134=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299134 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 22/Aug/19 03:28
Start Date: 22/Aug/19 03:28
Worklog Time Spent: 10m
Work Description: tweise commented on pull request #9398: [BEAM-7980] Exactly once artifact retrieval for Python SDK worker pool
URL: https://github.com/apache/beam/pull/9398#discussion_r316483610

## File path: sdks/python/container/boot.go ##

@@ -105,18 +107,57 @@ func main() {
 
     // (2) Retrieve and install the staged packages.
 
-    dir := filepath.Join(*semiPersistDir, "staged")
+    func() {
 
-    files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
-    if err != nil {
-        log.Fatalf("Failed to retrieve staged files: %v", err)
-    }
+        installCompleteFile := filepath.Join(os.TempDir(), "beam.install.complete")
 
-    // TODO(herohde): the packages to install should be specified explicitly. It
-    // would also be possible to install the SDK in the Dockerfile.
-    if setupErr := installSetupPackages(files, dir); setupErr != nil {
-        log.Fatalf("Failed to install required packages: %v", setupErr)
-    }
+        // skip if install already complete
+        _, err = os.Stat(installCompleteFile)
+        if err == nil {
+            return
+        }
+
+        // lock to guard from concurrent artifact retrieval and installation,
+        // when called by child processes in a worker pool
+        lock, err := lockfile.New(filepath.Join(os.TempDir(), "beam.install.lck"))
+        if err != nil {
+            log.Fatalf("Cannot init artifact retrieval lock: %v", err)
+        }
+
+        for err = lock.TryLock(); err != nil; err = lock.TryLock() {
+            switch err {
+            case lockfile.ErrBusy, lockfile.ErrNotExist:
+                time.Sleep(5 * time.Second)
+                log.Printf("Worker %v waiting for artifact retrieval lock: %v", *id, lock)
+            default:
+                log.Fatalf("Worker %v could not obtain artifact retrieval lock: %v", *id, err)
+            }
+        }
+        defer lock.Unlock()
+
+        // skip if install already complete
+        _, err = os.Stat(installCompleteFile)
+        if err == nil {
+            return
+        }
+
+        dir := filepath.Join(*semiPersistDir, "staged")
+
+        files, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetRetrievalToken(), dir)
+        if err != nil {
+            log.Fatalf("Failed to retrieve staged files: %v", err)
+        }
+
+        // TODO(herohde): the packages to install should be specified explicitly. It
+        // would also be possible to install the SDK in the Dockerfile.
+        if setupErr := installSetupPackages(files, dir); setupErr != nil {

Review comment:
In the future we could look into inverting this logic by extracting the exactly-once part into a separate function that can be shared between the SDK containers.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 299134) Time Spent: 5h 40m (was: 5.5h)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299131=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299131 ] ASF GitHub Bot logged work on BEAM-7980: Author: ASF GitHub Bot Created on: 22/Aug/19 03:26 Start Date: 22/Aug/19 03:26 Worklog Time Spent: 10m Work Description: tweise commented on pull request #9398: [BEAM-7980] Exactly once artifact retrieval for Python SDK worker pool URL: https://github.com/apache/beam/pull/9398 This is the follow-up for exactly once artifact retrieval. Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). 
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299048=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299048 ] ASF GitHub Bot logged work on BEAM-7980: Author: ASF GitHub Bot Created on: 21/Aug/19 22:31 Start Date: 21/Aug/19 22:31 Worklog Time Spent: 10m Work Description: tweise commented on pull request #9371: [BEAM-7980] External environment with containerized worker pool URL: https://github.com/apache/beam/pull/9371 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 299048) Time Spent: 5h 10m (was: 5h) > External environment with containerized worker pool > --- > > Key: BEAM-7980 > URL: https://issues.apache.org/jira/browse/BEAM-7980 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Time Spent: 5h 10m > Remaining Estimate: 0h > > Augment Beam Python docker image and boot.go so that it can be used to launch > BeamFnExternalWorkerPoolServicer. > [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0] > -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299036=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299036 ] ASF GitHub Bot logged work on BEAM-7980: Author: ASF GitHub Bot Created on: 21/Aug/19 21:46 Start Date: 21/Aug/19 21:46 Worklog Time Spent: 10m Work Description: tweise commented on issue #9371: [BEAM-7980] External environment with containerized worker pool URL: https://github.com/apache/beam/pull/9371#issuecomment-523662128 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 299036) Time Spent: 5h (was: 4h 50m) > External environment with containerized worker pool > --- > > Key: BEAM-7980 > URL: https://issues.apache.org/jira/browse/BEAM-7980 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Time Spent: 5h > Remaining Estimate: 0h > > Augment Beam Python docker image and boot.go so that it can be used to launch > BeamFnExternalWorkerPoolServicer. > [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0] > -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299034=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299034 ] ASF GitHub Bot logged work on BEAM-7980: Author: ASF GitHub Bot Created on: 21/Aug/19 21:40 Start Date: 21/Aug/19 21:40 Worklog Time Spent: 10m Work Description: tweise commented on issue #9371: [BEAM-7980] External environment with containerized worker pool URL: https://github.com/apache/beam/pull/9371#issuecomment-523660441 Run Portable_Python PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 299034) Time Spent: 4h 50m (was: 4h 40m) > External environment with containerized worker pool > --- > > Key: BEAM-7980 > URL: https://issues.apache.org/jira/browse/BEAM-7980 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Time Spent: 4h 50m > Remaining Estimate: 0h > > Augment Beam Python docker image and boot.go so that it can be used to launch > BeamFnExternalWorkerPoolServicer. > [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0] > -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299032=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299032 ] ASF GitHub Bot logged work on BEAM-7980: Author: ASF GitHub Bot Created on: 21/Aug/19 21:32 Start Date: 21/Aug/19 21:32 Worklog Time Spent: 10m Work Description: tweise commented on issue #9371: [BEAM-7980] External environment with containerized worker pool URL: https://github.com/apache/beam/pull/9371#issuecomment-523657737 Run Portable_Python PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 299032) Time Spent: 4h 40m (was: 4.5h) > External environment with containerized worker pool > --- > > Key: BEAM-7980 > URL: https://issues.apache.org/jira/browse/BEAM-7980 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Time Spent: 4h 40m > Remaining Estimate: 0h > > Augment Beam Python docker image and boot.go so that it can be used to launch > BeamFnExternalWorkerPoolServicer. > [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0] > -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=299002=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299002 ] ASF GitHub Bot logged work on BEAM-7980: Author: ASF GitHub Bot Created on: 21/Aug/19 20:50 Start Date: 21/Aug/19 20:50 Worklog Time Spent: 10m Work Description: tweise commented on pull request #9371: [BEAM-7980] External environment with containerized worker pool URL: https://github.com/apache/beam/pull/9371#discussion_r316396047 ## File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py ## @@ -0,0 +1,177 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +Worker pool entry point. + +The worker pool exposes an RPC service that is used with EXTERNAL +environment to start and stop the SDK workers. + +The worker pool uses child processes for parallelism; threads are Review comment: The documentation applies to the entry point, which always uses processes. It's OK if tests use the inner class separately, it should have its own documentation (I did not modify that part, just moved it). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 299002) Time Spent: 4.5h (was: 4h 20m)
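Editorial note on the exchange above: the distinction under discussion, a servicer that can start a worker either as a child process or as a thread, can be illustrated with a small stand-alone sketch. This is not the Beam implementation. The function `start_worker` and its `results` argument are hypothetical, and the "work" is a placeholder that just echoes the worker id. The thread branch mirrors the daemon-thread pattern and thread naming visible in the quoted `StartWorker` diff.

```python
import subprocess
import sys
import threading


def start_worker(worker_id, use_process, results):
    """Start one placeholder worker as a child process or a daemon thread.

    Returns the Popen object in process mode, the Thread object otherwise.
    """
    if use_process:
        # A child process runs in its own interpreter. The placeholder
        # command prints the worker id that is passed as an argument.
        command = [sys.executable, "-c",
                   "import sys; print(sys.argv[1])", worker_id]
        return subprocess.Popen(command, stdout=subprocess.PIPE)

    # Thread mode: the placeholder work records the worker id in-process.
    thread = threading.Thread(
        name="run_worker_%s" % worker_id,
        target=results.append,
        args=(worker_id,))
    thread.daemon = True
    thread.start()
    return thread
```

In process mode the caller has to track and eventually terminate the child, which is why the quoted servicer keeps a dictionary of worker processes. In thread mode the daemon flag lets the interpreter exit without waiting for the worker.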
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298982=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298982 ] ASF GitHub Bot logged work on BEAM-7980: Author: ASF GitHub Bot Created on: 21/Aug/19 20:22 Start Date: 21/Aug/19 20:22 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9371: [BEAM-7980] External environment with containerized worker pool URL: https://github.com/apache/beam/pull/9371#discussion_r316384687 ## File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py ## @@ -0,0 +1,177 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +Worker pool entry point. + +The worker pool exposes an RPC service that is used with EXTERNAL +environment to start and stop the SDK workers. + +The worker pool uses child processes for parallelism; threads are Review comment: It is a bit odd to read this here because `BeamFnExternalWorkerPoolServicer` supports threads internally and tests make use of this feature. If this module is the entry point for the worker pool, maybe the pooling code itself should be located in a different module and we should only handle its configuration here. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 298982) Time Spent: 4h 20m (was: 4h 10m)
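Editorial note on the suggestion above: an entry point that only handles configuration and delegates the pooling to code living elsewhere might look roughly like this. It is a sketch under assumptions, not the module from the pull request. The flags `--service_port` and `--container_executable` are the ones boot.go passes in the quoted diff. The `start_pool` callable is a hypothetical stand-in for something like `BeamFnExternalWorkerPoolServicer.start`, which according to the quoted code accepts `port` and `container_executable` keyword arguments.

```python
import argparse


def parse_worker_pool_args(argv):
    """Parse the command-line flags of the worker pool entry point."""
    parser = argparse.ArgumentParser(
        description="Worker pool entry point (illustrative sketch).")
    parser.add_argument("--service_port", type=int, required=True,
                        help="Port on which the worker pool service listens.")
    parser.add_argument("--container_executable", type=str, default=None,
                        help="Executable used to launch each worker.")
    return parser.parse_args(argv)


def main(argv, start_pool):
    """Translate flags into a call to the (injected) pool starter."""
    args = parse_worker_pool_args(argv)
    # start_pool is hypothetical; it stands in for the pooling code that
    # would live in a separate module.
    return start_pool(port=args.service_port,
                      container_executable=args.container_executable)
```

Passing the starter in as an argument keeps the entry point free of pooling logic and makes it testable without opening a port.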
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298973=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298973 ] ASF GitHub Bot logged work on BEAM-7980: Author: ASF GitHub Bot Created on: 21/Aug/19 20:12 Start Date: 21/Aug/19 20:12 Worklog Time Spent: 10m Work Description: aaltay commented on pull request #9371: [BEAM-7980] External environment with containerized worker pool URL: https://github.com/apache/beam/pull/9371#discussion_r316380746 ## File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py ## @@ -0,0 +1,170 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +Worker pool entry point. + +The worker pool exposes an RPC service that is used with EXTERNAL +environment to start and stop the SDK workers. + +This entry point is used by the Python SDK container in worker pool mode. 
+""" + +from __future__ import absolute_import + +import argparse +import atexit +import logging +import subprocess +import sys +import threading +import time +from concurrent import futures + +import grpc + +from apache_beam.portability.api import beam_fn_api_pb2 +from apache_beam.portability.api import beam_fn_api_pb2_grpc +from apache_beam.runners.worker import sdk_worker + + +class BeamFnExternalWorkerPoolServicer( +beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer): + + def __init__(self, worker_threads, use_process=False, + container_executable=None): +self._worker_threads = worker_threads +self._use_process = use_process +self._container_executable = container_executable +self._worker_processes = {} + + @classmethod + def start(cls, worker_threads=1, use_process=False, port=0, +container_executable=None): +worker_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) +worker_address = 'localhost:%s' % worker_server.add_insecure_port( +'[::]:%s' % port) +beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server( +cls(worker_threads, +use_process=use_process, +container_executable=container_executable), +worker_server) +worker_server.start() + +# Register to kill the subprocesses on exit. 
+    def kill_worker_processes():
+      for worker_process in cls._worker_processes.values():
+        worker_process.kill()
+    atexit.register(kill_worker_processes)
+
+    return worker_address, worker_server
+
+  def StartWorker(self, start_worker_request, unused_context):
+    try:
+      if self._use_process:
+        command = ['python', '-c',
+                   'from apache_beam.runners.worker.sdk_worker '
+                   'import SdkHarness; '
+                   'SdkHarness("%s",worker_count=%d,worker_id="%s").run()' % (
+                       start_worker_request.control_endpoint.url,
+                       self._worker_threads,
+                       start_worker_request.worker_id)]
+        if self._container_executable:
+          # command as per container spec
+          command = [self._container_executable,
+                     '--id=%s' % start_worker_request.worker_id,
+                     '--logging_endpoint=%s'
+                     % start_worker_request.logging_endpoint.url,

Review comment:
Makes sense. Thank you for documenting.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---

Worklog Id: (was: 298973)
Time Spent: 4h 10m (was: 4h)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-harness
> Reporter: Thomas Weise
> Assignee: Thomas Weise
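Editorial note on the exit hook in the diff above: as quoted, `_worker_processes` is assigned on the instance in `__init__`, while `kill_worker_processes` reads `cls._worker_processes` inside a classmethod. Unless a class-level attribute exists outside the quoted hunk, that lookup would raise `AttributeError` when the hook runs. One way to avoid this, sketched below with a hypothetical `WorkerPool` class that is not the Beam servicer, is to keep a reference to the created instance and let the hook close over it.

```python
import atexit


class WorkerPool(object):
    """Hypothetical stand-in for a servicer that tracks child processes."""

    def __init__(self):
        # Maps worker id to a process-like object exposing kill().
        self._worker_processes = {}

    @classmethod
    def start(cls):
        # Keep the instance so the exit hook can reach its process table.
        pool = cls()

        def kill_worker_processes():
            for worker_process in pool._worker_processes.values():
                worker_process.kill()

        # Register the hook once per pool; it runs at interpreter exit.
        atexit.register(kill_worker_processes)
        return pool, kill_worker_processes
```

Returning the hook alongside the pool is only a convenience for the sketch, so that the cleanup can be exercised directly without waiting for interpreter exit.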
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298974=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298974 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 21/Aug/19 20:12
Start Date: 21/Aug/19 20:12
Worklog Time Spent: 10m
Work Description: aaltay commented on pull request #9371: [BEAM-7980] External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r316380301

## File path: sdks/python/container/boot.go ##

@@ -59,6 +60,18 @@ const (
 func main() {
     flag.Parse()
+
+    if *workerPool == true {
+        args := []string{
+            "-m",
+            "apache_beam.runners.worker.worker_pool_main",
+            "--service_port=5",
+            "--container_executable=/opt/apache/beam/boot",
+        }
+        log.Printf("Starting Python SDK worker pool: python %v", strings.Join(args, " "))
+        log.Fatalf("Python SDK worker pool exited: %v", execx.Execute("python", args...))

Review comment:
Sounds good. Do you mind adding a TODO here for that follow-up PR?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---

Worklog Id: (was: 298974)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-harness
> Reporter: Thomas Weise
> Assignee: Thomas Weise
> Priority: Major
> Time Spent: 4h 10m
> Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]

--
This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298909=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298909 ] ASF GitHub Bot logged work on BEAM-7980: Author: ASF GitHub Bot Created on: 21/Aug/19 19:03 Start Date: 21/Aug/19 19:03 Worklog Time Spent: 10m Work Description: tweise commented on pull request #9371: [BEAM-7980] External environment with containerized worker pool URL: https://github.com/apache/beam/pull/9371#discussion_r316353394 ## File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py ## @@ -0,0 +1,150 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +"""Worker pool entry point.""" + +from __future__ import absolute_import + +import argparse +import atexit +import logging +import subprocess +import sys +import threading +import time +from concurrent import futures + +import grpc + +from apache_beam.portability.api import beam_fn_api_pb2 +from apache_beam.portability.api import beam_fn_api_pb2_grpc +from apache_beam.runners.worker import sdk_worker + + +class BeamFnExternalWorkerPoolServicer( +beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer): + + def __init__(self, worker_threads, use_process=False, + container_executable=None): +self._worker_threads = worker_threads +self._use_process = use_process +self._container_executable = container_executable +self._worker_processes = {} + + @classmethod + def start(cls, worker_threads=1, use_process=False, port=0, +container_executable=None): +worker_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) +worker_address = 'localhost:%s' % worker_server.add_insecure_port( +'[::]:%s' % port) +beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server( +cls(worker_threads, +use_process=use_process, +container_executable=container_executable), +worker_server) +worker_server.start() +return worker_address, worker_server + + def StartWorker(self, start_worker_request, unused_context): +try: + if self._use_process: +command = ['python', '-c', + 'from apache_beam.runners.worker.sdk_worker ' + 'import SdkHarness; ' + 'SdkHarness("%s",worker_count=%d,worker_id="%s").run()' % ( + start_worker_request.control_endpoint.url, + self._worker_threads, + start_worker_request.worker_id)] +if self._container_executable: + # command as per container spec + command = [self._container_executable, + '--id=%s' % start_worker_request.worker_id, + '--logging_endpoint=%s' + % start_worker_request.logging_endpoint.url, + '--artifact_endpoint=%s' + % start_worker_request.artifact_endpoint.url, + '--provision_endpoint=%s' + % start_worker_request.provision_endpoint.url, + 
'--control_endpoint=%s' + % start_worker_request.control_endpoint.url, +] + +logging.warn("Starting worker with command %s" % command) +worker_process = subprocess.Popen(command, stdout=subprocess.PIPE) +self._worker_processes[start_worker_request.worker_id] = worker_process + +# Register to kill the subprocess on exit. +atexit.register(worker_process.kill) + else: +worker = sdk_worker.SdkHarness( +start_worker_request.control_endpoint.url, +worker_count=self._worker_threads, +worker_id=start_worker_request.worker_id) +worker_thread = threading.Thread( +name='run_worker_%s' % start_worker_request.worker_id, +target=worker.run) +worker_thread.daemon = True +worker_thread.start() + + return beam_fn_api_pb2.StartWorkerResponse() +except Exception as exn: + return beam_fn_api_pb2.StartWorkerResponse(error=str(exn)) + + def StopWorker(self,
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298900=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298900 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 21/Aug/19 19:00
Start Date: 21/Aug/19 19:00
Worklog Time Spent: 10m
Work Description: tweise commented on issue #9371: [BEAM-7980] External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#issuecomment-523604353

@mxm @aaltay PTAL

Issue Time Tracking
---
Worklog Id: (was: 298900)
Time Spent: 3h 40m (was: 3.5h)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298902=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298902 ] ASF GitHub Bot logged work on BEAM-7980: Author: ASF GitHub Bot Created on: 21/Aug/19 19:00 Start Date: 21/Aug/19 19:00 Worklog Time Spent: 10m Work Description: tweise commented on pull request #9371: [BEAM-7980] External environment with containerized worker pool URL: https://github.com/apache/beam/pull/9371#discussion_r316352545 ## File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py ## @@ -0,0 +1,150 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +"""Worker pool entry point.""" + +from __future__ import absolute_import + +import argparse +import atexit +import logging +import subprocess +import sys +import threading +import time +from concurrent import futures + +import grpc + +from apache_beam.portability.api import beam_fn_api_pb2 +from apache_beam.portability.api import beam_fn_api_pb2_grpc +from apache_beam.runners.worker import sdk_worker + + +class BeamFnExternalWorkerPoolServicer( +beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer): + + def __init__(self, worker_threads, use_process=False, + container_executable=None): +self._worker_threads = worker_threads +self._use_process = use_process +self._container_executable = container_executable +self._worker_processes = {} + + @classmethod + def start(cls, worker_threads=1, use_process=False, port=0, +container_executable=None): +worker_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) +worker_address = 'localhost:%s' % worker_server.add_insecure_port( +'[::]:%s' % port) +beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server( +cls(worker_threads, +use_process=use_process, +container_executable=container_executable), +worker_server) +worker_server.start() +return worker_address, worker_server + + def StartWorker(self, start_worker_request, unused_context): +try: + if self._use_process: +command = ['python', '-c', + 'from apache_beam.runners.worker.sdk_worker ' + 'import SdkHarness; ' + 'SdkHarness("%s",worker_count=%d,worker_id="%s").run()' % ( + start_worker_request.control_endpoint.url, + self._worker_threads, + start_worker_request.worker_id)] +if self._container_executable: + # command as per container spec + command = [self._container_executable, + '--id=%s' % start_worker_request.worker_id, + '--logging_endpoint=%s' + % start_worker_request.logging_endpoint.url, + '--artifact_endpoint=%s' + % start_worker_request.artifact_endpoint.url, + '--provision_endpoint=%s' + % start_worker_request.provision_endpoint.url, + 
'--control_endpoint=%s' + % start_worker_request.control_endpoint.url, +] + +logging.warn("Starting worker with command %s" % command) +worker_process = subprocess.Popen(command, stdout=subprocess.PIPE) +self._worker_processes[start_worker_request.worker_id] = worker_process + +# Register to kill the subprocess on exit. +atexit.register(worker_process.kill) + else: +worker = sdk_worker.SdkHarness( +start_worker_request.control_endpoint.url, +worker_count=self._worker_threads, +worker_id=start_worker_request.worker_id) +worker_thread = threading.Thread( +name='run_worker_%s' % start_worker_request.worker_id, +target=worker.run) +worker_thread.daemon = True +worker_thread.start() + + return beam_fn_api_pb2.StartWorkerResponse() +except Exception as exn: + return beam_fn_api_pb2.StartWorkerResponse(error=str(exn)) + + def StopWorker(self,
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298899=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298899 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 21/Aug/19 19:00
Start Date: 21/Aug/19 19:00
Worklog Time Spent: 10m
Work Description: tweise commented on pull request #9371: [BEAM-7980] External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r316351902

## File path: sdks/python/container/boot.go ##
@@ -59,6 +60,18 @@ const (
 func main() {
 	flag.Parse()
+
+	if *workerPool == true {
+		args := []string{
+			"-m",
+			"apache_beam.runners.worker.worker_pool_main",
+			"--service_port=5",
+			"--container_executable=/opt/apache/beam/boot",
+		}
+		log.Printf("Starting Python SDK worker pool: python %v", strings.Join(args, " "))
+		log.Fatalf("Python SDK worker pool exited: %v", execx.Execute("python", args...))

Review comment:
I'm working on the locking that prevents multiple processes from downloading artifacts and running installation concurrently, since this needs to be done only once. I would prefer to defer this to a follow-up PR though.

Issue Time Tracking
---
Worklog Id: (was: 298899)
Time Spent: 3.5h (was: 3h 20m)
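The locking mentioned in the review comment above can be illustrated with a small sketch. This is not the code from the follow-up PR; it assumes a POSIX host where `fcntl.flock` is available, and `retrieve_once`, the lock file name, and the marker file name are hypothetical names chosen for illustration. An exclusive file lock serializes the worker processes, and a marker file records that the download and installation have already been done.

```python
import fcntl
import os
import tempfile


def retrieve_once(work_dir, retrieve):
  """Runs retrieve() at most once per work_dir, even across processes.

  Hypothetical helper: an exclusive lock on a lock file serializes callers,
  and a marker file records that the work has already been completed.
  """
  lock_path = os.path.join(work_dir, '.retrieve.lock')
  done_path = os.path.join(work_dir, '.retrieve.done')
  with open(lock_path, 'w') as lock_file:
    fcntl.flock(lock_file, fcntl.LOCK_EX)  # blocks while another caller holds it
    try:
      if os.path.exists(done_path):
        return False  # an earlier caller already finished the work
      retrieve()
      with open(done_path, 'w'):
        pass  # create the marker only after retrieve() succeeded
      return True
    finally:
      fcntl.flock(lock_file, fcntl.LOCK_UN)


# Usage: the second call finds the marker and skips the work.
calls = []
work_dir = tempfile.mkdtemp()
first = retrieve_once(work_dir, lambda: calls.append('download'))
second = retrieve_once(work_dir, lambda: calls.append('download'))
```

Writing the marker only after `retrieve()` returns means a failed attempt is retried by the next caller rather than being recorded as done.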
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298861=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298861 ] ASF GitHub Bot logged work on BEAM-7980: Author: ASF GitHub Bot Created on: 21/Aug/19 17:03 Start Date: 21/Aug/19 17:03 Worklog Time Spent: 10m Work Description: tweise commented on pull request #9371: [BEAM-7980] External environment with containerized worker pool URL: https://github.com/apache/beam/pull/9371#discussion_r316296507 ## File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py ## @@ -0,0 +1,170 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +Worker pool entry point. + +The worker pool exposes an RPC service that is used with EXTERNAL +environment to start and stop the SDK workers. + +This entry point is used by the Python SDK container in worker pool mode. 
+""" + +from __future__ import absolute_import + +import argparse +import atexit +import logging +import subprocess +import sys +import threading +import time +from concurrent import futures + +import grpc + +from apache_beam.portability.api import beam_fn_api_pb2 +from apache_beam.portability.api import beam_fn_api_pb2_grpc +from apache_beam.runners.worker import sdk_worker + + +class BeamFnExternalWorkerPoolServicer( +beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer): + + def __init__(self, worker_threads, use_process=False, + container_executable=None): +self._worker_threads = worker_threads +self._use_process = use_process +self._container_executable = container_executable +self._worker_processes = {} + + @classmethod + def start(cls, worker_threads=1, use_process=False, port=0, +container_executable=None): +worker_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) +worker_address = 'localhost:%s' % worker_server.add_insecure_port( +'[::]:%s' % port) +beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server( +cls(worker_threads, +use_process=use_process, +container_executable=container_executable), +worker_server) +worker_server.start() + +# Register to kill the subprocesses on exit. 
+    def kill_worker_processes():
+      for worker_process in cls._worker_processes.values():
+        worker_process.kill()
+    atexit.register(kill_worker_processes)
+
+    return worker_address, worker_server
+
+  def StartWorker(self, start_worker_request, unused_context):
+    try:
+      if self._use_process:
+        command = ['python', '-c',
+                   'from apache_beam.runners.worker.sdk_worker '
+                   'import SdkHarness; '
+                   'SdkHarness("%s",worker_count=%d,worker_id="%s").run()' % (
+                       start_worker_request.control_endpoint.url,
+                       self._worker_threads,
+                       start_worker_request.worker_id)]
+        if self._container_executable:
+          # command as per container spec
+          command = [self._container_executable,
+                     '--id=%s' % start_worker_request.worker_id,
+                     '--logging_endpoint=%s'
+                     % start_worker_request.logging_endpoint.url,

Review comment:
Example how the worker pool is started: `docker run --rm -p=5:5 tweise-docker-apache.bintray.io/beam/python:latest --worker_pool=true`

Issue Time Tracking
---
Worklog Id: (was: 298861)
Time Spent: 3h 20m (was: 3h 10m)
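In the diff quoted above, `kill_worker_processes` reads `cls._worker_processes`, while `_worker_processes` is only assigned on instances in `__init__`, so the hook as quoted would likely raise `AttributeError` at exit. A minimal sketch of one way around this, assuming the hook should act on the instance created in `start`; `WorkerPool` and `start_worker` are hypothetical stand-ins for the servicer and its RPC method, and a sleeping child process stands in for a real SDK worker.

```python
import atexit
import subprocess
import sys


class WorkerPool(object):
  """Hypothetical stand-in for the servicer; tracks child processes."""

  def __init__(self):
    self._worker_processes = {}

  @classmethod
  def start(cls):
    pool = cls()  # keep the instance so the exit hook can reach its state

    def kill_worker_processes():
      for worker_process in pool._worker_processes.values():
        worker_process.kill()

    atexit.register(kill_worker_processes)
    return pool, kill_worker_processes

  def start_worker(self, worker_id):
    # A sleeping child stands in for a real SDK worker process.
    command = [sys.executable, '-c', 'import time; time.sleep(60)']
    self._worker_processes[worker_id] = subprocess.Popen(command)


pool, kill_all = WorkerPool.start()
pool.start_worker('worker-1')
kill_all()  # what the atexit hook would do at interpreter shutdown
exit_codes = [p.wait() for p in pool._worker_processes.values()]
```

Registering a single hook in `start` also avoids adding one `atexit` entry per worker, which is what the earlier revision of the diff did with `atexit.register(worker_process.kill)`.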
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298851=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298851 ] ASF GitHub Bot logged work on BEAM-7980: Author: ASF GitHub Bot Created on: 21/Aug/19 16:52 Start Date: 21/Aug/19 16:52 Worklog Time Spent: 10m Work Description: tweise commented on pull request #9371: [BEAM-7980] External environment with containerized worker pool URL: https://github.com/apache/beam/pull/9371#discussion_r316291805 ## File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py ## @@ -0,0 +1,170 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +Worker pool entry point. + +The worker pool exposes an RPC service that is used with EXTERNAL +environment to start and stop the SDK workers. + +This entry point is used by the Python SDK container in worker pool mode. 
+""" + +from __future__ import absolute_import + +import argparse +import atexit +import logging +import subprocess +import sys +import threading +import time +from concurrent import futures + +import grpc + +from apache_beam.portability.api import beam_fn_api_pb2 +from apache_beam.portability.api import beam_fn_api_pb2_grpc +from apache_beam.runners.worker import sdk_worker + + +class BeamFnExternalWorkerPoolServicer( +beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer): + + def __init__(self, worker_threads, use_process=False, + container_executable=None): +self._worker_threads = worker_threads +self._use_process = use_process +self._container_executable = container_executable +self._worker_processes = {} + + @classmethod + def start(cls, worker_threads=1, use_process=False, port=0, +container_executable=None): +worker_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) +worker_address = 'localhost:%s' % worker_server.add_insecure_port( +'[::]:%s' % port) +beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server( +cls(worker_threads, +use_process=use_process, +container_executable=container_executable), +worker_server) +worker_server.start() + +# Register to kill the subprocesses on exit. 
+    def kill_worker_processes():
+      for worker_process in cls._worker_processes.values():
+        worker_process.kill()
+    atexit.register(kill_worker_processes)
+
+    return worker_address, worker_server
+
+  def StartWorker(self, start_worker_request, unused_context):
+    try:
+      if self._use_process:
+        command = ['python', '-c',
+                   'from apache_beam.runners.worker.sdk_worker '
+                   'import SdkHarness; '
+                   'SdkHarness("%s",worker_count=%d,worker_id="%s").run()' % (
+                       start_worker_request.control_endpoint.url,
+                       self._worker_threads,
+                       start_worker_request.worker_id)]
+        if self._container_executable:
+          # command as per container spec
+          command = [self._container_executable,
+                     '--id=%s' % start_worker_request.worker_id,
+                     '--logging_endpoint=%s'
+                     % start_worker_request.logging_endpoint.url,

Review comment:
Endpoint URLs are not known at container startup. That's also why artifact staging has to happen lazily. I will document this.

Issue Time Tracking
---
Worklog Id: (was: 298851)
Time Spent: 3h 10m (was: 3h)
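The point made in the comment above, that endpoint URLs only arrive with each StartWorker request, can be sketched as a function that assembles the boot command per request. The flag names are taken from the quoted diff; the named tuples are hypothetical stand-ins for the protobuf messages, `build_container_command` is an illustrative name, and the endpoint values are placeholders.

```python
import collections

# Hypothetical stand-ins for the protobuf messages carried by StartWorker.
Endpoint = collections.namedtuple('Endpoint', ['url'])
StartWorkerRequest = collections.namedtuple(
    'StartWorkerRequest',
    ['worker_id', 'control_endpoint', 'logging_endpoint',
     'artifact_endpoint', 'provision_endpoint'])


def build_container_command(container_executable, request):
  """Builds the boot command from values only known at request time."""
  return [
      container_executable,
      '--id=%s' % request.worker_id,
      '--logging_endpoint=%s' % request.logging_endpoint.url,
      '--artifact_endpoint=%s' % request.artifact_endpoint.url,
      '--provision_endpoint=%s' % request.provision_endpoint.url,
      '--control_endpoint=%s' % request.control_endpoint.url,
  ]


# Placeholder endpoints; in practice the runner supplies them per request.
request = StartWorkerRequest(
    worker_id='worker-1',
    control_endpoint=Endpoint('localhost:1001'),
    logging_endpoint=Endpoint('localhost:1002'),
    artifact_endpoint=Endpoint('localhost:1003'),
    provision_endpoint=Endpoint('localhost:1004'))
command = build_container_command('/opt/apache/beam/boot', request)
```

Because the artifact endpoint is one of these per-request values, nothing can be staged when the pool container itself starts, which is why retrieval has to wait until the first worker is launched.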
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298842=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298842 ] ASF GitHub Bot logged work on BEAM-7980: Author: ASF GitHub Bot Created on: 21/Aug/19 16:40 Start Date: 21/Aug/19 16:40 Worklog Time Spent: 10m Work Description: tweise commented on pull request #9371: [BEAM-7980] External environment with containerized worker pool URL: https://github.com/apache/beam/pull/9371#discussion_r316286663 ## File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py ## @@ -0,0 +1,170 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +Worker pool entry point. + +The worker pool exposes an RPC service that is used with EXTERNAL +environment to start and stop the SDK workers. + +This entry point is used by the Python SDK container in worker pool mode. 
+""" + +from __future__ import absolute_import + +import argparse +import atexit +import logging +import subprocess +import sys +import threading +import time +from concurrent import futures + +import grpc + +from apache_beam.portability.api import beam_fn_api_pb2 +from apache_beam.portability.api import beam_fn_api_pb2_grpc +from apache_beam.runners.worker import sdk_worker + + +class BeamFnExternalWorkerPoolServicer( +beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer): + + def __init__(self, worker_threads, use_process=False, + container_executable=None): +self._worker_threads = worker_threads +self._use_process = use_process +self._container_executable = container_executable +self._worker_processes = {} + + @classmethod + def start(cls, worker_threads=1, use_process=False, port=0, +container_executable=None): +worker_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) +worker_address = 'localhost:%s' % worker_server.add_insecure_port( +'[::]:%s' % port) +beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server( +cls(worker_threads, +use_process=use_process, +container_executable=container_executable), +worker_server) +worker_server.start() + +# Register to kill the subprocesses on exit. 
+def kill_worker_processes(): + for worker_process in cls._worker_processes.values(): +worker_process.kill() +atexit.register(kill_worker_processes) + +return worker_address, worker_server + + def StartWorker(self, start_worker_request, unused_context): +try: + if self._use_process: +command = ['python', '-c', + 'from apache_beam.runners.worker.sdk_worker ' + 'import SdkHarness; ' + 'SdkHarness("%s",worker_count=%d,worker_id="%s").run()' % ( + start_worker_request.control_endpoint.url, + self._worker_threads, + start_worker_request.worker_id)] +if self._container_executable: + # command as per container spec + command = [self._container_executable, + '--id=%s' % start_worker_request.worker_id, + '--logging_endpoint=%s' + % start_worker_request.logging_endpoint.url, + '--artifact_endpoint=%s' + % start_worker_request.artifact_endpoint.url, + '--provision_endpoint=%s' + % start_worker_request.provision_endpoint.url, + '--control_endpoint=%s' + % start_worker_request.control_endpoint.url, +] + +logging.warn("Starting worker with command %s" % command) +worker_process = subprocess.Popen(command, stdout=subprocess.PIPE, + close_fds=True) +self._worker_processes[start_worker_request.worker_id] = worker_process + else: +worker = sdk_worker.SdkHarness( +start_worker_request.control_endpoint.url, +worker_count=self._worker_threads, +worker_id=start_worker_request.worker_id) +
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298361=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298361 ] ASF GitHub Bot logged work on BEAM-7980: Author: ASF GitHub Bot Created on: 21/Aug/19 01:39 Start Date: 21/Aug/19 01:39 Worklog Time Spent: 10m Work Description: tweise commented on pull request #9371: [BEAM-7980] External environment with containerized worker pool URL: https://github.com/apache/beam/pull/9371#discussion_r315968878 ## File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py ## @@ -0,0 +1,150 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +"""Worker pool entry point.""" + +from __future__ import absolute_import + +import argparse +import atexit +import logging +import subprocess +import sys +import threading +import time +from concurrent import futures + +import grpc + +from apache_beam.portability.api import beam_fn_api_pb2 +from apache_beam.portability.api import beam_fn_api_pb2_grpc +from apache_beam.runners.worker import sdk_worker + + +class BeamFnExternalWorkerPoolServicer( +beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer): + + def __init__(self, worker_threads, use_process=False, + container_executable=None): +self._worker_threads = worker_threads +self._use_process = use_process +self._container_executable = container_executable +self._worker_processes = {} + + @classmethod + def start(cls, worker_threads=1, use_process=False, port=0, +container_executable=None): +worker_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) +worker_address = 'localhost:%s' % worker_server.add_insecure_port( +'[::]:%s' % port) +beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server( +cls(worker_threads, +use_process=use_process, +container_executable=container_executable), +worker_server) +worker_server.start() +return worker_address, worker_server + + def StartWorker(self, start_worker_request, unused_context): +try: + if self._use_process: +command = ['python', '-c', + 'from apache_beam.runners.worker.sdk_worker ' + 'import SdkHarness; ' + 'SdkHarness("%s",worker_count=%d,worker_id="%s").run()' % ( + start_worker_request.control_endpoint.url, + self._worker_threads, + start_worker_request.worker_id)] +if self._container_executable: + # command as per container spec + command = [self._container_executable, + '--id=%s' % start_worker_request.worker_id, + '--logging_endpoint=%s' + % start_worker_request.logging_endpoint.url, + '--artifact_endpoint=%s' + % start_worker_request.artifact_endpoint.url, + '--provision_endpoint=%s' + % start_worker_request.provision_endpoint.url, + 
'--control_endpoint=%s' + % start_worker_request.control_endpoint.url, +] + +logging.warn("Starting worker with command %s" % command) +worker_process = subprocess.Popen(command, stdout=subprocess.PIPE) +self._worker_processes[start_worker_request.worker_id] = worker_process + +# Register to kill the subprocess on exit. +atexit.register(worker_process.kill) + else: +worker = sdk_worker.SdkHarness( +start_worker_request.control_endpoint.url, +worker_count=self._worker_threads, +worker_id=start_worker_request.worker_id) +worker_thread = threading.Thread( +name='run_worker_%s' % start_worker_request.worker_id, +target=worker.run) +worker_thread.daemon = True +worker_thread.start() + + return beam_fn_api_pb2.StartWorkerResponse() +except Exception as exn: + return beam_fn_api_pb2.StartWorkerResponse(error=str(exn)) + + def StopWorker(self,
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298300=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298300 ] ASF GitHub Bot logged work on BEAM-7980: Author: ASF GitHub Bot Created on: 20/Aug/19 23:10 Start Date: 20/Aug/19 23:10 Worklog Time Spent: 10m Work Description: tweise commented on pull request #9371: [WIP] [BEAM-7980] External environment with containerized worker pool URL: https://github.com/apache/beam/pull/9371#discussion_r315940832 ## File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py ## @@ -0,0 +1,150 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +"""Worker pool entry point.""" + +from __future__ import absolute_import + +import argparse +import atexit +import logging +import subprocess +import sys +import threading +import time +from concurrent import futures + +import grpc + +from apache_beam.portability.api import beam_fn_api_pb2 +from apache_beam.portability.api import beam_fn_api_pb2_grpc +from apache_beam.runners.worker import sdk_worker + + +class BeamFnExternalWorkerPoolServicer( +beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer): + + def __init__(self, worker_threads, use_process=False, + container_executable=None): +self._worker_threads = worker_threads +self._use_process = use_process +self._container_executable = container_executable +self._worker_processes = {} + + @classmethod + def start(cls, worker_threads=1, use_process=False, port=0, +container_executable=None): +worker_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) +worker_address = 'localhost:%s' % worker_server.add_insecure_port( +'[::]:%s' % port) +beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server( +cls(worker_threads, +use_process=use_process, +container_executable=container_executable), +worker_server) +worker_server.start() +return worker_address, worker_server + + def StartWorker(self, start_worker_request, unused_context): +try: + if self._use_process: +command = ['python', '-c', + 'from apache_beam.runners.worker.sdk_worker ' + 'import SdkHarness; ' + 'SdkHarness("%s",worker_count=%d,worker_id="%s").run()' % ( + start_worker_request.control_endpoint.url, + self._worker_threads, + start_worker_request.worker_id)] +if self._container_executable: + # command as per container spec + command = [self._container_executable, + '--id=%s' % start_worker_request.worker_id, + '--logging_endpoint=%s' + % start_worker_request.logging_endpoint.url, + '--artifact_endpoint=%s' + % start_worker_request.artifact_endpoint.url, + '--provision_endpoint=%s' + % start_worker_request.provision_endpoint.url, + 
'--control_endpoint=%s' + % start_worker_request.control_endpoint.url, +] + +logging.warn("Starting worker with command %s" % command) +worker_process = subprocess.Popen(command, stdout=subprocess.PIPE) +self._worker_processes[start_worker_request.worker_id] = worker_process + +# Register to kill the subprocess on exit. +atexit.register(worker_process.kill) + else: +worker = sdk_worker.SdkHarness( +start_worker_request.control_endpoint.url, +worker_count=self._worker_threads, +worker_id=start_worker_request.worker_id) +worker_thread = threading.Thread( +name='run_worker_%s' % start_worker_request.worker_id, +target=worker.run) +worker_thread.daemon = True +worker_thread.start() + + return beam_fn_api_pb2.StartWorkerResponse() +except Exception as exn: + return beam_fn_api_pb2.StartWorkerResponse(error=str(exn)) + + def StopWorker(self,
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298214=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298214 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 20/Aug/19 21:47
Start Date: 20/Aug/19 21:47
Worklog Time Spent: 10m
Work Description: tweise commented on pull request #9371: [WIP] [BEAM-7980] External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r315918276

## File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py ##
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Worker pool entry point."""
+
+from __future__ import absolute_import
+
+import argparse
+import atexit
+import logging
+import subprocess
+import sys
+import threading
+import time
+from concurrent import futures
+
+import grpc
+
+from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import beam_fn_api_pb2_grpc
+from apache_beam.runners.worker import sdk_worker
+
+
+class BeamFnExternalWorkerPoolServicer(
+    beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer):
+
+  def __init__(self, worker_threads, use_process=False,
+               container_executable=None):
+    self._worker_threads = worker_threads
+    self._use_process = use_process
+    self._container_executable = container_executable
+    self._worker_processes = {}
+
+  @classmethod
+  def start(cls, worker_threads=1, use_process=False, port=0,
+            container_executable=None):
+    worker_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+    worker_address = 'localhost:%s' % worker_server.add_insecure_port(
+        '[::]:%s' % port)
+    beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server(
+        cls(worker_threads,
+            use_process=use_process,
+            container_executable=container_executable),
+        worker_server)
+    worker_server.start()
+    return worker_address, worker_server
+
+  def StartWorker(self, start_worker_request, unused_context):
+    try:
+      if self._use_process:
+        command = ['python', '-c',
+                   'from apache_beam.runners.worker.sdk_worker '
+                   'import SdkHarness; '
+                   'SdkHarness("%s",worker_count=%d,worker_id="%s").run()' % (
+                       start_worker_request.control_endpoint.url,
+                       self._worker_threads,
+                       start_worker_request.worker_id)]
+        if self._container_executable:
+          # command as per container spec
+          command = [self._container_executable,
+                     '--id=%s' % start_worker_request.worker_id,
+                     '--logging_endpoint=%s'
+                     % start_worker_request.logging_endpoint.url,
+                     '--artifact_endpoint=%s'
+                     % start_worker_request.artifact_endpoint.url,
+                     '--provision_endpoint=%s'
+                     % start_worker_request.provision_endpoint.url,
+                     '--control_endpoint=%s'
+                     % start_worker_request.control_endpoint.url,
+                    ]
+
+        logging.warn("Starting worker with command %s" % command)
+        worker_process = subprocess.Popen(command, stdout=subprocess.PIPE)
+        self._worker_processes[start_worker_request.worker_id] = worker_process
+
+        # Register to kill the subprocess on exit.
+        atexit.register(worker_process.kill)
+      else:
+        worker = sdk_worker.SdkHarness(
+            start_worker_request.control_endpoint.url,
+            worker_count=self._worker_threads,
+            worker_id=start_worker_request.worker_id)
+        worker_thread = threading.Thread(
+            name='run_worker_%s' % start_worker_request.worker_id,
+            target=worker.run)
+        worker_thread.daemon = True
+        worker_thread.start()
+
+      return beam_fn_api_pb2.StartWorkerResponse()
+    except Exception as exn:
+      return beam_fn_api_pb2.StartWorkerResponse(error=str(exn))
+
+  def StopWorker(self,
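The command selection inside `StartWorker` can be read as a pure function of the request and the pool configuration. The sketch below restates it that way so it can be exercised without gRPC or Beam installed. `build_worker_command`, `Endpoint`, and the `StartWorkerRequest` namedtuple are hypothetical stand-ins for illustration; only the flag names and the inline `SdkHarness` invocation are taken from the quoted diff.

```python
import collections

# Hypothetical stand-ins for the protobuf request and its endpoint fields.
Endpoint = collections.namedtuple('Endpoint', ['url'])
StartWorkerRequest = collections.namedtuple(
    'StartWorkerRequest',
    ['worker_id', 'control_endpoint', 'logging_endpoint',
     'artifact_endpoint', 'provision_endpoint'])


def build_worker_command(request, worker_threads, container_executable=None):
  """Returns the argv list used to launch one SDK worker process."""
  if container_executable:
    # Container mode: hand all endpoints to the boot executable.
    return [
        container_executable,
        '--id=%s' % request.worker_id,
        '--logging_endpoint=%s' % request.logging_endpoint.url,
        '--artifact_endpoint=%s' % request.artifact_endpoint.url,
        '--provision_endpoint=%s' % request.provision_endpoint.url,
        '--control_endpoint=%s' % request.control_endpoint.url,
    ]
  # Plain process mode: run the SDK harness directly in a new interpreter.
  return [
      'python', '-c',
      'from apache_beam.runners.worker.sdk_worker import SdkHarness; '
      'SdkHarness("%s",worker_count=%d,worker_id="%s").run()' % (
          request.control_endpoint.url, worker_threads, request.worker_id),
  ]
```

Separating command construction from `subprocess.Popen` is a design choice of this sketch, not something the PR does; it makes the two launch modes easy to compare and to unit test.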
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=298213=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298213 ] ASF GitHub Bot logged work on BEAM-7980: Author: ASF GitHub Bot Created on: 20/Aug/19 21:46 Start Date: 20/Aug/19 21:46 Worklog Time Spent: 10m Work Description: tweise commented on pull request #9371: [WIP] [BEAM-7980] External environment with containerized worker pool URL: https://github.com/apache/beam/pull/9371#discussion_r315918147 ## File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py ## @@ -0,0 +1,150 @@
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297756=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297756 ] ASF GitHub Bot logged work on BEAM-7980: Author: ASF GitHub Bot Created on: 20/Aug/19 10:49 Start Date: 20/Aug/19 10:49 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9371: [WIP] [BEAM-7980] External environment with containerized worker pool URL: https://github.com/apache/beam/pull/9371#discussion_r315622434 ## File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py ## @@ -0,0 +1,150 @@
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297764=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297764 ] ASF GitHub Bot logged work on BEAM-7980: Author: ASF GitHub Bot Created on: 20/Aug/19 10:49 Start Date: 20/Aug/19 10:49 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9371: [WIP] [BEAM-7980] External environment with containerized worker pool URL: https://github.com/apache/beam/pull/9371#discussion_r315619665 ## File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py ## @@ -0,0 +1,150 @@
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297763=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297763 ] ASF GitHub Bot logged work on BEAM-7980: Author: ASF GitHub Bot Created on: 20/Aug/19 10:49 Start Date: 20/Aug/19 10:49 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9371: [WIP] [BEAM-7980] External environment with containerized worker pool URL: https://github.com/apache/beam/pull/9371#discussion_r315621663 ## File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py ## @@ -0,0 +1,150 @@
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297765=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297765 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 20/Aug/19 10:49
Start Date: 20/Aug/19 10:49
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9371: [WIP] [BEAM-7980] External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r315620971

## File path: sdks/python/container/boot.go ##
@@ -59,6 +60,18 @@ const (
 func main() {
 	flag.Parse()
+
+	if *workerPool == true {
+		args := []string{
+			"-m",
+			"apache_beam.runners.worker.worker_pool_main",
+			"--servicer_port=5",
+			"--container_executable=/opt/apache/beam/boot",
+		}
+		log.Printf("Executing: python %v", strings.Join(args, " "))
+		log.Fatalf("Python exited: %v", execx.Execute("python", args...))

Review comment:
```suggestion
		log.Fatalf("Python SDK worker pool exited: %v", execx.Execute("python", args...))
```

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 297765)
Time Spent: 2h (was: 1h 50m)

> External environment with containerized worker pool
> ---
>
> Key: BEAM-7980
> URL: https://issues.apache.org/jira/browse/BEAM-7980
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-harness
> Reporter: Thomas Weise
> Assignee: Thomas Weise
> Priority: Major
> Time Spent: 2h
> Remaining Estimate: 0h
>
> Augment Beam Python docker image and boot.go so that it can be used to launch
> BeamFnExternalWorkerPoolServicer.
> [https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.lnhm75dhvhi0]
>
-- This message was sent by Atlassian Jira (v8.3.2#803003)
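The `boot.go` hunk above starts `apache_beam.runners.worker.worker_pool_main` with two flags, `--servicer_port` and `--container_executable`. The argument parsing on the Python side is not visible in the quoted excerpts, so the following is only a sketch of how such flags could be read with `argparse` (which the module imports). The function name `parse_worker_pool_args`, the help texts, and the choice to make the port required are assumptions.

```python
import argparse


def parse_worker_pool_args(argv):
  """Parses the two flags that boot.go passes to the worker pool module."""
  parser = argparse.ArgumentParser(
      description='Worker pool entry point (illustrative sketch).')
  parser.add_argument(
      '--servicer_port', type=int, required=True,
      help='Port on which the worker pool servicer should listen.')
  parser.add_argument(
      '--container_executable', type=str, default=None,
      help='Boot executable used to start each worker, if any.')
  return parser.parse_args(argv)


if __name__ == '__main__':
  # Example values only; the port here is arbitrary.
  args = parse_worker_pool_args(
      ['--servicer_port=50000',
       '--container_executable=/opt/apache/beam/boot'])
  print(args.servicer_port, args.container_executable)
```

In the diff, the parsed values would presumably feed the `port` and `container_executable` parameters of `BeamFnExternalWorkerPoolServicer.start`, though that wiring is not shown in the quoted text.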
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297759=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297759 ] ASF GitHub Bot logged work on BEAM-7980: Author: ASF GitHub Bot Created on: 20/Aug/19 10:49 Start Date: 20/Aug/19 10:49 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9371: [WIP] [BEAM-7980] External environment with containerized worker pool URL: https://github.com/apache/beam/pull/9371#discussion_r315616892 ## File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py ## @@ -0,0 +1,150 @@
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297760=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297760 ] ASF GitHub Bot logged work on BEAM-7980: Author: ASF GitHub Bot Created on: 20/Aug/19 10:49 Start Date: 20/Aug/19 10:49 Worklog Time Spent: 10m Work Description: mxm commented on pull request #9371: [WIP] [BEAM-7980] External environment with containerized worker pool URL: https://github.com/apache/beam/pull/9371#discussion_r315621982 ## File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py ## @@ -0,0 +1,150 @@
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297766=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297766 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 20/Aug/19 10:49
Start Date: 20/Aug/19 10:49
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9371: [WIP] [BEAM-7980] External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r315623518

## File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py ##
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Worker pool entry point."""
+
+from __future__ import absolute_import
+
+import argparse
+import atexit
+import logging
+import subprocess
+import sys
+import threading
+import time
+from concurrent import futures
+
+import grpc
+
+from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import beam_fn_api_pb2_grpc
+from apache_beam.runners.worker import sdk_worker
+
+
+class BeamFnExternalWorkerPoolServicer(
+    beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer):
+
+  def __init__(self, worker_threads, use_process=False,
+               container_executable=None):
+    self._worker_threads = worker_threads
+    self._use_process = use_process
+    self._container_executable = container_executable
+    self._worker_processes = {}
+
+  @classmethod
+  def start(cls, worker_threads=1, use_process=False, port=0,
+            container_executable=None):
+    worker_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+    worker_address = 'localhost:%s' % worker_server.add_insecure_port(
+        '[::]:%s' % port)
+    beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server(
+        cls(worker_threads,
+            use_process=use_process,
+            container_executable=container_executable),
+        worker_server)
+    worker_server.start()
+    return worker_address, worker_server
+
+  def StartWorker(self, start_worker_request, unused_context):
+    try:
+      if self._use_process:
+        command = ['python', '-c',
+                   'from apache_beam.runners.worker.sdk_worker '
+                   'import SdkHarness; '
+                   'SdkHarness("%s",worker_count=%d,worker_id="%s").run()' % (
+                       start_worker_request.control_endpoint.url,
+                       self._worker_threads,
+                       start_worker_request.worker_id)]
+        if self._container_executable:
+          # command as per container spec
+          command = [self._container_executable,
+                     '--id=%s' % start_worker_request.worker_id,
+                     '--logging_endpoint=%s'
+                     % start_worker_request.logging_endpoint.url,
+                     '--artifact_endpoint=%s'
+                     % start_worker_request.artifact_endpoint.url,
+                     '--provision_endpoint=%s'
+                     % start_worker_request.provision_endpoint.url,
+                     '--control_endpoint=%s'
+                     % start_worker_request.control_endpoint.url,
+                    ]
+
+        logging.warn("Starting worker with command %s" % command)
+        worker_process = subprocess.Popen(command, stdout=subprocess.PIPE)
+        self._worker_processes[start_worker_request.worker_id] = worker_process
+
+        # Register to kill the subprocess on exit.
+        atexit.register(worker_process.kill)
+      else:
+        worker = sdk_worker.SdkHarness(
+            start_worker_request.control_endpoint.url,
+            worker_count=self._worker_threads,
+            worker_id=start_worker_request.worker_id)
+        worker_thread = threading.Thread(
+            name='run_worker_%s' % start_worker_request.worker_id,
+            target=worker.run)
+        worker_thread.daemon = True
+        worker_thread.start()
+
+      return beam_fn_api_pb2.StartWorkerResponse()
+    except Exception as exn:
+      return beam_fn_api_pb2.StartWorkerResponse(error=str(exn))
+
+  def StopWorker(self,
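The container-spec branch of `StartWorker` quoted in the diff builds an argument list from the endpoints in the start request. A minimal standalone sketch of that step, assuming plain strings in place of the request's endpoint fields (the helper name `build_container_command` and its parameter names are hypothetical and are not part of the PR):

```python
def build_container_command(executable, worker_id, logging_url,
                            artifact_url, provision_url, control_url):
    """Return the argv list for launching one SDK worker via the boot executable.

    Hypothetical helper: it mirrors the flag names used in the quoted diff,
    but takes plain strings instead of a StartWorkerRequest.
    """
    return [
        executable,
        '--id=%s' % worker_id,
        '--logging_endpoint=%s' % logging_url,
        '--artifact_endpoint=%s' % artifact_url,
        '--provision_endpoint=%s' % provision_url,
        '--control_endpoint=%s' % control_url,
    ]


if __name__ == '__main__':
    # The endpoint values below are placeholders for illustration only.
    command = build_container_command(
        '/opt/apache/beam/boot', 'worker-1',
        'localhost:1001', 'localhost:1002',
        'localhost:1003', 'localhost:1004')
    print(' '.join(command))
```

Keeping the command construction in a pure function like this would make it testable without starting a subprocess; whether the PR should be structured that way is a design choice for its author.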
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297754=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297754 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 20/Aug/19 10:49
Start Date: 20/Aug/19 10:49
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9371: [WIP] [BEAM-7980] External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r315618046

## File path: sdks/python/container/boot.go ##
@@ -40,6 +40,7 @@ var (
 // Contract: https://s.apache.org/beam-fn-api-container-contract.

+workerPool= flag.Bool("worker_pool", false, "Run as worker pool (optional).")
 id= flag.String("id", "", "Local identifier (required).")

Review comment:
tab/space alignment

Issue Time Tracking
-------------------

Worklog Id: (was: 297754)
Time Spent: 40m (was: 0.5h)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297755=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297755 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 20/Aug/19 10:49
Start Date: 20/Aug/19 10:49
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9371: [WIP] [BEAM-7980] External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r315620091

## File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py ##
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Worker pool entry point."""

Review comment:
Since there is no documentation of this, perhaps expand the comment here?

Issue Time Tracking
-------------------

Worklog Id: (was: 297755)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297762=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297762 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 20/Aug/19 10:49
Start Date: 20/Aug/19 10:49
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9371: [WIP] [BEAM-7980] External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r315621096

## File path: sdks/python/container/boot.go ##
@@ -59,6 +60,18 @@ const (
 func main() {
 flag.Parse()
+
+ if *workerPool == true {
+args := []string{
+"-m",
+"apache_beam.runners.worker.worker_pool_main",
+"--servicer_port=5",
+"--container_executable=/opt/apache/beam/boot",
+}
+log.Printf("Executing: python %v", strings.Join(args, " "))
+log.Fatalf("Python exited: %v", execx.Execute("python", args...))

Review comment:
Also, the indentation is off here.

Issue Time Tracking
-------------------

Worklog Id: (was: 297762)
Time Spent: 1h 40m (was: 1.5h)
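The worker-pool branch in boot.go passes two flags, `--servicer_port` and `--container_executable`, to `apache_beam.runners.worker.worker_pool_main`. How that module parses them is not shown in this thread. A minimal sketch, assuming a plain `argparse` parser that knows only those two flags (the function name `parse_worker_pool_args` and both defaults are assumptions, not taken from the PR):

```python
import argparse


def parse_worker_pool_args(argv):
    """Parse the two flags that the quoted boot.go hunk passes on.

    Hypothetical sketch: the real entry point may accept more options.
    """
    parser = argparse.ArgumentParser(description='Worker pool sketch.')
    # Port for the worker pool gRPC service. The default of 0 is an
    # assumption, chosen to match the port=0 default of start() in the diff.
    parser.add_argument('--servicer_port', type=int, default=0)
    # Executable used to launch each worker. None (an assumed default)
    # would mean that no container boot executable is used.
    parser.add_argument('--container_executable', type=str, default=None)
    return parser.parse_args(argv)


if __name__ == '__main__':
    # Flag values copied as they appear in the quoted boot.go hunk.
    args = parse_worker_pool_args([
        '--servicer_port=5',
        '--container_executable=/opt/apache/beam/boot',
    ])
    print(args.servicer_port, args.container_executable)
```

The parsed values would presumably be forwarded to `BeamFnExternalWorkerPoolServicer.start(port=..., container_executable=...)`, whose signature does appear in the quoted diff.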
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297753=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297753 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 20/Aug/19 10:49
Start Date: 20/Aug/19 10:49
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9371: [WIP] [BEAM-7980] External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r315614665

## File path: model/fn-execution/src/main/proto/beam_fn_api.proto ##
@@ -804,7 +804,7 @@ service BeamFnLogging {
 stream LogControl) {}
 }

-message NotifyRunnerAvailableRequest {
+message StartWorkerRequest {

Review comment:
+1 for this name. Since this feature hasn't been exposed yet, renaming should be OK.

Issue Time Tracking
-------------------

Worklog Id: (was: 297753)
Time Spent: 0.5h (was: 20m)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297757=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297757 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 20/Aug/19 10:49
Start Date: 20/Aug/19 10:49
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9371: [WIP] [BEAM-7980] External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r315619517

## File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py ##
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297758=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297758 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 20/Aug/19 10:49
Start Date: 20/Aug/19 10:49
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9371: [WIP] [BEAM-7980] External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r315622299

## File path: sdks/python/apache_beam/runners/worker/worker_pool_main.py ##
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297767=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297767 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 20/Aug/19 10:49
Start Date: 20/Aug/19 10:49
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9371: [WIP] [BEAM-7980] External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#discussion_r315621360

## File path: sdks/python/container/boot.go ##
@@ -59,6 +60,18 @@ const (
 func main() {
 flag.Parse()
+
+ if *workerPool == true {
+args := []string{
+"-m",
+"apache_beam.runners.worker.worker_pool_main",
+"--servicer_port=5",
+"--container_executable=/opt/apache/beam/boot",
+}
+log.Printf("Executing: python %v", strings.Join(args, " "))

Review comment:
```suggestion
log.Printf("Starting Python SDK worker pool: python %v", strings.Join(args, " "))
```

Issue Time Tracking
-------------------

Worklog Id: (was: 297767)
Time Spent: 2h 10m (was: 2h)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=297348=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297348 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 19/Aug/19 18:26
Start Date: 19/Aug/19 18:26
Worklog Time Spent: 10m
Work Description: tweise commented on issue #9371: [WIP] [BEAM-7980] External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371#issuecomment-522698334

CC: @rakeshcusat

Issue Time Tracking
-------------------

Worklog Id: (was: 297348)
Time Spent: 20m (was: 10m)
[jira] [Work logged] (BEAM-7980) External environment with containerized worker pool
[ https://issues.apache.org/jira/browse/BEAM-7980?focusedWorklogId=296953=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-296953 ]

ASF GitHub Bot logged work on BEAM-7980:

Author: ASF GitHub Bot
Created on: 18/Aug/19 19:48
Start Date: 18/Aug/19 19:48
Worklog Time Spent: 10m
Work Description: tweise commented on pull request #9371: [BEAM-7980] External environment with containerized worker pool
URL: https://github.com/apache/beam/pull/9371

Add option to use the Python SDK Docker image for execution as worker pool, launching individual workers through boot.go as per container spec.

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

 - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
 - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
 - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).