On Sat, Aug 29, 2020 at 10:59 AM Eugene Kirpichov <[email protected]> wrote:
>
> On Fri, Aug 28, 2020 at 6:52 PM Sam Bourne <[email protected]> wrote:
>
>> Hi Eugene,
>>
>> Glad that helped you out and thanks for the PR tweaking it for GCP.
>>
>> To fetch the containers from GCR, I had to log into Docker inside the Flink nodes, specifically inside the taskmanager container, using something like “kubectl exec pod/flink-taskmanager-blahblah -c taskmanager -- docker login -u oauth2accesstoken --password $(gcloud auth print-access-token)”
>>
>> Ouch, that seems painful. I find this “precaching” step pretty silly and have considered making the DockerEnvironmentFactory a little more intelligent about how it deals with timeouts (e.g. timing out only when there is no activity). It doesn’t seem like it would be too difficult to also add first-class support for pulling images from protected repositories. Maybe by extending the DockerPayload protobuf to pass along the additional information and tweaking the DockerEnvironmentFactory? I’m not a Java expert, but that might be worth exploring if this continues to be problematic.
>>
> Yeah, it makes sense to have some first-class support for Docker credentials. It's kind of a no-brainer that it's necessary with custom containers: many companies probably wouldn't want to push their custom containers to a public repo.
> I was thinking of embedding the credentials JSON file into the taskmanager container through its Dockerfile; that's workable but also pretty silly, since it means rebuilding this container just to put in the credentials.
> DockerPayload might be the right place to put credentials, but I wonder if there's a way to do something more secure with k8s secrets. I'm not too well-versed in credential management.
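The inactivity-based timeout Sam floats above (fail only when a step stops making progress, not when it is merely slow) can be shown in a few lines. This is a minimal Python sketch under assumptions, not Beam's Java DockerEnvironmentFactory: the source iterable stands in for progress output such as the lines printed by docker pull, and consume_with_idle_timeout and InactivityTimeout are hypothetical names.

```python
import queue
import threading
from typing import Iterable, List


class InactivityTimeout(Exception):
    """Raised when the source produces no output for too long (hypothetical)."""


def consume_with_idle_timeout(source: Iterable[str],
                              idle_timeout_s: float) -> List[str]:
    """Collect every item from `source`, failing only if no new item
    arrives within `idle_timeout_s` seconds.

    A slow but steadily progressing step (e.g. pulling a large image)
    may run for any total duration; only a stall counts as a failure.
    """
    q: queue.Queue = queue.Queue()
    done = object()  # sentinel: the source is exhausted

    def pump() -> None:
        # Read the (possibly blocking) source on a background thread.
        try:
            for item in source:
                q.put(item)
        except Exception as exc:  # forward source errors to the caller
            q.put(exc)
        else:
            q.put(done)

    threading.Thread(target=pump, daemon=True).start()
    items: List[str] = []
    while True:
        try:
            # Wait at most one idle window for the next piece of progress.
            item = q.get(timeout=idle_timeout_s)
        except queue.Empty:
            raise InactivityTimeout(
                f"no progress for {idle_timeout_s} s") from None
        if item is done:
            return items
        if isinstance(item, Exception):
            raise item
        items.append(item)  # any progress starts a fresh idle window
```

A fixed total deadline would reject a large image that downloads steadily for minutes; here only a silent gap longer than idle_timeout_s fails, and an error raised by the source is re-raised in the caller.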
>
Using k8s secrets, you could mount your credentials into the container and tweak the pull/run command <https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerCommand.java#L77> to log in first, using a pattern like "cat /tmp/password.txt | docker login --username foo --password-stdin". Maybe the DockerPayload protobuf could include the password either as raw text or as an absolute file path, and switch the login command accordingly.

>> I found the time it takes to pull can be dramatically improved if you store everything in memory <https://github.com/sambvfx/beam-flink-k8s/blob/master/k8s/flink.yaml#L186>.
>>
>> In the end, I got my pipeline to start, create the uber jar (about 240MB in size), take a few minutes to transmit it to Flink
>>
>> You could explore spinning up the beam-flink-job-server <https://github.com/sambvfx/beam-flink-k8s/blob/master/k8s/beam-flink-jobserver.yaml> and using the PortableRunner. In theory that should reduce the amount of data you’re syncing to the cluster. It does require exposing at least two ingress points (8099 and 8098) so you can hit the job and artifact services, respectively.
>>
> Right, good idea! Haven't tried spinning up the job server. Exposing the job and artifact services seems pretty easy, but I would also need to replace the job server image "apache/beam_flink1.10_job_server:2.23.0" with a custom-built one using the Beam 2.24 snapshot we're using.
>
This may be necessary anyway, depending on how you handle the docker login stuff. In any case, good luck!

>
>
>> Cheers,
>> Sam
>>
>> On Fri, Aug 28, 2020 at 5:50 PM Eugene Kirpichov <[email protected]> wrote:
>>
>>> Woohoo, thanks Kyle, adding --save_main_session made it work!!!
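Sam's suggestion above, letting a DockerPayload-like message carry either the password itself or an absolute path to a mounted k8s Secret and switching the login command accordingly, could look roughly like this Python sketch. RegistryCredentials, docker_login_invocation and docker_login are hypothetical names; the thread only proposes such fields, it does not say Beam has them. The registry argument is an addition, since docker login without one targets Docker Hub. Both variants feed the secret through --password-stdin, so it never appears on a command line.

```python
import os
import subprocess
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class RegistryCredentials:
    """Hypothetical credential fields a DockerPayload-like message could carry.

    Exactly one of `password` (raw text) or `password_file` (an absolute
    path, e.g. where a k8s Secret is mounted) must be set.
    """
    username: str
    password: Optional[str] = None
    password_file: Optional[str] = None

    def __post_init__(self) -> None:
        if (self.password is None) == (self.password_file is None):
            raise ValueError("set exactly one of password or password_file")
        if self.password_file is not None and not os.path.isabs(self.password_file):
            raise ValueError("password_file must be an absolute path")


def docker_login_invocation(creds: RegistryCredentials,
                            registry: str) -> Tuple[List[str], str]:
    """Return (argv, stdin_text) for `docker login`, whichever field is set."""
    if creds.password_file is not None:
        with open(creds.password_file, encoding="utf-8") as f:
            # Mounted secret files often end with a newline; drop it.
            secret = f.read().rstrip("\r\n")
    else:
        secret = creds.password or ""
    # The secret goes on stdin, never into argv.
    argv = ["docker", "login", "--username", creds.username,
            "--password-stdin", registry]
    return argv, secret


def docker_login(creds: RegistryCredentials, registry: str) -> None:
    """Run the login; raises CalledProcessError if docker rejects it."""
    argv, secret = docker_login_invocation(creds, registry)
    subprocess.run(argv, input=secret, text=True, check=True)
```

For example, RegistryCredentials("foo", password_file="/tmp/password.txt") reproduces Sam's "cat /tmp/password.txt | docker login --username foo --password-stdin" pattern.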
>>>
>>> On Fri, Aug 28, 2020 at 5:02 PM Kyle Weaver <[email protected]> wrote:
>>>
>>>> > rpc error: code = Unimplemented desc = Method not found: org.apache.beam.model.job_management.v1.LegacyArtifactRetrievalService/GetManifest
>>>>
>>>> This is a known issue: https://issues.apache.org/jira/browse/BEAM-10762
>>>>
>>>> On Fri, Aug 28, 2020 at 4:57 PM Eugene Kirpichov <[email protected]> wrote:
>>>>
>>>>> P.S. Ironic how back in 2018 I was TL-ing the portable runners effort for a few months on the Google side, and now I need community help to get it to work at all.
>>>>> Still pretty miraculous how far Beam's portability has come since then, even if it has a steep learning curve.
>>>>>
>>>>> On Fri, Aug 28, 2020 at 4:54 PM Eugene Kirpichov <[email protected]> wrote:
>>>>>
>>>>>> Hi Sam,
>>>>>>
>>>>>> You're a wizard - this got me *way* farther than my previous attempts. Here's a PR with a couple of changes I had to make: https://github.com/sambvfx/beam-flink-k8s/pull/1
>>>>>>
>>>>>> I had to make some additional changes that don't make sense to share, but here they are for the record:
>>>>>> - Because I'm running on Kubernetes Engine and not minikube, I had to put the docker-flink image on GCR, changing flink.yaml "image: docker-flink:1.10" -> "image: us.gcr.io/$PROJECT_ID/docker-flink:1.10". Of course, I also had to build and push the container.
>>>>>> - Because I'm running with a custom container based on an unreleased version of Beam, I had to push my custom container to GCR too, and change your instructions to use that image name instead of the default one.
>>>>>> - To fetch the containers from GCR, I had to log into Docker inside the Flink nodes, specifically inside the taskmanager container, using something like "kubectl exec pod/flink-taskmanager-blahblah -c taskmanager -- docker login -u oauth2accesstoken --password $(gcloud auth print-access-token)"
>>>>>> - Again, because I'm using an unreleased Beam SDK (due to a bug whose fix will be released in 2.24), I also had to build a custom Flink job server jar and point to it via --flink_job_server_jar.
>>>>>>
>>>>>> In the end, I got my pipeline to start, create the uber jar (about 240MB in size), and take a few minutes to transmit it to Flink (which is a long time, but it'll do for a prototype). The Flink UI was displaying the pipeline, and Flink was able to *start* the worker container; however, it quickly failed with the following error:
>>>>>>
>>>>>> 2020/08/28 15:49:09 Initializing python harness: /opt/apache/beam/boot --id=1-1 --provision_endpoint=localhost:45111
>>>>>> 2020/08/28 15:49:09 Failed to retrieve staged files: failed to get manifest
>>>>>> caused by:
>>>>>> rpc error: code = Unimplemented desc = Method not found: org.apache.beam.model.job_management.v1.LegacyArtifactRetrievalService/GetManifest
>>>>>> (followed by a bunch of other garbage)
>>>>>>
>>>>>> I'm assuming this might be because I got tangled up in my custom images related to the unreleased Beam SDK, and that it should be fixed when running on a clean Beam 2.24.
>>>>>>
>>>>>> Thank you again!
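Eugene's list above boils down to a handful of Beam pipeline flags. The Python sketch below assembles them; flink_docker_flags is a hypothetical helper, and the image name and jar path used with it are placeholders. --save_main_session is included because it is what made the pipeline run later in this thread.

```python
from typing import List, Optional


def flink_docker_flags(flink_master: str, sdk_image: str,
                       job_server_jar: Optional[str] = None) -> List[str]:
    """Flags for running a Beam Python pipeline on Flink, with the SDK
    harness started from a custom image through Docker on the taskmanagers."""
    flags = [
        "--runner=FlinkRunner",
        f"--flink_master={flink_master}",     # host:port, 8081 in this thread
        "--environment_type=DOCKER",          # taskmanagers launch the harness via Docker
        f"--environment_config={sdk_image}",  # the custom SDK container, e.g. on GCR
        "--save_main_session",                # pickle __main__ globals for the workers
    ]
    if job_server_jar is not None:
        # Only needed when no released job server matches the SDK version.
        flags.append(f"--flink_job_server_jar={job_server_jar}")
    return flags
```

In a real pipeline the list would typically be handed to apache_beam's PipelineOptions, e.g. PipelineOptions(flink_docker_flags(...)); that call is left out so the sketch runs without Beam installed.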
>>>>>>
>>>>>> On Fri, Aug 28, 2020 at 10:21 AM Eugene Kirpichov <[email protected]> wrote:
>>>>>>
>>>>>>> Holy shit, thanks Sam, this is more help than I could have asked for!!
>>>>>>> I'll give this a shot later today and report back.
>>>>>>>
>>>>>>> On Thu, Aug 27, 2020 at 10:27 PM Sam Bourne <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi Eugene!
>>>>>>>>
>>>>>>>> I’m struggling to find complete documentation on how to do this. There seems to be lots of conflicting or incomplete information: several ways to deploy Flink, several ways to get Beam working with it, bizarre StackOverflow questions, and no documentation explaining a complete working example.
>>>>>>>>
>>>>>>>> This *is* possible, and I went through all the same frustrations of sparse and confusing documentation. I’m glossing over a lot of details, but the key thing was setting up the Flink taskmanager(s) to run Docker. This requires running Docker-in-Docker, as the taskmanager itself is a Docker container in k8s.
>>>>>>>>
>>>>>>>> First, create a custom Flink container with Docker:
>>>>>>>>
>>>>>>>> # docker-flink Dockerfile
>>>>>>>>
>>>>>>>> FROM flink:1.10
>>>>>>>> # install docker
>>>>>>>> RUN apt-get ...
>>>>>>>>
>>>>>>>> Then set up the taskmanager deployment to use a sidecar Docker-in-Docker (dind) service. This dind service is where the Python SDK harness container actually runs.
>>>>>>>>
>>>>>>>> kind: Deployment
>>>>>>>> ...
>>>>>>>> containers:
>>>>>>>> - name: docker
>>>>>>>>   image: docker:19.03.5-dind
>>>>>>>>   ...
>>>>>>>> - name: taskmanager
>>>>>>>>   image: myregistry:5000/docker-flink:1.10
>>>>>>>>   env:
>>>>>>>>   - name: DOCKER_HOST
>>>>>>>>     value: tcp://localhost:2375
>>>>>>>> ...
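To make Sam's elided Deployment excerpt a bit more concrete, here is a Python sketch that builds the two containers as data and can print them as a JSON fragment (kubectl also accepts JSON manifests). taskmanager_pod_containers is a hypothetical helper. The privileged securityContext and the empty DOCKER_TLS_CERTDIR are assumptions about what the docker:19.03.5-dind image needs to run its daemon and serve plain tcp on port 2375; they are not taken from Sam's repo, so compare against its flink.yaml before relying on them.

```python
import json
from typing import Any, Dict, List


def taskmanager_pod_containers(flink_image: str,
                               dind_image: str = "docker:19.03.5-dind"
                               ) -> List[Dict[str, Any]]:
    """Container list for a taskmanager pod with a Docker-in-Docker sidecar."""
    dind = {
        "name": "docker",
        "image": dind_image,
        # Assumption: running dockerd inside a container needs privileged mode.
        "securityContext": {"privileged": True},
        # Assumption: an empty DOCKER_TLS_CERTDIR makes the dind image serve
        # the Docker API without TLS on port 2375.
        "env": [{"name": "DOCKER_TLS_CERTDIR", "value": ""}],
    }
    taskmanager = {
        "name": "taskmanager",
        "image": flink_image,
        # Containers in one pod share a network namespace, so the Docker
        # CLI in the taskmanager reaches the sidecar daemon via localhost.
        "env": [{"name": "DOCKER_HOST", "value": "tcp://localhost:2375"}],
    }
    return [dind, taskmanager]


if __name__ == "__main__":
    print(json.dumps({"containers": taskmanager_pod_containers(
        "myregistry:5000/docker-flink:1.10")}, indent=2))
```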
>>>>>>>>
>>>>>>>> I quickly threw all these pieces together in a repo here: https://github.com/sambvfx/beam-flink-k8s
>>>>>>>>
>>>>>>>> I added a working (via minikube) step-by-step in the README to prove to myself that I didn’t miss anything, but feel free to submit any PRs if you want to add anything useful.
>>>>>>>>
>>>>>>>> The documents you linked are very informative. It would be great to aggregate all this into digestible documentation. Let me know if you have any further questions!
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Sam
>>>>>>>>
>>>>>>>> On Thu, Aug 27, 2020 at 10:25 AM Eugene Kirpichov <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Kyle,
>>>>>>>>>
>>>>>>>>> Thanks for the response!
>>>>>>>>>
>>>>>>>>> On Wed, Aug 26, 2020 at 5:28 PM Kyle Weaver <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> > - With the Flink operator, I was able to submit a Beam job, but hit the issue that I need Docker installed on my Flink nodes. I haven't yet tried changing the operator's yaml files to add Docker inside them.
>>>>>>>>>>
>>>>>>>>>> Running Beam workers via Docker on the Flink nodes is not recommended (and probably not even possible), since the Flink nodes are themselves already running inside Docker containers. Running workers as sidecars avoids that problem. For example: https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/with_job_server/beam_flink_cluster.yaml#L17-L20
>>>>>>>>>>
>>>>>>>>> The main problem with the sidecar approach is that I can't use the Flink cluster as a "service" for anybody to submit their jobs with custom containers - the container version is fixed.
>>>>>>>>> Do I understand it correctly?
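Eugene's reading of the sidecar approach shows up in the flags each mode takes. With EXTERNAL, the pipeline only names the address of an SDK worker pool already running next to the taskmanager, so the container image is whatever the cluster manifest baked in; with DOCKER, each job names its own image. A minimal Python sketch: environment_flags is a hypothetical helper, and localhost:50000 assumes a worker-pool sidecar like the one in the operator example Kyle links, which this thread does not spell out.

```python
from typing import List


def environment_flags(mode: str, value: str) -> List[str]:
    """SDK-harness environment flags for the two setups discussed here.

    mode="DOCKER":   `value` is a container image chosen by each job.
    mode="EXTERNAL": `value` is the host:port of a worker-pool sidecar whose
                     image is fixed by the Flink cluster's own manifest.
    """
    if mode not in ("DOCKER", "EXTERNAL"):
        # This sketch only covers the two modes compared in the thread.
        raise ValueError(f"unsupported environment type: {mode}")
    return [f"--environment_type={mode}", f"--environment_config={value}"]
```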
>>>>>>>>> Seems like the Docker-in-Docker approach is viable, and it is mentioned in the Beam Flink K8s design doc <https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.dtj1gnks47dq>.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> > I also haven't tried this <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md> yet because it implies submitting jobs using "kubectl apply", which is weird - why not just submit it through the Flink job server?
>>>>>>>>>>
>>>>>>>>>> I'm guessing it goes through k8s for monitoring purposes. I see no reason it shouldn't be possible to submit to the job server directly through Python, network permitting, though I haven't tried this.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Aug 26, 2020 at 4:10 PM Eugene Kirpichov <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi folks,
>>>>>>>>>>>
>>>>>>>>>>> I'm still working with Pachama <https://pachama.com/> right now; we have a Kubernetes Engine cluster on GCP and want to run Beam Python batch pipelines with custom containers against it.
>>>>>>>>>>> Flink and Cloud Dataflow are the two options; Cloud Dataflow doesn't support custom containers for batch pipelines yet, so we're going with Flink.
>>>>>>>>>>>
>>>>>>>>>>> I'm struggling to find complete documentation on how to do this. There seems to be lots of conflicting or incomplete information: several ways to deploy Flink, several ways to get Beam working with it, bizarre StackOverflow questions, and no documentation explaining a complete working example.
>>>>>>>>>>>
>>>>>>>>>>> == My requests ==
>>>>>>>>>>> * Could people briefly share their working setup? It would be good to know which directions are promising.
>>>>>>>>>>> * It would be particularly helpful if someone could volunteer an hour of their time to talk to me about their working Beam/Flink/k8s setup. It's for a good cause (fixing the planet :) ), and on my side I volunteer to write up the findings to share with the community so others suffer less.
>>>>>>>>>>>
>>>>>>>>>>> == Appendix: My findings so far ==
>>>>>>>>>>> There are multiple ways to deploy Flink on k8s:
>>>>>>>>>>> - The GCP marketplace Flink operator <https://cloud.google.com/blog/products/data-analytics/open-source-processing-engines-for-kubernetes> (couldn't get it to work) and the respective CLI version <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator> (buggy, but I got it working)
>>>>>>>>>>> - https://github.com/lyft/flinkk8soperator (haven't tried)
>>>>>>>>>>> - Flink's native k8s support <https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/native_kubernetes.html> (super easy to get working)
>>>>>>>>>>>
>>>>>>>>>>> I confirmed that my Flink cluster was operational by running a simple WordCount job, initiated from my machine. However, I wasn't yet able to get Beam working:
>>>>>>>>>>>
>>>>>>>>>>> - With the Flink operator, I was able to submit a Beam job, but hit the issue that I need Docker installed on my Flink nodes. I haven't yet tried changing the operator's yaml files to add Docker inside them. I also haven't tried this <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md> yet because it implies submitting jobs using "kubectl apply", which is weird - why not just submit it through the Flink job server?
>>>>>>>>>>>
>>>>>>>>>>> - With Flink's native k8s support, I tried two things:
>>>>>>>>>>>   - Creating a fat portable jar using --output_executable_path. The jar is huge (200+MB) and takes forever to upload to my Flink cluster - this is a non-starter. But if I actually upload it, then I hit the same issue with lacking Docker. Haven't tried fixing it yet.
>>>>>>>>>>>   - Simply running my pipeline with --runner=FlinkRunner --environment_type=DOCKER --flink_master=$PUBLIC_IP:8081. The Java process appears to send 1+GB of data somewhere, but the job never even starts.
>>>>>>>>>>>
>>>>>>>>>>> I looked at a few conference talks:
>>>>>>>>>>> - https://www.cncf.io/wp-content/uploads/2020/02/CNCF-Webinar_-Apache-Flink-on-Kubernetes-Operator-1.pdf, which seems to imply that I need to add a Beam worker "sidecar" to the Flink workers, and that I need to submit my job using "kubectl apply".
>>>>>>>>>>> - https://www.youtube.com/watch?v=8k1iezoc5Sc, which also mentions the sidecar, but also mentions the fat jar option
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Eugene Kirpichov
>>>>>>>>>>> http://www.linkedin.com/in/eugenekirpichov
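Kyle's point that jobs could be submitted to the job server directly from Python, together with Sam's beam-flink-job-server suggestion and its two ports (8099 for the job service, 8098 for the artifact service), translates into PortableRunner flags roughly as below. This is a hedged Python sketch: portable_runner_flags is a hypothetical helper, the host name and image are placeholders, and, as Eugene notes about the 2.24 snapshot, the job server has to match the Beam version the SDK uses.

```python
from typing import List


def portable_runner_flags(job_server_host: str, sdk_image: str,
                          job_port: int = 8099,
                          artifact_port: int = 8098) -> List[str]:
    """Flags for submitting to an already-running Beam Flink job server."""
    return [
        "--runner=PortableRunner",
        f"--job_endpoint={job_server_host}:{job_port}",            # job service
        f"--artifact_endpoint={job_server_host}:{artifact_port}",  # artifact service
        "--environment_type=DOCKER",
        f"--environment_config={sdk_image}",  # custom SDK container per job
        "--save_main_session",
    ]
```

Both ports have to be reachable from the submitting machine, which is why Sam mentions exposing two ingress points.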
