I realized that my last comment doesn't make sense because, as you pointed out earlier, LOOPBACK loops back to the submitting job process to execute the job.
On Mon, Mar 27, 2023 at 9:34 AM Sherif Tolba <sherif.tol...@gmail.com> wrote:

Oh, I might have missed the point about how the Flink portable runner works. Is it the case that when I specify LOOPBACK, the binary sent already includes the SDK and is able to spawn the goroutines on the TaskManager without any additional config?

On Fri, Mar 24, 2023 at 6:02 PM Robert Burke <lostl...@apache.org> wrote:

That's fine. While it adds a Kubernetes dependency to the Beam go.mod, it won't actually be used by clients unless we add it to one of the main packages. That's fairly safe.

I'd recommend putting an appropriately named package under the "cmd" directory, since it would be most useful as a standalone command for someone to run. That can be hashed out during code review, once it's working for your use case.

On 2023/03/24 20:05:06 Sherif Tolba wrote:

Hi Robert,

Thank you for your insightful response. The setup you described makes sense to me and I'd like to give the gRPC server implementation a try. Referring to your point below:

"This `workerService` would then use whatever it likes to get containers onto VMs when requested by the Job service/flink."

It will be Kubernetes-specific for my use case. Is this okay?

Thanks,
Sherif

On Thu, Mar 23, 2023 at 3:52 PM Robert Burke <lostl...@apache.org> wrote:

Oh that's very interesting! I have a few comments, but we could end up with a new feature for the Go SDK.

As you've noted, you shouldn't really be manually spinning up step 4. It's up to the runner to do that, but it does look like, for your usage, some assistance is needed.

The Python Boot Loader has that in order to support "sibling processes" on a single VM container.
Basically, it's a hack to get around the Global Interpreter Lock slowing things down: starting additional separate processes allows for efficient use of cores through multiprocessing. Go and Java don't need this since they have robust concurrency support, and will generally process each bundle sent to them in parallel.

The bootloaders don't currently share a lot of code, since each was developed with the language harness it starts up in mind.

So the worker_pool flag you mention is here:

https://github.com/apache/beam/blob/8cfee7d05e6bdf73431f390577c9d000eadd3b3a/sdks/python/container/boot.go#L55

The "External" stuff you see is also how LOOPBACK mode operates. It just treats all desired workers as internal processes. But it's largely just a service spec that anyone could implement and point the runner to.

E.g. for Loopback in the Go SDK, the implementation is entirely here:

https://github.com/apache/beam/blob/ba3dcd1cb983bbe92531ab7deae95438e93a1d4a/sdks/go/pkg/beam/runners/universal/extworker/extworker.go#L33

Python's equivalent is here:
https://github.com/apache/beam/blob/e439f4120ef4c25aa36e5b03756dc7391bdbd211/sdks/python/apache_beam/runners/worker/worker_pool_main.py

The service definition is pretty straightforward, just StartWorker and StopWorker requests:

https://github.com/apache/beam/blob/8cfee7d05e6bdf73431f390577c9d000eadd3b3a/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto#L1104

Basically, for distributed workers, you'd just need to create a gRPC server for that API that spins up the worker with the right flags so it can connect and process the job.
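To make "spin up the worker with the right flags" concrete: a StartWorker handler ultimately has to turn the endpoints in the request into arguments for the Go SDK container's boot binary. A minimal sketch, assuming a simplified, hypothetical `startWorkerRequest` struct (the real Fn API message wraps each endpoint in an ApiServiceDescriptor) and the flag names printed by the boot binary's usage text quoted later in this thread. The gRPC server itself is omitted.

```go
package main

import "fmt"

// startWorkerRequest is a hypothetical, simplified stand-in for the Fn API
// StartWorkerRequest message. The real message wraps each endpoint in an
// ApiServiceDescriptor; only the URL strings are kept here.
type startWorkerRequest struct {
	WorkerID          string
	LoggingEndpoint   string
	ArtifactEndpoint  string
	ProvisionEndpoint string
	ControlEndpoint   string
}

// bootArgs builds the arguments a worker-pool service could pass to the Go SDK
// container's boot binary (/opt/apache/beam/boot), in a fixed order, using the
// flag names from its usage text.
func bootArgs(req startWorkerRequest) []string {
	flags := []struct{ name, value string }{
		{"id", req.WorkerID},
		{"logging_endpoint", req.LoggingEndpoint},
		{"artifact_endpoint", req.ArtifactEndpoint},
		{"provision_endpoint", req.ProvisionEndpoint},
		{"control_endpoint", req.ControlEndpoint},
	}
	args := make([]string, 0, len(flags))
	for _, f := range flags {
		args = append(args, fmt.Sprintf("--%s=%s", f.name, f.value))
	}
	return args
}

func main() {
	// Values copied from the manual `docker run` attempt elsewhere in this
	// thread; in practice the runner supplies them in each StartWorker call.
	req := startWorkerRequest{
		WorkerID:          "1-1",
		LoggingEndpoint:   "localhost:44977",
		ArtifactEndpoint:  "localhost:43219",
		ProvisionEndpoint: "localhost:34437",
		ControlEndpoint:   "localhost:42935",
	}
	for _, a := range bootArgs(req) {
		fmt.Println(a)
	}
}
```

The point of the sketch is that the endpoints are only known once the runner sends the request, which is why starting the harness by hand ahead of time does not line up with what the runner expects.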
Note that this is "environment specific", so this would only start Go SDK workers (just as you've seen the Python one only start up Python SDK workers).

So if you're good with cluster managers, Kubernetes, for example, can be used for this instead of whatever Flink is managing.

The way I'm currently picturing it is a separate service binary (mostly to avoid unnecessary built-in deps...). If it's in the Go SDK module, it should default to the Go SDK container, but there's no reason not to provide an override if desired.

The default image for a given release is at:
https://github.com/apache/beam/blob/011296c14659f80c8ecbeefda79ecc3f1113bd95/sdks/go/pkg/beam/core/core.go#L33
(the dev versions aren't pushed by default, but this will work after release).

Then it's up to the runner to talk to that service to start and stop workers as needed.

$ workerService <external_worker_service_addr> --container=<container> ...other config...

$ myPipelineBinary --runner=portable --environment_type=external --environment_config=<external_worker_service_addr> --endpoint=<beam job service>

This `workerService` would then use whatever it likes to get containers onto VMs when requested by the Job service/flink.

I'd be entirely delighted for such a thing to be contributed, and to help review it. I'm @lostluck on GitHub, if you decide to go this path.

Robert Burke
Beam Go Busybody

On 2023/03/23 15:09:44 Sherif Tolba wrote:

Thank you, Robert, for your detailed response and the resources you shared.

One thing that I didn't mention is that my goal is to move the setup to EKS after completing local experimentation.
As you pointed out, LOOPBACK is mainly for local setups and testing. I also started with the DOCKER mode; however, Flink's Job Manager threw an error:

java.io.IOException: Cannot run program "docker": error=2, No such file or directory

despite using the unaltered official image:

docker run --network=host --mount type=bind,source=/tmp/staged,target=/tmp/staged flink:1.14.0 jobmanager

I tried to build a custom image that installs Docker and gives the flink user permission to call it, without success.

Additionally, since I'd like to move it eventually to the cluster, it made more sense to me to try the EXTERNAL mode and have the workers spun up as a separate Kubernetes deployment, then link to the associated K8s service using the envConfig := k8s-go-sdk-harness-svc-name:<port_number> pipeline option. I saw something similar done in this article <https://ndeepak.com/posts/2022-07-07-local-beam/>; however, Deepak is using a Python pipeline, and it is more straightforward to start a Python SDK Harness using the --worker_pool flag. I was able to create a Python SDK Harness similar to what he did but, as expected, when submitting the Go pipeline, it failed because the environment URN refers to Python and not Go.
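For the EXTERNAL setup described above, the `--environment_config` value has to be an address the runner can resolve and reach, since the runner is what calls the worker pool (on Flink this appears to happen from the TaskManagers). A minimal sketch that builds such an address for a Kubernetes Service; the Service name, namespace, and port are hypothetical, and the fully qualified form assumes the default `cluster.local` cluster domain.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"strconv"
)

// externalEnvConfig builds a value for --environment_config in EXTERNAL mode
// that points at a Kubernetes Service fronting a worker-pool deployment.
// With a namespace it uses the fully qualified in-cluster DNS name, which
// assumes the default "cluster.local" cluster domain; without one it returns
// the short name, which only resolves from within the same namespace.
func externalEnvConfig(service, namespace string, port int) (string, error) {
	if service == "" {
		return "", errors.New("service name is required")
	}
	if port < 1 || port > 65535 {
		return "", fmt.Errorf("port %d out of range", port)
	}
	host := service
	if namespace != "" {
		host = service + "." + namespace + ".svc.cluster.local"
	}
	return net.JoinHostPort(host, strconv.Itoa(port)), nil
}

func main() {
	// Hypothetical Service name, namespace, and port.
	addr, err := externalEnvConfig("go-sdk-worker-pool", "beam", 50000)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("--environment_type=EXTERNAL --environment_config=" + addr)
}
```

Whatever sits behind that address must start Go SDK workers; pointing it at a Python worker pool reproduces the environment URN mismatch described above.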
Below are more details about what I am doing:

1) Run Flink's Job Manager in one terminal using:

docker run --network=host --mount type=bind,source=/tmp/staged,target=/tmp/staged flink:1.14.0 jobmanager

2) Run Flink's Task Manager in another tab using:

docker run --network=host --mount type=bind,source=/tmp/beam-artifact-staging,target=/tmp/beam-artifact-staging flink:1.14.0 taskmanager

3) Run Beam's Job Server in a third tab:

docker run --net=host --mount type=bind,source=/tmp/staged,target=/tmp/staged apache/beam_flink1.14_job_server:latest --flink-master=localhost:8081

4) Try to run the Go SDK Harness in a fourth tab (failure):

docker run --network=host --mount type=bind,source=/tmp/beam-artifact-staging,target=/tmp/beam-artifact-staging apache/beam_go_sdk:2.46.0 --id=1-1 --logging_endpoint=localhost:44977 --artifact_endpoint=localhost:43219 --provision_endpoint=localhost:34437 --control_endpoint=localhost:42935

5) Compile the Go pipeline and run it in a fifth tab:

go build minimal_wordcount.go
./minimal_wordcount

*Code*

package main

import (
	"context"
	"fmt"
	"regexp"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"

	// Register the filesystems used to read gs:// input and write local output.
	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"
	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/local"
)

// wordRE matches words, as in the Beam minimal_wordcount example.
var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)

func main() {
	// Overrides the jobopts flag values in code (see setJobOptions below).
	setJobOptions()

	beam.Init()
	p := beam.NewPipeline()
	s := p.Root()

	lines := textio.Read(s, "gs://apache-beam-samples/shakespeare/kinglear.txt")
	words := beam.ParDo(s, func(line string, emit func(string)) {
		for _, word := range wordRE.FindAllString(line, -1) {
			emit(word)
		}
	}, lines)

	counted := stats.Count(s, words)
	formatted := beam.ParDo(s, func(w string, c int) string {
		return fmt.Sprintf("%s: %v", w, c)
	}, counted)
	textio.Write(s, "wordcounts.txt", formatted)

	// Execute returns a PipelineResult and an error; surface submission failures.
	if _, err := flink.Execute(context.Background(), p); err != nil {
		panic(err)
	}
}

func setJobOptions() {
	endPoint := "localhost:8099"
	envType := "EXTERNAL"
	envConfig := "localhost:34437"
	jobName := "test_word_count"
	isAsync := true
	parallelism := 1

	jobopts.JobName = &jobName
	jobopts.Endpoint = &endPoint
	jobopts.Async = &isAsync
	jobopts.Parallelism = &parallelism
	jobopts.EnvironmentType = &envType
	jobopts.EnvironmentConfig = &envConfig
}

I am still reading up on the container contract you linked, but I'm not sure whether starting the harness manually is a good idea in the first place, based on what you mentioned at the beginning of your response.

Thank you,
Sherif

On Wed, Mar 22, 2023 at 7:22 PM Robert Burke <lostl...@apache.org> wrote:

I'm unfamiliar with configuring Flink to run Beam jobs, but AFAIK it's up to the runner to orchestrate and set up properly configured workers with the containers. With Beam, there should never be any need to manually set up workers for Flink, etc. to run on.

Those flags/etc. are part of the "beam container contract", and are internal implementation details that (ideally) an end pipeline author doesn't need to worry about. The original design doc is here: https://s.apache.org/beam-fn-api-container-contract, but it's rather out of date WRT the fine details (e.g. modern SDKs use the single ProvisioningService to get the other service URLs, rather than them all being provided by flags.)
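The `setJobOptions` function quoted above overwrites the jobopts package's flag pointers in code. Since those options are ordinary Go flags, another option is to pass them on the command line and call `flag.Parse()` before `beam.Init()`. A minimal sketch using a local `flag.FlagSet` that mirrors the flag names as jobopts appears to define them (`--endpoint`, `--environment_type`, `--environment_config`, `--job_name`, `--async`, `--parallelism`); the names and defaults here are assumptions to check against the jobopts package. Also, per Robert's description of EXTERNAL mode, `--environment_config` should be the address of a worker-pool (StartWorker/StopWorker) service rather than a harness provision endpoint; `localhost:50000` below is a placeholder.

```go
package main

import (
	"flag"
	"fmt"
	"io"
)

// submitOptions mirrors a subset of the Go SDK's jobopts flags.
type submitOptions struct {
	Endpoint          string
	EnvironmentType   string
	EnvironmentConfig string
	JobName           string
	Async             bool
	Parallelism       int
}

// parseSubmitOptions parses args with a local FlagSet that stands in for the
// global flags jobopts registers; it is an illustration, not the SDK's own.
func parseSubmitOptions(args []string) (submitOptions, error) {
	var o submitOptions
	fs := flag.NewFlagSet("minimal_wordcount", flag.ContinueOnError)
	fs.SetOutput(io.Discard) // keep usage noise out of the sketch's output
	fs.StringVar(&o.Endpoint, "endpoint", "", "Beam job service endpoint")
	fs.StringVar(&o.EnvironmentType, "environment_type", "DOCKER", "LOOPBACK, DOCKER or EXTERNAL")
	fs.StringVar(&o.EnvironmentConfig, "environment_config", "", "environment-specific configuration")
	fs.StringVar(&o.JobName, "job_name", "", "job name")
	fs.BoolVar(&o.Async, "async", false, "return right after submission")
	fs.IntVar(&o.Parallelism, "parallelism", -1, "job parallelism")
	err := fs.Parse(args)
	return o, err
}

func main() {
	o, err := parseSubmitOptions([]string{
		"--endpoint=localhost:8099",
		"--environment_type=EXTERNAL",
		"--environment_config=localhost:50000", // placeholder worker-pool address
		"--job_name=test_word_count",
		"--async",
		"--parallelism=1",
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", o)
}
```

With the real jobopts flags, the same values would be supplied as `./minimal_wordcount --endpoint=localhost:8099 --environment_type=EXTERNAL ...`, which avoids recompiling to change them.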
The official instructions are here:

https://beam.apache.org/documentation/runners/flink/

In particular, there are two modes to be aware of for local runs:

1st: LOOPBACK mode, which will have Flink "loop back" to the submitting job process to execute the job.

Start the Flink Beam Job service (e.g. for flink1.10):

docker run --net=host apache/beam_flink1.10_job_server:latest

Submit your job to the Beam Job Server (e.g. at localhost:8099) with the LOOPBACK environment type:

--runner=PortableRunner
--endpoint=localhost:8099
--environment_type=LOOPBACK

(Note: the doc there is very Python SDK focused, so the --job_endpoint flag is just --endpoint in the Go SDK.)

Other than executing in the main process, this is still using portable Beam.

2nd: Container mode. This is closer to what you're trying to do.

Per the linked doc, this requires you to start the Flink cluster with its REST port (e.g. localhost:8081), then, with Docker, start the connected Beam Job service (e.g. for flink1.10):

docker run --net=host apache/beam_flink1.10_job_server:latest --flink-master=localhost:8081

Note that the "flink-master" flag is how Beam ultimately sends jobs to Flink and then sets up the workers.

Then submit your job to *that* endpoint (which should remain at localhost:8099). This should largely be the same, but without setting the "environment_type" flag.
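The two local modes differ only in whether `--environment_type` is set. A minimal sketch that assembles the submission flags for each, assuming the pipeline dispatches on `--runner` (for example via `beamx.Run`) rather than calling `flink.Execute` directly; `--runner=portable` follows the usage in Robert's later reply, and `localhost:8099` is the job-service address used throughout this thread.

```go
package main

import (
	"fmt"
	"strings"
)

// submitFlags returns submission flags for the two local modes:
//   - LOOPBACK: Flink calls back into the submitting process to run SDK code.
//   - container mode: environment_type is left unset so the SDK default
//     applies and the runner starts SDK containers itself.
func submitFlags(jobEndpoint string, loopback bool) []string {
	flags := []string{"--runner=portable", "--endpoint=" + jobEndpoint}
	if loopback {
		flags = append(flags, "--environment_type=LOOPBACK")
	}
	return flags
}

func main() {
	// The Beam job server, not Flink's REST port 8081.
	const jobService = "localhost:8099"
	fmt.Println("LOOPBACK:  ./minimal_wordcount " + strings.Join(submitFlags(jobService, true), " "))
	fmt.Println("container: ./minimal_wordcount " + strings.Join(submitFlags(jobService, false), " "))
}
```

In both cases the job goes to the Beam job server; only the job server talks to Flink's REST port through `--flink-master`.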
-------

Finally, I'd be remiss not to point you to the in-development "Prism" runner, which will eventually replace the current Go Direct runner.

See https://github.com/apache/beam/tree/master/sdks/go/pkg/beam/runners/prism for current usage instructions and restrictions.

It's currently suitable for smoke testing small pipelines, but the goal is to have a portable reference runner WRT all facets of Beam. Depending on what Beam features you're using, it may not be suitable.

I hope this helps!
Robert Burke
Beam Go Busybody

(Note: I don't monitor this list, but Beam Go SDK questions tend to find their way to me.)

On 2023/03/22 11:51:53 Sherif Tolba wrote:

Hi Apache Beam team,

I hope this email finds you all well. I have been experimenting with Apache Beam and Flink, mainly using golang. I hit a roadblock when trying to run the minimal word count example on Beam and Flink locally using Go SDK workers. I am trying to use the "apache/beam_go_sdk:2.46.0" Docker image as follows:

docker run --network=host apache/beam_go_sdk:2.46.0 --id=1-1 --provision_endpoint=localhost:50000

(I set this port based on some research online, but I don't really know what the service should be.)

However, I am unable to understand what the following options represent:

Usage of /opt/apache/beam/boot:
  -artifact_endpoint string
        Local artifact endpoint for FnHarness (required).
  -control_endpoint string
        Local control endpoint for FnHarness (required).
  -id string
        Local identifier (required).
  -logging_endpoint string
        Local logging endpoint for FnHarness (required).
  -provision_endpoint string
        Local provision endpoint for FnHarness (required).
  -semi_persist_dir string
        Local semi-persistent directory (optional). (default "/tmp")

I checked https://github.com/apache/beam/blob/master/sdks/go/container/boot.go but am still unable to tell what these endpoints are. I couldn't find any online documentation describing, for example, what the provision_endpoint should be set to.

I would greatly appreciate any pointers or explanation.

My setup is as follows: I have a Flink JobManager, two TaskManagers, and a Beam JobServer running locally. I can execute the pipeline that's written in Go and see the job submitted on Flink's UI; however, it quickly fails because there are no workers to execute the Go transforms.

Thanks,
Sherif Tolba
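Roughly, these endpoints name the gRPC services the runner hosts for each SDK harness: control (work instructions such as bundles to process), logging, artifact retrieval, and provisioning. The runner fills them in when it launches the container, which is why, as the replies above explain, a pipeline author is not expected to set them by hand. To make the usage text concrete, here is a small standalone sketch that mirrors it with a local `flag.FlagSet` and enforces the "(required)" markers. It is an illustration, not boot.go itself; the real binary may derive some endpoints from the provisioning service instead, as noted above for modern SDKs.

```go
package main

import (
	"flag"
	"fmt"
	"io"
)

// bootFlags mirrors the options listed in the boot binary's usage text.
type bootFlags struct {
	ID                string
	ArtifactEndpoint  string
	ControlEndpoint   string
	LoggingEndpoint   string
	ProvisionEndpoint string
	SemiPersistDir    string
}

// parseBootFlags parses args and rejects any flag the usage text marks as required.
func parseBootFlags(args []string) (bootFlags, error) {
	var b bootFlags
	fs := flag.NewFlagSet("boot", flag.ContinueOnError)
	fs.SetOutput(io.Discard)
	fs.StringVar(&b.ArtifactEndpoint, "artifact_endpoint", "", "Local artifact endpoint for FnHarness (required).")
	fs.StringVar(&b.ControlEndpoint, "control_endpoint", "", "Local control endpoint for FnHarness (required).")
	fs.StringVar(&b.ID, "id", "", "Local identifier (required).")
	fs.StringVar(&b.LoggingEndpoint, "logging_endpoint", "", "Local logging endpoint for FnHarness (required).")
	fs.StringVar(&b.ProvisionEndpoint, "provision_endpoint", "", "Local provision endpoint for FnHarness (required).")
	fs.StringVar(&b.SemiPersistDir, "semi_persist_dir", "/tmp", "Local semi-persistent directory (optional).")
	if err := fs.Parse(args); err != nil {
		return b, err
	}
	required := []struct{ name, value string }{
		{"artifact_endpoint", b.ArtifactEndpoint},
		{"control_endpoint", b.ControlEndpoint},
		{"id", b.ID},
		{"logging_endpoint", b.LoggingEndpoint},
		{"provision_endpoint", b.ProvisionEndpoint},
	}
	for _, r := range required {
		if r.value == "" {
			return b, fmt.Errorf("missing required flag --%s", r.name)
		}
	}
	return b, nil
}

func main() {
	// The first attempt in this thread passed only --id and --provision_endpoint.
	_, err := parseBootFlags([]string{"--id=1-1", "--provision_endpoint=localhost:50000"})
	fmt.Println(err) // prints: missing required flag --artifact_endpoint
}
```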