Hi Romain - I don't see any errors when I use waitFor(). However, I don't see the containers being executed either, since "docker ps -a" doesn't list any. I am using the DirectRunner, though. Let me know. Thank you!
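One possible reason "docker ps -a" shows nothing is the `--rm` flag. Docker removes the container automatically once it exits, so after the 20-second `sleep` finishes there is nothing left to list. Checking the exit code and output of the `docker` command gives a more direct signal. Below is a minimal plain-Java sketch that a `DoFn` could call. The names `CommandRunner` and `CommandResult` are hypothetical and are not Beam or Docker APIs.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical result holder: the command's exit code plus its merged stdout/stderr.
final class CommandResult {
    final int exitCode;
    final String output;

    CommandResult(int exitCode, String output) {
        this.exitCode = exitCode;
        this.output = output;
    }
}

// Hypothetical helper, not part of Beam: runs a command and reports what happened.
final class CommandRunner {
    private CommandRunner() {}

    // Starts the command, drains its combined output, then blocks until it exits.
    static CommandResult run(List<String> command) throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.redirectErrorStream(true); // merge stderr into stdout so errors are not silently lost
        Process process = pb.start();

        // Read the output before waiting; a full pipe buffer could otherwise block the child.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (InputStream in = process.getInputStream()) {
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                buffer.write(chunk, 0, n);
            }
        }

        // Process.waitFor(), not Object.wait(): blocks until exit and returns the exit code.
        int exitCode = process.waitFor();
        return new CommandResult(exitCode, new String(buffer.toByteArray(), StandardCharsets.UTF_8));
    }
}
```

For example, `CommandRunner.run(Arrays.asList("docker", "run", "--rm", "ubuntu:16.04", "sleep", "20"))` passes the arguments directly instead of going through `/bin/bash -c`. A non-zero `exitCode`, together with `output`, would surface errors that are otherwise discarded when the process's output is never read.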
--
Mahesh Vangala
(Ph) 443-326-1957
(web) mvangala.com <http://mvangala.com>

On Mon, Aug 20, 2018 at 10:28 AM Romain Manni-Bucau <[email protected]> wrote:
> Hi Mahesh,
>
> Did you get the same error? This is normally unrelated to Beam itself. If
> your engine (Spark, Dataflow, etc.) doesn't have a security manager active,
> that should be enough; if it does, you may be forbidden from using that.
>
> Romain Manni-Bucau
> @rmannibucau <https://twitter.com/rmannibucau> | Blog
> <https://rmannibucau.metawerx.net/> | Old Blog
> <http://rmannibucau.wordpress.com> | Github
> <https://github.com/rmannibucau> | LinkedIn
> <https://www.linkedin.com/in/rmannibucau> | Book
> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>
>
> On Mon, Aug 20, 2018 at 16:08, Mahesh Vangala <[email protected]> wrote:
>
>> Hello Romain -
>>
>> I did try that, still no luck.
>> Also, when I put the process start logic into a separate test script, I do
>> see the docker container when I run "docker ps".
>> However, I have no such luck implementing that logic within a DoFn.
>> Any thoughts?
>> Thank you.
>>
>> Regards,
>> Mahesh
>>
>>
>> On Sun, Aug 19, 2018 at 3:53 AM Romain Manni-Bucau <[email protected]>
>> wrote:
>>
>>> Shouldn't that be waitFor() rather than the Java wait() primitive?
>>>
>>> On Sun, Aug 19, 2018 at 04:35, Mahesh Vangala <[email protected]> wrote:
>>>
>>>> Hello Beamers -
>>>>
>>>> I am trying to pull off a POC: launch a docker image per element in the
>>>> input PCollection and then return some data to the output PCollection.
>>>>
>>>> Here is my code:
>>>>
>>>> public class VariantCaller
>>>> {
>>>>     public static void main(String[] args)
>>>>     {
>>>>         PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
>>>>         Pipeline p = Pipeline.create(opts);
>>>>         PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
>>>>         PCollection<String> outLines = lines.apply(ParDo.of(new LaunchDocker.LaunchJobs()));
>>>>         PCollection<String> mergedLines = outLines.apply(Combine.globally(new AddLines()));
>>>>         mergedLines.apply(TextIO.write().to("test_out.csv"));
>>>>         p.run();
>>>>     }
>>>> }
>>>>
>>>> My LaunchDocker code:
>>>>
>>>> public class LaunchDocker {
>>>>     public static class LaunchJobs extends DoFn<String, String> {
>>>>         private static final long serialVersionUID = 1L;
>>>>         private static final Logger LOG = LoggerFactory.getLogger(AddLines.class);
>>>>
>>>>         @ProcessElement
>>>>         public void processElement(ProcessContext c) throws Exception {
>>>>             // Get the input element from ProcessContext.
>>>>             String word = c.element().split(",")[0];
>>>>             LOG.info(word);
>>>>             ProcessBuilder pb = new ProcessBuilder("/bin/bash", "-c",
>>>>                     "docker run --rm ubuntu:16.04 sleep 20");
>>>>             pb.start().wait();
>>>>             // Use ProcessContext.output to emit the output element.
>>>>             if (!word.isEmpty())
>>>>                 c.output(word + "\n");
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> However, this fails with the following error:
>>>>
>>>> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
>>>> INFO: Filepattern test_in.csv matched 1 files with total size 36
>>>> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource split
>>>> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms and produced 1 files and 9 bundles
>>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>> INFO: sample1
>>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>> INFO: 4
>>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>> INFO: 1
>>>> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalMonitorStateException
>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>>>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
>>>> Caused by: java.lang.IllegalMonitorStateException
>>>>     at java.lang.Object.wait(Native Method)
>>>>     at java.lang.Object.wait(Object.java:502)
>>>>     at pipelines.variant_caller.LaunchDocker$LaunchJobs.processElement(LaunchDocker.java:19)
>>>>
>>>> Can you share your ideas on the best way of achieving this?
>>>>
>>>> Thank you for your help!
>>>>
>>>> Sincerely,
>>>>
>>>> Mahesh
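The `IllegalMonitorStateException` in the trace above comes from `pb.start().wait()`. That call invokes `java.lang.Object.wait()`, which requires the calling thread to hold the object's monitor. It is not `java.lang.Process.waitFor()`, which is the method that blocks until a child process exits. Below is a minimal plain-Java sketch of both behaviours. `WaitVsWaitFor` and `runAndWait` are hypothetical names. The timeout variant (`Process.waitFor(long, TimeUnit)`, Java 8+) is an optional addition that the thread itself does not use.

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class contrasting Object.wait() with Process.waitFor().
final class WaitVsWaitFor {
    private WaitVsWaitFor() {}

    // Object.wait() may only be called by a thread holding the object's monitor,
    // i.e. inside synchronized (obj) { ... }. Without it, the JVM throws
    // IllegalMonitorStateException, the same failure as in the stack trace.
    static boolean waitWithoutLockThrows() throws InterruptedException {
        Object target = new Object();
        try {
            target.wait(10); // deliberately NOT inside synchronized (target)
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // Process.waitFor(timeout, unit) blocks until the child exits or the timeout elapses.
    // Returns the exit code; forcibly terminates the child and throws if it runs too long.
    static int runAndWait(List<String> command, long timeout, TimeUnit unit)
            throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command).inheritIO().start();
        if (!process.waitFor(timeout, unit)) {
            process.destroyForcibly(); // forcibly terminate the child process
            process.waitFor();         // wait for it to actually exit before reporting
            throw new IllegalStateException("Timed out after " + timeout + " " + unit + ": " + command);
        }
        return process.exitValue();
    }
}
```

Inside `processElement`, the `pb.start().wait()` line could then become a call such as `runAndWait(Arrays.asList("docker", "run", "--rm", "ubuntu:16.04", "sleep", "20"), 60, TimeUnit.SECONDS)`, followed by an exit-code check. The 60-second limit is an arbitrary assumption. Note that forcibly killing the `docker` client process may not stop the container itself.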
