Hello Romain -

So, I have added pb.inheritIO().start().waitFor(); and now I get an error:

/bin/bash: docker: command not found

But I do have Docker installed on the system, at /usr/local/bin/docker. Any idea why I'm seeing this error when the command is launched from within a DoFn?

Thank you so much for your help.

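One likely cause, which the thread does not confirm: "bash -c" looks up docker through the PATH it inherits from the JVM, and a JVM started from an IDE or another launcher (rather than from a terminal) can run with a minimal PATH that lacks /usr/local/bin. The sketch below assumes Docker really lives in /usr/local/bin, as stated above. It prepends that directory to the child's PATH through ProcessBuilder.environment(), keeps inheritIO() so Docker's own messages reach the console, and blocks with Process.waitFor(). The class DockerCommand and the helpers withDir and dockerRun are hypothetical names used only for illustration.

```java
import java.io.File;
import java.io.IOException;
import java.util.Map;

public class DockerCommand {

    // Assumption: docker is installed in this directory (as reported above),
    // but the PATH inherited by the JVM does not include it.
    static final String DOCKER_DIR = "/usr/local/bin";

    /** Returns PATH with dir prepended, or unchanged if dir is already an entry. */
    static String withDir(String path, String dir) {
        if (path == null || path.isEmpty()) {
            return dir;
        }
        for (String entry : path.split(File.pathSeparator)) {
            if (entry.equals(dir)) {
                return path; // already resolvable, keep the existing order
            }
        }
        return dir + File.pathSeparator + path;
    }

    /** Builds the same "bash -c docker run ..." command as the DoFn, with a usable PATH. */
    static ProcessBuilder dockerRun(String image, String... cmd) {
        ProcessBuilder pb = new ProcessBuilder("/bin/bash", "-c",
                "docker run --rm " + image + " " + String.join(" ", cmd));
        Map<String, String> env = pb.environment(); // starts as a copy of this JVM's environment
        env.put("PATH", withDir(env.get("PATH"), DOCKER_DIR));
        pb.inheritIO(); // child stdout/stderr go straight to this JVM's console
        return pb;
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        // Process.waitFor() blocks until the container exits and returns its exit code.
        int exit = dockerRun("ubuntu:16.04", "sleep", "20").start().waitFor();
        if (exit != 0) {
            throw new IllegalStateException("docker run exited with code " + exit);
        }
    }
}
```

A simpler alternative is to call the binary by its absolute path, e.g. "/usr/local/bin/docker run --rm ubuntu:16.04 sleep 20", which skips the PATH lookup entirely. Running "/bin/bash -c 'echo $PATH'" from inside processElement is a quick way to check which PATH the DoFn actually sees.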
*--*
*Mahesh Vangala*
*(Ph) 443-326-1957*
*(web) mvangala.com <http://mvangala.com>*

On Mon, Aug 20, 2018 at 10:58 AM Romain Manni-Bucau wrote:

> Weird, this code works:
>
> https://gist.github.com/rmannibucau/4703f321bb1962d1303f8eccbd05df0e
>
> Are you sure your test_in.csv has some data (otherwise no DoFn processing
> will be triggered)?
>
> Romain Manni-Bucau
> @rmannibucau <https://twitter.com/rmannibucau> | Blog
> <https://rmannibucau.metawerx.net/> | Old Blog
> <http://rmannibucau.wordpress.com> | Github
> <https://github.com/rmannibucau> | LinkedIn
> <https://www.linkedin.com/in/rmannibucau> | Book
> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>
>
> On Mon, Aug 20, 2018 at 16:33, Mahesh Vangala wrote:
>
>> Hi Romain -
>>
>> I don't see any errors when I use waitFor().
>> However, I don't see those processes being executed either, since "docker
>> ps -a" doesn't list any processes.
>>
>>> This is quite unrelated to beam itself normally. If your engine (spark,
>>> dataflow etc) doesn't have a security manager active
>>
>> I am using DirectRunner though.
>> Let me know.
>> Thank you!
>>
>>
>> On Mon, Aug 20, 2018 at 10:28 AM Romain Manni-Bucau wrote:
>>
>>> Hi Mahesh,
>>>
>>> Did you get the same error? This is quite unrelated to beam itself
>>> normally. If your engine (spark, dataflow etc) doesn't have a security
>>> manager active, it should be enough; if it has one, you can be forbidden
>>> to use that.
>>>
>>>
>>> On Mon, Aug 20, 2018 at 16:08, Mahesh Vangala wrote:
>>>
>>>> Hello Romain -
>>>>
>>>> I did try that, still no luck.
>>>> Also, when I put the process start logic into a separate test script, I
>>>> do see a successful docker container when I run "docker ps".
>>>> However, no such luck implementing that logic within the DoFn.
>>>> Any thoughts?
>>>> Thank you.
>>>>
>>>> Regards,
>>>> Mahesh
>>>>
>>>>
>>>> On Sun, Aug 19, 2018 at 3:53 AM Romain Manni-Bucau wrote:
>>>>
>>>>> waitFor and not the Java wait primitive?
>>>>>
>>>>> On Sun, Aug 19, 2018 at 04:35, Mahesh Vangala wrote:
>>>>>
>>>>>> Hello Beamers -
>>>>>>
>>>>>> I am trying to put together a POC: launch a Docker image per element in
>>>>>> the input PCollection and then return some data to the output PCollection.
>>>>>>
>>>>>> Here is my code:
>>>>>>
>>>>>> public class VariantCaller {
>>>>>>
>>>>>>     public static void main(String[] args) {
>>>>>>         PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
>>>>>>         Pipeline p = Pipeline.create(opts);
>>>>>>
>>>>>>         PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
>>>>>>         PCollection<String> outLines = lines.apply(ParDo.of(new LaunchDocker.LaunchJobs()));
>>>>>>         PCollection<String> mergedLines = outLines.apply(Combine.globally(new AddLines()));
>>>>>>         mergedLines.apply(TextIO.write().to("test_out.csv"));
>>>>>>
>>>>>>         p.run();
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>>
>>>>>> My LaunchDocker code:
>>>>>>
>>>>>> public class LaunchDocker {
>>>>>>
>>>>>>     public static class LaunchJobs extends DoFn<String, String> {
>>>>>>
>>>>>>         private static final long serialVersionUID = 1L;
>>>>>>         private static final Logger LOG = LoggerFactory.getLogger(AddLines.class);
>>>>>>
>>>>>>         @ProcessElement
>>>>>>         public void processElement(ProcessContext c) throws Exception {
>>>>>>             // Get the input element from ProcessContext.
>>>>>>             String word = c.element().split(",")[0];
>>>>>>             LOG.info(word);
>>>>>>             ProcessBuilder pb = new ProcessBuilder("/bin/bash", "-c",
>>>>>>                     "docker run --rm ubuntu:16.04 sleep 20");
>>>>>>             pb.start().wait();
>>>>>>             // Use ProcessContext.output to emit the output element.
>>>>>>             if (!word.isEmpty())
>>>>>>                 c.output(word + "\n");
>>>>>>         }
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>>
>>>>>> However, this fails with the following error:
>>>>>>
>>>>>> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
>>>>>> INFO: Filepattern test_in.csv matched 1 files with total size 36
>>>>>> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource split
>>>>>> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms and produced 1 files and 9 bundles
>>>>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>>>> INFO: sample1
>>>>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>>>> INFO: 4
>>>>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>>>> INFO: 1
>>>>>> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalMonitorStateException
>>>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
>>>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
>>>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
>>>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
>>>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>>>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>>>>>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
>>>>>> Caused by: java.lang.IllegalMonitorStateException
>>>>>>     at java.lang.Object.wait(Native Method)
>>>>>>     at java.lang.Object.wait(Object.java:502)
>>>>>>     at pipelines.variant_caller.LaunchDocker$LaunchJobs.processElement(LaunchDocker.java:19)
>>>>>>
>>>>>>
>>>>>> Can you share your ideas on the best way of achieving this?
>>>>>>
>>>>>> Thank you for your help!
>>>>>>
>>>>>>
>>>>>> Sincerely,
>>>>>>
>>>>>> Mahesh
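The IllegalMonitorStateException in the original message comes from pb.start().wait(): that calls Object.wait() on the Process object, which Java only permits while the calling thread holds that object's monitor (inside a synchronized block). Process.waitFor(), as Romain suggests, is the call that blocks until the child exits. Because the stated goal is to return some data to the output PCollection, the sketch below also captures what the command prints. It is plain Java with no Beam dependency; ContainerRunner and its Result type are hypothetical names. Output is read before calling waitFor() because a child that fills its pipe buffer can otherwise block forever while the parent waits.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class ContainerRunner {

    /** Exit code plus everything the command printed (stdout and stderr merged). */
    public static final class Result {
        public final int exitCode;
        public final String output;

        Result(int exitCode, String output) {
            this.exitCode = exitCode;
            this.output = output;
        }
    }

    public static Result run(List<String> command) throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.redirectErrorStream(true); // merge stderr into stdout so only one pipe needs draining
        Process process = pb.start();

        // Drain the output first: a child that fills the pipe buffer would otherwise
        // block forever while we sit in waitFor().
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (InputStream in = process.getInputStream()) {
            byte[] chunk = new byte[8192];
            int n;
            while ((n = in.read(chunk)) != -1) {
                buf.write(chunk, 0, n);
            }
        }

        // Process.waitFor() blocks until the child exits; calling process.wait() here
        // instead would throw IllegalMonitorStateException, as in the stack trace above.
        int exitCode = process.waitFor();
        return new Result(exitCode, buf.toString(StandardCharsets.UTF_8.name()));
    }

    public static void main(String[] args) throws Exception {
        Result r = run(Arrays.asList("/bin/sh", "-c", "echo hello"));
        System.out.println("exit=" + r.exitCode + " output=" + r.output);
    }
}
```

Inside processElement, one could build the docker command list (for example with the absolute path /usr/local/bin/docker, assuming that is where Docker lives on the worker), call ContainerRunner.run, check exitCode, and pass output to c.output(...). Do not combine this with inheritIO(): that sends the child's output to the console, so there would be nothing left to capture.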
