Re: Beam Spark/Flink runner with DC/OS

2017-01-30 Thread Chaoran Yu
Thank you Davor for the reply!

Your understanding of my problem is exactly right. 

I thought about the issue you mentioned, and then looked at the Beam source code. 
It looks like IO is done via the IOChannelFactory class, which has two 
subclasses: FileIOChannelFactory and GcsIOChannelFactory. I figured the wrong 
class probably got registered. This link I found, 
http://markmail.org/message/mrv4cg4y6bjtdssy, points out the same registration 
problem. So I tried registering GcsIOChannelFactory, but got the following 
error:

Scheme: [file] is already registered with class 
org.apache.beam.sdk.util.FileIOChannelFactory

Now I’m not sure what to do.
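
In case it helps others hitting this, here is a minimal sketch of the check I 
have in mind. It relies only on IOChannelUtils.getFactory, the call that throws 
in the stack trace; the surrounding class name is mine, and the exact 
registration API may differ across SDK versions:

import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;

public class SchemeCheck {
  public static void main(String[] args) throws Exception {
    // getFactory resolves a spec to a factory by its scheme prefix; it is
    // the same call that throws "Unable to find handler for gs://..." below.
    IOChannelFactory factory =
        IOChannelUtils.getFactory("gs://apache-beam-samples/shakespeare/kinglear.txt");
    System.out.println("gs:// resolves to " + factory.getClass().getName());
  }
}

Running the same kind of check inside a DoFn (so that it executes in the Spark 
executor JVM rather than in the driver) would show whether GcsIOChannelFactory 
is actually registered on the cluster side, which is what the staging theory 
below would predict it is not. A hedged probe along those lines, with 
hypothetical class and step names:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class StagingProbe {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply(Create.of("probe"))
     .apply(ParDo.of(new DoFn<String, String>() {
       @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         // If this throws ClassNotFoundException on the cluster but not in
         // local mode, the GCS classes were never staged with the job.
         Class.forName("org.apache.beam.sdk.util.GcsIOChannelFactory");
         c.output(c.element());
       }
     }));
    p.run();
  }
}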

Thanks for the help!

Chaoran



> On Jan 30, 2017, at 12:05 PM, Davor Bonaci <da...@apache.org> wrote:
> 
> Sorry for the delay here.
> 
> Am I correct in summarizing that "gs://bucket/file" doesn't work on a Spark 
> cluster, but does with Spark runner locally? Beam file systems utilize 
> AutoService functionality and, generally speaking, all filesystems should be 
> available and automatically registered on all runners. This is probably just 
> a simple matter of staging the right classes on the cluster.
> 
> Pei, any additional thoughts here?
> 
> On Mon, Jan 23, 2017 at 1:58 PM, Chaoran Yu <chaoran...@lightbend.com> wrote:
> Sorry for the spam. But to clarify, I didn’t write the code. I’m using the 
> code described here: https://beam.apache.org/get-started/wordcount-example/
> So the file already exists in GS.
> 
>> On Jan 23, 2017, at 4:55 PM, Chaoran Yu <chaoran...@lightbend.com> wrote:
>> 
>> I didn’t upload the file. But since the identical Beam code, when running in 
>> Spark local mode, was able to fetch the file and process it, the file does 
>> exist.
>> It’s just that somehow Spark standalone mode can’t find the file.
>> 
>> 
>>> On Jan 23, 2017, at 4:50 PM, Amit Sela <amitsel...@gmail.com> wrote:
>>> 
>>> I think "external" is the key here, you're cluster is running all it's 
>>> components on your local machine so you're good.
>>> 
>>> As for GS, it's like Amazon's S3 or sort-of a cloud service HDFS offered by 
>>> Google. You need to upload your file to GS. Have you ?  
>>> 
>>> On Mon, Jan 23, 2017 at 11:47 PM Chaoran Yu <chaoran...@lightbend.com> wrote:
>>> Well, my file is not in my local filesystem. It’s in GS. 
>>> This is the line of code that reads the input file: 
>>> p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
>>> 
>>> And this page https://beam.apache.org/get-started/quickstart/ says the following:
>>> “you can’t access a local file if you are running the pipeline on an 
>>> external cluster”.
>>> I’m indeed trying to run a pipeline on a standalone Spark cluster running 
>>> on my local machine. So local files are not an option.
>>> 
>>> 
>>>> On Jan 23, 2017, at 4:41 PM, Amit Sela <amitsel...@gmail.com> wrote:
>>>> 
>>>> Why not try file:// instead? It doesn't seem like you're using Google 
>>>> Storage, right? I mean, the input file is on your local FS.
>>>> 
>>>> On Mon, Jan 23, 2017 at 11:34 PM Chaoran Yu <chaoran...@lightbend.com> wrote:
>>>> No I’m not using Dataproc.
>>>> I’m simply running on my local machine. I started a local Spark cluster 
>>>> with sbin/start-master.sh and sbin/start-slave.sh. Then I submitted my 
>>>> Beam job to that cluster.
>>>> The gs file is the kinglear.txt from Beam’s example code and it should be 
>>>> public. 
>>>> 
>>>> My full stack trace is attached.
>>>> 
>>>> Thanks,
>>>> Chaoran
>>>> 
>>>> 
>>>> 
>>>>> On Jan 23, 2017, at 4:23 PM, Amit Sela <amitsel...@gmail.com> wrote:
>>>>> 
>>>>> Maybe. Are you running on Dataproc? Are you using YARN/Mesos? Do the 
>>>>> machines hosting the executor processes have access to GS? Could you 
>>>>> paste the entire stack trace?

Re: Beam Spark/Flink runner with DC/OS

2017-01-23 Thread Chaoran Yu
Sorry for the spam. But to clarify, I didn’t write the code. I’m using the code 
described here: https://beam.apache.org/get-started/wordcount-example/
So the file already exists in GS.

> On Jan 23, 2017, at 4:55 PM, Chaoran Yu <chaoran...@lightbend.com> wrote:
> 
> I didn’t upload the file. But since the identical Beam code, when running in 
> Spark local mode, was able to fetch the file and process it, the file does 
> exist.
> It’s just that somehow Spark standalone mode can’t find the file.
> 
> 
>> On Jan 23, 2017, at 4:50 PM, Amit Sela <amitsel...@gmail.com> wrote:
>> 
>> I think "external" is the key here, you're cluster is running all it's 
>> components on your local machine so you're good.
>> 
>> As for GS, it's like Amazon's S3 or sort-of a cloud service HDFS offered by 
>> Google. You need to upload your file to GS. Have you ?  
>> 
>> On Mon, Jan 23, 2017 at 11:47 PM Chaoran Yu <chaoran...@lightbend.com> wrote:
>> Well, my file is not in my local filesystem. It’s in GS. 
>> This is the line of code that reads the input file: 
>> p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
>> 
>> And this page https://beam.apache.org/get-started/quickstart/ says the following:
>> “you can’t access a local file if you are running the pipeline on an 
>> external cluster”.
>> I’m indeed trying to run a pipeline on a standalone Spark cluster running on 
>> my local machine. So local files are not an option.
>> 
>> 
>>> On Jan 23, 2017, at 4:41 PM, Amit Sela <amitsel...@gmail.com> wrote:
>>> 
>>> Why not try file:// instead? It doesn't seem like you're using Google 
>>> Storage, right? I mean, the input file is on your local FS.
>>> 
>>> On Mon, Jan 23, 2017 at 11:34 PM Chaoran Yu <chaoran...@lightbend.com> wrote:
>>> No I’m not using Dataproc.
>>> I’m simply running on my local machine. I started a local Spark cluster 
>>> with sbin/start-master.sh and sbin/start-slave.sh. Then I submitted my Beam 
>>> job to that cluster.
>>> The gs file is the kinglear.txt from Beam’s example code and it should be 
>>> public. 
>>> 
>>> My full stack trace is attached.
>>> 
>>> Thanks,
>>> Chaoran
>>> 
>>> 
>>> 
>>>> On Jan 23, 2017, at 4:23 PM, Amit Sela <amitsel...@gmail.com> wrote:
>>>> 
>>>> Maybe. Are you running on Dataproc? Are you using YARN/Mesos? Do the 
>>>> machines hosting the executor processes have access to GS? Could you 
>>>> paste the entire stack trace?
>>>> 
>>>> On Mon, Jan 23, 2017 at 11:21 PM Chaoran Yu <chaoran...@lightbend.com> wrote:
>>>> Thank you Amit for the reply,
>>>> 
>>>> I just tried two more runners and below is a summary:
>>>> 
>>>> DirectRunner: works
>>>> FlinkRunner: works in local mode. I got an error “Communication with 
>>>> JobManager failed: lost connection to the JobManager” when running in 
>>>> cluster mode.
>>>> SparkRunner: works in local mode (mvn exec command) but fails in cluster 
>>>> mode (spark-submit) with the error I pasted in the previous email.
>>>> 
>>>> In SparkRunner’s case, could it be that the Spark executors can’t access 
>>>> the gs file in Google Cloud Storage?
>>>> 
>>>> Thank you,
>>>> 
>>>> 
>>>> 
>>>>> On Jan 23, 2017, at 3:28 PM, Amit Sela <amitsel...@gmail.com> wrote:
>>>>> 
>>>>> Is this working for you with other runners? Judging by the stack trace, 
>>>>> it seems like IOChannelUtils fails to find a handler, so it doesn't seem 
>>>>> to be a Spark-specific problem.
>>>>> 
>>>>> On Mon, Jan 23, 2017 at 8:50 PM Chaoran Yu <chaoran...@lightbend.com> wrote:
>>>>> Thank you Amit and JB! 
>>>>> 
>>>>> This is not related to DC/OS itself, but I ran into a problem when 
>>>>> launching a Spark job on a cluster with spark-submit.

Re: Beam Spark/Flink runner with DC/OS

2017-01-23 Thread Chaoran Yu
Well, my file is not in my local filesystem. It’s in GS. 
This is the line of code that reads the input file: 
p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))

And this page https://beam.apache.org/get-started/quickstart/ says the following:
“you can’t access a local file if you are running the pipeline on an external 
cluster”.
I’m indeed trying to run a pipeline on a standalone Spark cluster running on my 
local machine. So local files are not an option.
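
For context, that line sits in a pipeline shaped roughly like the WordCount 
example trimmed down to the IO. A sketch, with the sink path and option 
handling as placeholders:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class MinimalGcsRead {
  public static void main(String[] args) {
    // The runner (DirectRunner, SparkRunner, FlinkRunner) is picked via the
    // --runner flag; nothing about the gs:// path itself changes per runner.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
     .apply(TextIO.Write.to("/tmp/output"));  // placeholder sink

    p.run();
  }
}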


> On Jan 23, 2017, at 4:41 PM, Amit Sela <amitsel...@gmail.com> wrote:
> 
> Why not try file:// instead? It doesn't seem like you're using Google 
> Storage, right? I mean, the input file is on your local FS.
> 
> On Mon, Jan 23, 2017 at 11:34 PM Chaoran Yu <chaoran...@lightbend.com> wrote:
> No I’m not using Dataproc.
> I’m simply running on my local machine. I started a local Spark cluster with 
> sbin/start-master.sh and sbin/start-slave.sh. Then I submitted my Beam job to 
> that cluster.
> The gs file is the kinglear.txt from Beam’s example code and it should be 
> public. 
> 
> My full stack trace is attached.
> 
> Thanks,
> Chaoran
> 
> 
> 
>> On Jan 23, 2017, at 4:23 PM, Amit Sela <amitsel...@gmail.com> wrote:
>> 
>> Maybe. Are you running on Dataproc? Are you using YARN/Mesos? Do the 
>> machines hosting the executor processes have access to GS? Could you 
>> paste the entire stack trace?
>> 
>> On Mon, Jan 23, 2017 at 11:21 PM Chaoran Yu <chaoran...@lightbend.com> wrote:
>> Thank you Amit for the reply,
>> 
>> I just tried two more runners and below is a summary:
>> 
>> DirectRunner: works
>> FlinkRunner: works in local mode. I got an error “Communication with 
>> JobManager failed: lost connection to the JobManager” when running in 
>> cluster mode.
>> SparkRunner: works in local mode (mvn exec command) but fails in cluster 
>> mode (spark-submit) with the error I pasted in the previous email.
>> 
>> In SparkRunner’s case, could it be that the Spark executors can’t access 
>> the gs file in Google Cloud Storage?
>> 
>> Thank you,
>> 
>> 
>> 
>>> On Jan 23, 2017, at 3:28 PM, Amit Sela <amitsel...@gmail.com> wrote:
>>> 
>>> Is this working for you with other runners? Judging by the stack trace, it 
>>> seems like IOChannelUtils fails to find a handler, so it doesn't seem to be 
>>> a Spark-specific problem.
>>> 
>>> On Mon, Jan 23, 2017 at 8:50 PM Chaoran Yu <chaoran...@lightbend.com> wrote:
>>> Thank you Amit and JB! 
>>> 
>>> This is not related to DC/OS itself, but I ran into a problem when 
>>> launching a Spark job on a cluster with spark-submit. My Spark job written 
>>> in Beam can’t read the specified gs file. I got the following error:
>>> 
>>> Caused by: java.io.IOException: Unable to find handler for 
>>> gs://beam-samples/sample.txt
>>> at 
>>> org.apache.beam.sdk.util.IOChannelUtils.getFactory(IOChannelUtils.java:307)
>>> at 
>>> org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:528)
>>> at 
>>> org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:271)
>>> at 
>>> org.apache.beam.runners.spark.io.SourceRDD$Bounded$1.hasNext(SourceRDD.java:125)
>>> 
>>> Then I thought about switching to reading from another source, but I saw in 
>>> Beam’s documentation that TextIO can only read from files in Google Cloud 
>>> Storage (prefixed with gs://) when running in cluster mode. How do you guys 
>>> do file IO in Beam when using the SparkRunner?
>>> 
>>> 
>>> Thank you,
>>> Chaoran
>>> 
>>> 
>>>> On Jan 22, 2017, at 4:32 AM, Amit Sela <amitsel...@gmail.com> wrote:
>>>> 
>>>> I'll join JB's comment on the Spark runner: submitting Beam pipelines 
>>>> using the Spark runner can be done with Spark's spark-submit script. Find 
>>>> out more in the Spark runner documentation: 
>>>> https://beam.apache.org/documentation/runners/spark/
>>>> 
>>>> Amit.
>>>> 
>>>> On Sun, Jan 22, 2017 at 8:03 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

Re: Beam Spark/Flink runner with DC/OS

2017-01-23 Thread Chaoran Yu
nglear.txt
at 
org.apache.beam.sdk.util.IOChannelUtils.getFactory(IOChannelUtils.java:307)
at 
org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:528)
at 
org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:271)
at 
org.apache.beam.runners.spark.io.SourceRDD$Bounded$1.hasNext(SourceRDD.java:125)
... 25 more
17/01/23 16:32:12 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down 
remote daemon.
17/01/23 16:32:12 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon 
shut down; proceeding with flushing remote transports.
17/01/23 16:32:12 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut 
down.
^C17/01/23 16:32:25 INFO ShutdownHookManager: Shutdown hook called
17/01/23 16:32:25 INFO ShutdownHookManager: Deleting directory 
/private/var/folders/7k/mv0jws6j7ws8pnp350h1ndtwgn/T/spark-70afbd21-f270-4670-88d6-2895e5f23ea5/httpd-04f079ac-4416-4e74-9dd3-5bf70f70ee8e
17/01/23 16:32:25 INFO ShutdownHookManager: Deleting directory 
/private/var/folders/7k/mv0jws6j7ws8pnp350h1ndtwgn/T/spark-70afbd21-f270-4670-88d6-2895e5f23ea5
On Jan 23, 2017, at 4:23 PM, Amit Sela <amitsel...@gmail.com> wrote:

Maybe. Are you running on Dataproc? Are you using YARN/Mesos? Do the machines 
hosting the executor processes have access to GS? Could you paste the entire 
stack trace?

On Mon, Jan 23, 2017 at 11:21 PM Chaoran Yu <chaoran...@lightbend.com> wrote:

Thank you Amit for the reply,

I just tried two more runners and below is a summary:

DirectRunner: works
FlinkRunner: works in local mode. I got an error “Communication with JobManager 
failed: lost connection to the JobManager” when running in cluster mode.
SparkRunner: works in local mode (mvn exec command) but fails in cluster mode 
(spark-submit) with the error I pasted in the previous email.

In SparkRunner’s case, could it be that the Spark executors can’t access the gs 
file in Google Cloud Storage?

Thank you,

On Jan 23, 2017, at 3:28 PM, Amit Sela <amitsel...@gmail.com> wrote:

Is this working for you with other runners? Judging by the stack trace, it 
seems like IOChannelUtils fails to find a handler, so it doesn't seem to be a 
Spark-specific problem.

On Mon, Jan 23, 2017 at 8:50 PM Chaoran Yu <chaoran...@lightbend.com> wrote:

Thank you Amit and JB!

This is not related to DC/OS itself, but I ran into a problem when launching a 
Spark job on a cluster with spark-submit. My Spark job written in Beam can’t 
read the specified gs file. I got the following error:

Caused by: java.io.IOException: Unable to find handler for 
gs://beam-samples/sample.txt
	at org.apache.beam.sdk.util.IOChannelUtils.getFactory(IOChannelUtils.java:307)
	at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:528)
	at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:271)
	at org.apache.beam.runners.spark.io.SourceRDD$Bounded$1.hasNext(SourceRDD.java:125)

Then I thought about switching to reading from another source, but I saw in 
Beam’s documentation that TextIO can only read from files in Google Cloud 
Storage (prefixed with gs://) when running in cluster mode. How do you guys do 
file IO in Beam when using the SparkRunner?

Thank you,
Chaoran

On Jan 22, 2017, at 4:32 AM, Amit Sela <amitsel...@gmail.com> wrote:

I'll join JB's comment on the Spark runner: submitting Beam pipelines using the 
Spark runner can be done with Spark's spark-submit script. Find out more in the 
Spark runner documentation: https://beam.apache.org/documentation/runners/spark/

Amit.

On Sun, Jan 22, 2017 at 8:03 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

Hi,

Not directly DC/OS (I think Stephen did some tests on it), but I have a
platform running Spark and Flink with Beam on Mesos + Marathon.

It basically doesn't have anything special, as running pipelines uses
spark-submit (as in Spark "natively").

Regards
JB
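
To make that concrete, the class that spark-submit's --class flag points at is 
an ordinary main() that selects the runner through pipeline options. A minimal 
sketch (SparkPipelineOptions and SparkRunner come from the Beam Spark runner 
module; the class and jar names are placeholders):

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class SubmitMain {
  public static void main(String[] args) {
    // Invoked as: spark-submit --class SubmitMain your-shaded.jar
    SparkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(SparkPipelineOptions.class);
    options.setRunner(SparkRunner.class);
    Pipeline p = Pipeline.create(options);
    // ... build the pipeline here ...
    p.run();
  }
}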

On 01/22/2017 12:56 AM, Chaoran Yu wrote:
> Hello all,
>
> Has anyone had experience using Beam on DC/OS? I want to run Beam code
> executed with the Spark runner on DC/OS. As a next step, I would like to run
> the Flink runner as well. There doesn't seem to be any information about
> running Beam on DC/OS that I can find on the web, so some pointers are
> greatly appreciated.
>
> Thank you,
>
> Chaoran Yu
>

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: Regarding Beam Slack Channel

2017-01-23 Thread Chaoran Yu
Can anyone add me to the channel as well? Thank you!
chaoran...@lightbend.com 

Chaoran

> On Jan 23, 2017, at 1:32 AM, Ritesh Kasat  wrote:
> 
> Hello,
> 
> Can someone add me to the Beam Slack channel?
> 
> Thanks
> Ritesh



Beam Spark/Flink runner with DC/OS

2017-01-21 Thread Chaoran Yu
Hello all,

  Has anyone had experience using Beam on DC/OS? I want to run Beam code
executed with the Spark runner on DC/OS. As a next step, I would like to run the
Flink runner as well. There doesn't seem to be any information about running
Beam on DC/OS that I can find on the web, so some pointers are greatly
appreciated.

Thank you,

Chaoran Yu