Hi Romain -

Okay, so I was running it through Eclipse. I believe the jobs were running
as the Eclipse user (?) as opposed to me (authorized to launch Docker).
Once I ran the project from the command line (with or without sudo), the
containers executed just fine.
Let me know if that makes sense.
Thanks again.
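One way to test that explanation is to print, from inside the DoFn or at the top of main, which OS user the JVM runs as and which PATH it inherited, and then compare an Eclipse launch with a terminal launch. A minimal sketch; the class and method names below are hypothetical and not part of Beam:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper: reports the OS user and PATH seen by the current JVM,
// and whether an executable such as "docker" can be found on that PATH.
final class LaunchEnvironment {

    /** Splits a PATH string into its non-empty directories (empty list if PATH is unset). */
    static List<String> pathEntries(String path) {
        List<String> entries = new ArrayList<>();
        if (path == null || path.isEmpty()) {
            return entries;
        }
        for (String dir : path.split(File.pathSeparator)) {
            if (!dir.isEmpty()) {
                entries.add(dir);
            }
        }
        return entries;
    }

    /** Returns the first PATH directory entry holding an executable called name, or null. */
    static String findOnPath(String name, String path) {
        for (String dir : pathEntries(path)) {
            File candidate = new File(dir, name);
            if (candidate.isFile() && candidate.canExecute()) {
                return candidate.getAbsolutePath();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String path = System.getenv("PATH");
        System.out.println("user.name = " + System.getProperty("user.name"));
        System.out.println("PATH      = " + path);
        System.out.println("docker    = " + findOnPath("docker", path));
    }
}
```

If the two launches report the same user but only the terminal launch has /usr/local/bin on its PATH, that would point to an environment difference rather than a different user, and would also be consistent with the "docker: command not found" error earlier in this thread.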

Regards,
Mahesh
*--*
*Mahesh Vangala*
*(Ph) 443-326-1957*
*(web) mvangala.com <http://mvangala.com>*


On Mon, Aug 20, 2018 at 11:18 AM Mahesh Vangala <[email protected]>
wrote:

> Hi Romain -
>
> Got it. Apparently, I needed to run Beam with sudo to launch the Docker
> containers.
> I still need to figure out why that's the case.
> Thank you.
>
>
> On Mon, Aug 20, 2018 at 11:05 AM Mahesh Vangala <[email protected]>
> wrote:
>
>> Hello Romain -
>>
>> So, I added pb.inheritIO().start().waitFor(); and now I get the error
>> "/bin/bash: docker: command not found".
>> But Docker is installed on the system, at /usr/local/bin/docker.
>> Any ideas why I'm seeing this error when the command is launched from
>> within the DoFn?
>> Thank you so much for your help.
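Since the docker binary is at /usr/local/bin/docker, one way to stop depending on the PATH of whatever launched the JVM (Eclipse or a terminal) is to call docker by its absolute path instead of going through /bin/bash -c. The sketch below does that, forwards the container's output with inheritIO(), and treats a non-zero exit code as a failure. The class name and the /usr/local/bin location are assumptions for illustration; adjust them for your machine.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Sketch: launch one container per element without relying on the launcher's PATH.
// Assumption: docker is installed at /usr/local/bin/docker (as reported in this thread).
final class DockerLauncher {

    static final String DOCKER = "/usr/local/bin/docker";

    /** Builds "docker run --rm <image> <args...>" using the absolute path to docker. */
    static ProcessBuilder dockerRun(String image, String... args) {
        List<String> cmd = new ArrayList<>(Arrays.asList(DOCKER, "run", "--rm", image));
        cmd.addAll(Arrays.asList(args));
        ProcessBuilder pb = new ProcessBuilder(cmd);

        // Also append docker's directory to the child's PATH, in case docker itself
        // needs to find helper programs there.
        Map<String, String> env = pb.environment();
        String dockerDir = new File(DOCKER).getParent();
        String path = env.get("PATH");
        env.put("PATH", (path == null || path.isEmpty())
                ? dockerDir
                : path + File.pathSeparator + dockerDir);

        // Forward the child's stdout/stderr to this JVM's console so errors are visible.
        pb.inheritIO();
        return pb;
    }

    /** Starts the process, blocks until it exits, and fails on a non-zero exit code. */
    static void runAndCheck(ProcessBuilder pb) throws IOException, InterruptedException {
        Process process = pb.start();
        int exitCode = process.waitFor(); // Process.waitFor(), not Object.wait()
        if (exitCode != 0) {
            throw new IOException("Command " + pb.command() + " exited with code " + exitCode);
        }
    }

    public static void main(String[] args) throws Exception {
        // Same container as in the original DoFn.
        ProcessBuilder pb = dockerRun("ubuntu:16.04", "sleep", "20");
        if (new File(DOCKER).canExecute()) {
            runAndCheck(pb);
        } else {
            System.err.println("No docker binary at " + DOCKER + "; would have run " + pb.command());
        }
    }
}
```

Inside processElement, building and starting the process would then become DockerLauncher.runAndCheck(DockerLauncher.dockerRun("ubuntu:16.04", "sleep", "20"));. This does not solve permission problems: if the user running the JVM may not talk to the Docker daemon, docker still fails, but the non-zero exit code now surfaces as an exception instead of going unnoticed.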
>>
>>
>>
>> On Mon, Aug 20, 2018 at 10:58 AM Romain Manni-Bucau <
>> [email protected]> wrote:
>>
>>> Weird, this code works:
>>>
>>> https://gist.github.com/rmannibucau/4703f321bb1962d1303f8eccbd05df0e
>>>
>>> Are you sure your test_in.csv has some data (otherwise no DoFn
>>> processing will be triggered)?
>>>
>>> Romain Manni-Bucau
>>> @rmannibucau <https://twitter.com/rmannibucau> | Blog <https://rmannibucau.metawerx.net/> | Old Blog <http://rmannibucau.wordpress.com>
>>> Github <https://github.com/rmannibucau> | LinkedIn <https://www.linkedin.com/in/rmannibucau>
>>> Book <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>
>>>
>>> On Mon, Aug 20, 2018 at 16:33 Mahesh Vangala <[email protected]>
>>> wrote:
>>>
>>>> Hi Romain -
>>>>
>>>> I don't see any errors when I use waitFor().
>>>> However, I don't see the containers being executed either, since
>>>> "docker ps -a" doesn't list any containers.
>>>> > This is normally unrelated to Beam itself. If your engine (Spark,
>>>> > Dataflow, etc.) doesn't have a security manager active...
>>>> I am using the DirectRunner, though.
>>>> Let me know.
>>>> Thank you!
>>>>
>>>>
>>>>
>>>> On Mon, Aug 20, 2018 at 10:28 AM Romain Manni-Bucau <
>>>> [email protected]> wrote:
>>>>
>>>>> Hi Mahesh,
>>>>>
>>>>> Did you get the same error? This is normally unrelated to Beam itself.
>>>>> If your engine (Spark, Dataflow, etc.) doesn't have a security manager
>>>>> active, this should be enough; if it does, you may be forbidden from
>>>>> doing that.
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Aug 20, 2018 at 16:08 Mahesh Vangala <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hello Romain -
>>>>>>
>>>>>> I did try that, still no luck.
>>>>>> Also, when I put the process-start logic into a separate test program,
>>>>>> I do see the Docker container when I run "docker ps".
>>>>>> However, I have no such luck with the same logic inside the DoFn.
>>>>>> Any thoughts?
>>>>>> Thank you.
>>>>>>
>>>>>> Regards,
>>>>>> Mahesh
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sun, Aug 19, 2018 at 3:53 AM Romain Manni-Bucau <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Did you use waitFor() rather than Java's wait() primitive?
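To make that point concrete: pb.start().wait() calls Object.wait() on the Process object, which is what produced the IllegalMonitorStateException, whereas Process.waitFor() is the call that blocks until the child process exits. A small self-contained sketch (class and method names are made up for illustration; the timeout overload of waitFor is an optional extra, not something used in this thread):

```java
import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

// Contrasts the two calls. Object.wait() is a monitor primitive: calling it without
// holding the object's lock throws IllegalMonitorStateException. Process.waitFor()
// blocks until the child process exits and returns its exit code.
final class WaitVsWaitFor {

    /** Returns true when wait() on an unlocked object throws IllegalMonitorStateException. */
    static boolean bareWaitThrows() {
        Object lock = new Object();
        try {
            lock.wait(); // called outside synchronized (lock) { ... }, like pb.start().wait()
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Starts a command, waits up to timeoutSeconds for it, and returns its exit code. */
    static int runWithTimeout(long timeoutSeconds, String... command)
            throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command).inheritIO().start();
        if (!process.waitFor(timeoutSeconds, TimeUnit.SECONDS)) {
            process.destroyForcibly();
            throw new IOException("Timed out after " + timeoutSeconds + "s: " + String.join(" ", command));
        }
        return process.exitValue();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("bare wait() throws: " + bareWaitThrows());
        // Portable stand-in for the docker command: this JVM's own java binary.
        String javaBin = System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
        System.out.println("exit code: " + runWithTimeout(30, javaBin, "-version"));
    }
}
```

In the DoFn, the fix then amounts to replacing pb.start().wait() with pb.start().waitFor(), ideally checking the returned exit code as well.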
>>>>>>>
>>>>>>> On Sun, Aug 19, 2018 at 04:35 Mahesh Vangala <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello Beamers -
>>>>>>>>
>>>>>>>> I am trying to pull off a POC: launch a Docker container per element
>>>>>>>> of the input PCollection and then return some data to the output
>>>>>>>> PCollection.
>>>>>>>>
>>>>>>>> Here is my code:
>>>>>>>>
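>>>>>>>> package pipelines.variant_caller;
>>>>>>>>
>>>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>>>> import org.apache.beam.sdk.io.TextIO;
>>>>>>>> import org.apache.beam.sdk.options.PipelineOptions;
>>>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>>>> import org.apache.beam.sdk.transforms.Combine;
>>>>>>>> import org.apache.beam.sdk.transforms.ParDo;
>>>>>>>> import org.apache.beam.sdk.values.PCollection;
>>>>>>>>
>>>>>>>> public class VariantCaller {
>>>>>>>>
>>>>>>>>     public static void main(String[] args) {
>>>>>>>>         PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
>>>>>>>>         Pipeline p = Pipeline.create(opts);
>>>>>>>>
>>>>>>>>         // One element per line of test_in.csv.
>>>>>>>>         PCollection<String> lines = p.apply(TextIO.read().from("test_in.csv"));
>>>>>>>>
>>>>>>>>         // Launch one Docker container per element.
>>>>>>>>         PCollection<String> outLines = lines.apply(ParDo.of(new LaunchDocker.LaunchJobs()));
>>>>>>>>
>>>>>>>>         // AddLines (not shown) is the combine function that merges all lines.
>>>>>>>>         PCollection<String> mergedLines = outLines.apply(Combine.globally(new AddLines()));
>>>>>>>>
>>>>>>>>         mergedLines.apply(TextIO.write().to("test_out.csv"));
>>>>>>>>
>>>>>>>>         p.run();
>>>>>>>>     }
>>>>>>>> }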
>>>>>>>>
>>>>>>>>
>>>>>>>> My LaunchDocker code:
>>>>>>>>
>>>>>>>> package pipelines.variant_caller;
>>>>>>>>
>>>>>>>> import org.apache.beam.sdk.transforms.DoFn;
>>>>>>>> import org.slf4j.Logger;
>>>>>>>> import org.slf4j.LoggerFactory;
>>>>>>>>
>>>>>>>> public class LaunchDocker {
>>>>>>>>
>>>>>>>>   public static class LaunchJobs extends DoFn<String, String> {
>>>>>>>>
>>>>>>>>     private static final long serialVersionUID = 1L;
>>>>>>>>
>>>>>>>>     private static final Logger LOG = LoggerFactory.getLogger(LaunchJobs.class);
>>>>>>>>
>>>>>>>>     @ProcessElement
>>>>>>>>     public void processElement(ProcessContext c) throws Exception {
>>>>>>>>       // Get the input element from ProcessContext.
>>>>>>>>       String word = c.element().split(",")[0];
>>>>>>>>       LOG.info(word);
>>>>>>>>
>>>>>>>>       ProcessBuilder pb = new ProcessBuilder("/bin/bash", "-c",
>>>>>>>>           "docker run --rm ubuntu:16.04 sleep 20");
>>>>>>>>       pb.start().wait();
>>>>>>>>
>>>>>>>>       // Use ProcessContext.output to emit the output element.
>>>>>>>>       if (!word.isEmpty())
>>>>>>>>         c.output(word + "\n");
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> However, this fails with the following error:
>>>>>>>>
>>>>>>>>
>>>>>>>> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
>>>>>>>> INFO: Filepattern test_in.csv matched 1 files with total size 36
>>>>>>>> Aug 18, 2018 10:30:23 PM org.apache.beam.sdk.io.FileBasedSource split
>>>>>>>> INFO: Splitting filepattern test_in.csv into bundles of size 4 took 1 ms and produced 1 files and 9 bundles
>>>>>>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>>>>>> INFO: sample1
>>>>>>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>>>>>> INFO: 4
>>>>>>>> Aug 18, 2018 10:30:23 PM pipelines.variant_caller.LaunchDocker$LaunchJobs processElement
>>>>>>>> INFO: 1
>>>>>>>> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalMonitorStateException
>>>>>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332)
>>>>>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302)
>>>>>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197)
>>>>>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64)
>>>>>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>>>>>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>>>>>>>>     at pipelines.variant_caller.VariantCaller.main(VariantCaller.java:29)
>>>>>>>> Caused by: java.lang.IllegalMonitorStateException
>>>>>>>>     at java.lang.Object.wait(Native Method)
>>>>>>>>     at java.lang.Object.wait(Object.java:502)
>>>>>>>>     at pipelines.variant_caller.LaunchDocker$LaunchJobs.processElement(LaunchDocker.java:19)
>>>>>>>>
>>>>>>>>
>>>>>>>> Can you share your ideas on the best way to achieve this?
>>>>>>>>
>>>>>>>> Thank you for your help!
>>>>>>>>
>>>>>>>>
>>>>>>>> Sincerely,
>>>>>>>>
>>>>>>>> Mahesh
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
