Hi Romain -

Got it. Apparently, I needed to run beam as sudo to launch the docker
containers.
I still need to figure out why that's the case.
Thank you.
*--*
*Mahesh Vangala*
*(Ph) 443-326-1957*
*(web) mvangala.com <http://mvangala.com>*
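
For anyone hitting the same "/bin/bash: docker: command not found" message: one possible cause, not confirmed anywhere in this thread, is that the PATH seen by the JVM running the DoFn does not include /usr/local/bin. The sketch below checks where an executable resolves and appends a directory to the child's PATH through ProcessBuilder.environment(). The names PathCheck, findOnPath and withExtraPath are hypothetical helpers made up for illustration, and the sketch does not explain why sudo was needed.

```java
import java.io.File;
import java.util.Map;
import java.util.Optional;

final class PathCheck {

    /** Returns the first executable file named `executable` found in a PATH-style string. */
    static Optional<File> findOnPath(String executable, String pathValue) {
        if (pathValue == null) {
            return Optional.empty();
        }
        for (String dir : pathValue.split(File.pathSeparator)) {
            if (dir.isEmpty()) {
                continue;
            }
            File candidate = new File(dir, executable);
            if (candidate.isFile() && candidate.canExecute()) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    /** Builds a ProcessBuilder whose child environment has `extraDir` appended to PATH. */
    static ProcessBuilder withExtraPath(String extraDir, String... command) {
        // inheritIO() forwards the child's stdout/stderr to this JVM's console.
        ProcessBuilder pb = new ProcessBuilder(command).inheritIO();
        Map<String, String> env = pb.environment();
        String current = env.getOrDefault("PATH", "");
        // Assumption: the missing directory only needs to be visible to the child process.
        env.put("PATH", current.isEmpty() ? extraDir : current + File.pathSeparator + extraDir);
        return pb;
    }
}
```

Inside processElement this could be used as PathCheck.withExtraPath("/usr/local/bin", "/bin/bash", "-c", "docker run --rm ubuntu:16.04 sleep 20").start().waitFor(). Logging PathCheck.findOnPath("docker", System.getenv("PATH")) first shows whether the JVM can see docker at all.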


On Mon, Aug 20, 2018 at 11:05 AM Mahesh Vangala <vangalamahe...@gmail.com>
wrote:

> Hello Romain -
>
> So, I have added pb.inheritIO().start().waitFor(); and now I get the
> error "/bin/bash: docker: command not found".
> But I do have docker installed on the system, at /usr/local/bin/docker.
> Any ideas why I'm seeing this error when it is launched from within a DoFn?
> Thank you so much for your help.
>
>
> On Mon, Aug 20, 2018 at 10:58 AM Romain Manni-Bucau <rmannibu...@gmail.com>
> wrote:
>
>> Weird, this code works:
>>
>> https://gist.github.com/rmannibucau/4703f321bb1962d1303f8eccbd05df0e
>>
>> Are you sure your test_in.csv has some data (otherwise no DoFn processing
>> will be triggered)?
>>
>> Romain Manni-Bucau
>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>> <https://rmannibucau.metawerx.net/> | Old Blog
>> <http://rmannibucau.wordpress.com> | Github
>> <https://github.com/rmannibucau> | LinkedIn
>> <https://www.linkedin.com/in/rmannibucau> | Book
>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>
>>
>> On Mon, Aug 20, 2018 at 16:33, Mahesh Vangala <vangalamahe...@gmail.com>
>> wrote:
>>
>>> Hi Romain -
>>>
>>> I don't see any errors when I used waitFor().
>>> However, I don't see those processes being executed either since "docker
>>> ps -a" doesn't list any processes.
>>> > This is quite unrelated to beam itself normally. If your engine (spark,
>>> > dataflow etc) doesn't have a security manager active
>>> I am using DirectRunner though.
>>> Let me know.
>>> Thank you!
>>>
>>>
>>> On Mon, Aug 20, 2018 at 10:28 AM Romain Manni-Bucau <
>>> rmannibu...@gmail.com> wrote:
>>>
>>>> Hi Mahesh,
>>>>
>>>> Did you get the same error? This is normally unrelated to Beam itself.
>>>> If your engine (Spark, Dataflow, etc.) doesn't have a security manager
>>>> active, that should be enough; if it does, you may be forbidden from
>>>> launching processes.
>>>>
>>>>
>>>> On Mon, Aug 20, 2018 at 16:08, Mahesh Vangala <vangalamahe...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello Romain -
>>>>>
>>>>> I did try that, still no luck.
>>>>> Also, when I put the process start logic into a separate test script, I
>>>>> do see a running docker container when I do "docker ps".
>>>>> However, no such luck implementing that logic within a DoFn.
>>>>> Any thoughts?
>>>>> Thank you.
>>>>>
>>>>> Regards,
>>>>> Mahesh
>>>>>
>>>>>
>>>>> On Sun, Aug 19, 2018 at 3:53 AM Romain Manni-Bucau <
>>>>> rmannibu...@gmail.com> wrote:
>>>>>
>>>>>> Did you mean waitFor() and not the Java wait() primitive?
>>>>>>
>>>>>> On Sun, Aug 19, 2018 at 04:35, Mahesh Vangala <vangalamahe...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello Beamers -
>>>>>>>
>>>>>>> I am trying to put together a POC: launch a docker image per element
>>>>>>> in the input PCollection and then return some data to the output
>>>>>>> PCollection.
>>>>>>>
>>>>>>> Here is my code:
>>>>>>>
>>>>>>> public class VariantCaller
>>>>>>> {
>>>>>>>     public static void main( String[] args )
>>>>>>>     {
>>>>>>>         PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
>>>>>>>         Pipeline p = Pipeline.create(opts);
>>>>>>>         PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
>>>>>>>         PCollection<String> outLines = lines.apply(ParDo.of(new LaunchDocker.LaunchJobs()));
>>>>>>>         PCollection<String> mergedLines = outLines.apply(Combine.globally(new AddLines()));
>>>>>>>         mergedLines.apply(TextIO.write().to("test_out.csv"));
>>>>>>>         p.run();
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> My LaunchDocker Code:
>>>>>>>
>>>>>>>
>>>>>>> public class LaunchDocker {
>>>>>>>   public static class LaunchJobs extends DoFn<String, String> {
>>>>>>>     private static final long serialVersionUID = 1L;
>>>>>>>     private static final Logger LOG = LoggerFactory.getLogger(AddLines.class);
>>>>>>>
>>>>>>>     @ProcessElement
>>>>>>>     public void processElement(ProcessContext c) throws Exception {
>>>>>>>       // Get the input element from ProcessContext.
>>>>>>>       String word = c.element().split(",")[0];
>>>>>>>       LOG.info(word);
>>>>>>>       ProcessBuilder pb = new ProcessBuilder("/bin/bash", "-c",
>>>>>>>           "docker run --rm ubuntu:16.04 sleep 20");
>>>>>>>       pb.start().wait();
>>>>>>>       // Use ProcessContext.output to emit the output element.
>>>>>>>       if (!word.isEmpty())
>>>>>>>         c.output(word + "\n");
>>>>>>>     }
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> However, this fails with error:
>>>>>>>
>>>>>>>
>>>>>>> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
>>>>>>> INFO: Filepattern test_in.csv matched 1 files with total size 36
>>>>>>> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource split
>>>>>>> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms and produced 1 files and 9 bundles
>>>>>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>>>>> INFO: sample1
>>>>>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>>>>> INFO: 4
>>>>>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>>>>> INFO: 1
>>>>>>> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalMonitorStateException
>>>>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
>>>>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
>>>>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
>>>>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
>>>>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>>>>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>>>>>>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
>>>>>>> Caused by: java.lang.IllegalMonitorStateException
>>>>>>>     at java.lang.Object.wait(Native Method)
>>>>>>>     at java.lang.Object.wait(Object.java:502)
>>>>>>>     at pipelines.variant_caller.LaunchDocker$LaunchJobs.processElement(LaunchDocker.java:19)
>>>>>>>
>>>>>>>
>>>>>>> Can you share your ideas on the best way to achieve this?
>>>>>>>
>>>>>>> Thank you for your help!
>>>>>>>
>>>>>>>
>>>>>>> Sincerely,
>>>>>>>
>>>>>>> Mahesh
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
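
The IllegalMonitorStateException in the original post comes from calling Object.wait() on the Process returned by start(): wait() is the monitor primitive and may only be called while holding the object's lock. Process.waitFor() is the call that blocks until the child exits. Below is a minimal sketch of the difference; the class name WaitForExample and its method names are made up for illustration, and /bin/sh stands in for the docker command so the example runs without Docker.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

final class WaitForExample {

    /** Runs a command, forwards its output to this JVM's console, and returns its exit code. */
    static int runAndWait(String... command) {
        ProcessBuilder pb = new ProcessBuilder(command).inheritIO();
        try {
            Process process = pb.start();
            // Process.waitFor() blocks until the child process has exited.
            return process.waitFor();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the child process", e);
        }
    }

    /** Shows that Object.wait() outside a synchronized block throws IllegalMonitorStateException. */
    static boolean objectWaitThrows() {
        try {
            new Object().wait(); // the calling thread does not own this object's monitor
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Checking the value returned by runAndWait (for example, failing the element when it is non-zero) is one way to make sure a failed container launch does not pass silently; whether that fits a given pipeline is a design choice the thread does not discuss.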
