Re: How to use "PortableRunner" in Python SDK?

2019-01-23 Thread Robert Bradshaw
We should probably make the job endpoint mandatory for PortableRunner,
and offer a separate FlinkRunner (and others) that provides a default
endpoint and otherwise delegates everything down.
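
For illustration, a rough sketch of the two invocations under that split (the
FlinkRunner shown second is hypothetical here; today only PortableRunner with
an explicit --job_endpoint exists in the Python SDK):

# PortableRunner with a mandatory, user-supplied job endpoint
python -m apache_beam.examples.wordcount \
  --input=/etc/profile \
  --output=/tmp/py-wordcount-direct \
  --runner=PortableRunner \
  --job_endpoint=localhost:8099

# Hypothetical FlinkRunner that would supply a default endpoint and otherwise
# delegate to the portable machinery
python -m apache_beam.examples.wordcount \
  --input=/etc/profile \
  --output=/tmp/py-wordcount-flink \
  --runner=FlinkRunner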

On Thu, Nov 15, 2018 at 12:07 PM Maximilian Michels  wrote:
>
> > 1) The default behavior, where PortableRunner starts a flink server. It is 
> > confusing to new users
> It does that only if no JobServer endpoint is specified. AFAIK there are
> problems with the bootstrapping; it can definitely be improved.
>
> > 2) All the related docs and inline comments.  Similarly, it could be very 
> > confusing connecting PortableRunner to Flink server.
> +1 We definitely need to improve docs and usability.
>
> > 3) [Probably no longer an issue].   I couldn't make the flink server 
> > example work.  And I could not make the example work on Java-ULR either.
> AFAIK the Java ULR hasn't received love for a long time.
>
> -Max
>
> On 14.11.18 20:57, Ruoyun Huang wrote:
> > To answer Maximilian's question.
> >
> > I am using Linux, debian distribution.
> >
> > It probably sounded too much when I used the word 'planned merge'. What
> > I really meant entails less change than it sounds. More specifically:
> >
> > 1) The default behavior, where PortableRunner starts a flink server.  It
> > is confusing to new users.
> > 2) All the related docs and inline comments.  Similarly, it could be
> > very confusing connecting PortableRunner to Flink server.
> > > 3) [Probably no longer an issue].   I couldn't make the flink server
> > > example work.  And I could not make the example work on Java-ULR
> > either.  Both will require debugging for resolutions.  Thus I figured
> > maybe let us only focus on one single thing: the java-ULR part, without
> > worrying about Flink-server.   Again, looks like this may not be a valid
> > concern, given flink part is most likely due to my setup.
> >
> >
> > On Wed, Nov 14, 2018 at 3:30 AM Maximilian Michels  > > wrote:
> >
> > Hi Ruoyun,
> >
> > I just ran the wordcount locally using the instructions on the page.
> > I've tried the local file system and GCS. Both times it ran
> > successfully
> > and produced valid output.
> >
> > I'm assuming there is some problem with your setup. Which platform are
> > you using? I'm on MacOS.
> >
> > Could you expand on the planned merge? From my understanding we will
> > always need PortableRunner in Python to be able to submit against the
> > Beam JobServer.
> >
> > Thanks,
> > Max
> >
> > On 14.11.18 00:39, Ruoyun Huang wrote:
> >  > A quick follow-up on using current PortableRunner.
> >  >
> >  > I followed the exact three steps as Ankur and Maximilian shared in
> >  > https://beam.apache.org/roadmap/portability/#python-on-flink   The
> >  > wordcount example keeps hanging after 10 minutes.  I also tried
> >  > specifying explicit input/output args, either using gcs folder or
> > local
> >  > file system, but none of them works.
> >  >
> >  > Spent some time looking into it but no conclusion yet.  At this point
> >  > though, I guess it does not matter much any more, given we
> > already have
> >  > the plan of merging PortableRunner into using java reference runner
> >  > (i.e. :beam-runners-reference-job-server).
> >  >
> >  > Still appreciated if someone can try out the python-on-flink
> >  > instructions
> >  > in case it is just due to my local machine setup.  Thanks!
> >  >
> >  >
> >  >
> >  > On Thu, Nov 8, 2018 at 5:04 PM Ruoyun Huang wrote:
> >  >
> >  > Thanks Maximilian!
> >  >
> >  > I am working on migrating existing PortableRunner to using
> > java ULR
> >  > (Link to Notes).
> >  > If this issue is non-trivial to solve, I would vote for removing
> >  > this default behavior as part of the consolidation.
> >  >
> >  > On Thu, Nov 8, 2018 at 2:58 AM Maximilian Michels wrote:
> >  >
> >  > In the long run, we should get rid of the
> > Docker-inside-Docker
> >  > approach,
> >  > which was only intended for testing anyways. It would be
> > cleaner to
> >  > start the SDK harness container alongside with JobServer
> > container.
> >  >
> >  > Short term, I think it should be easy to either fix the
> >  > permissions of
> >  > the mounted "docker" executable or use a Docker image for the
> >  > JobServer
> >  > which comes with Docker pre-installed.

Re: How to use "PortableRunner" in Python SDK?

2019-01-22 Thread Heejong Lee
You can also try without the --streaming option. There's a separate streaming
wordcount example in the same directory.

If you want to look into the output files, it would be easier to use an
external target like gs:// instead of a local file.

python -m apache_beam.examples.wordcount --input=/etc/profile
--output=gs://tmp_location/py-wordcount --runner=PortableRunner
--job_endpoint=localhost:8099 --parallelism=1
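
If the output does land on GCS, a quick way to inspect it (a sketch, assuming
gsutil is installed and gs://tmp_location is a bucket you can read; the text
sink writes sharded files such as py-wordcount-00000-of-00001):

# List and print the sharded wordcount output on GCS
gsutil ls gs://tmp_location/py-wordcount*
gsutil cat gs://tmp_location/py-wordcount-00000-of-*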

On Tue, Jan 22, 2019 at 11:44 AM junwa...@gmail.com 
wrote:

> Hello,
>
> I tried to follow the instructions at
> https://beam.apache.org/roadmap/portability/#python-on-flink,
>
> 1. I installed Flink local cluster, and followed their
> SocketWindowWordCount example and confirmed  the cluster works properly.
>
> 2. Start Flink job server:
> ./gradlew :beam-runners-flink_2.11-job-server:runShadow
> -PflinkMasterUrl=localhost:8081
>
> 3. Submit the job as suggested by an earlier thread:
> python -m apache_beam.examples.wordcount --input=/etc/profile
> --output=/tmp/py-wordcount-direct --runner=PortableRunner
> --job_endpoint=localhost:8099 --parallelism=1
> --OPTIONALflink_master=localhost:8081 --streaming
>
> But got the following NullPointerException error (sorry for the long text
> below), any ideas? Thanks
>
> Jun Wan
>
>  log starts 
> [grpc-default-executor-2] INFO
> org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job
> BeamApp-jwan-0121192804-387b3baa_1d32eea3-d71a-45a9-afa8-edbc66bc1d6b
> [grpc-default-executor-2] INFO
> org.apache.beam.runners.flink.FlinkJobInvocation - Starting job invocation
> BeamApp-jwan-0121192804-387b3baa_1d32eea3-d71a-45a9-afa8-edbc66bc1d6b
> [flink-runner-job-server] INFO
> org.apache.beam.runners.flink.FlinkJobInvocation - Translating pipeline to
> Flink program.
> [flink-runner-job-server] INFO
> org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a
> Streaming Environment.
> [flink-runner-job-server] INFO
> org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink
> Master URL localhost:8081.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - class
> org.apache.beam.sdk.transforms.join.RawUnionValue does not contain a setter
> for field unionTag
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - Class class
> org.apache.beam.sdk.transforms.join.RawUnionValue cannot be used as a POJO
> type because not all fields are valid POJO fields, and must be processed as
> GenericType. Please read the Flink documentation on "Data Types &
> Serialization" for details of the effect on performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & 

Re: How to use "PortableRunner" in Python SDK?

2019-01-22 Thread Ankur Goenka
Hi Jun,

This error can be because of a different Flink version.
Please make sure that you are using Flink 1.5.6 for the commands you
mentioned.
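
A quick way to double-check which Flink version you are actually running (a
sketch; FLINK_HOME is whatever directory you unpacked the distribution into,
and the REST /config endpoint is assumed to be available on the local cluster):

# The dist jar name carries the version, e.g. flink-dist_2.11-1.5.6.jar
ls $FLINK_HOME/lib/flink-dist*.jar

# Version reported by the running cluster; look for "flink-version" in the JSON
curl -s http://localhost:8081/config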

Thanks,
Ankur

On Tue, Jan 22, 2019 at 11:44 AM junwa...@gmail.com 
wrote:

> Hello,
>
> I tried to follow the instructions at
> https://beam.apache.org/roadmap/portability/#python-on-flink,
>
> 1. I installed Flink local cluster, and followed their
> SocketWindowWordCount example and confirmed  the cluster works properly.
>
> 2. Start Flink job server:
> ./gradlew :beam-runners-flink_2.11-job-server:runShadow
> -PflinkMasterUrl=localhost:8081
>
> 3. Submit the job as suggested by an earlier thread:
> python -m apache_beam.examples.wordcount --input=/etc/profile
> --output=/tmp/py-wordcount-direct --runner=PortableRunner
> --job_endpoint=localhost:8099 --parallelism=1
> --OPTIONALflink_master=localhost:8081 --streaming
>
> But got the following NullPointerException error (sorry for the long text
> below), any ideas? Thanks
>
> Jun Wan
>
>  log starts 
> [grpc-default-executor-2] INFO
> org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job
> BeamApp-jwan-0121192804-387b3baa_1d32eea3-d71a-45a9-afa8-edbc66bc1d6b
> [grpc-default-executor-2] INFO
> org.apache.beam.runners.flink.FlinkJobInvocation - Starting job invocation
> BeamApp-jwan-0121192804-387b3baa_1d32eea3-d71a-45a9-afa8-edbc66bc1d6b
> [flink-runner-job-server] INFO
> org.apache.beam.runners.flink.FlinkJobInvocation - Translating pipeline to
> Flink program.
> [flink-runner-job-server] INFO
> org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a
> Streaming Environment.
> [flink-runner-job-server] INFO
> org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink
> Master URL localhost:8081.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - class
> org.apache.beam.sdk.transforms.join.RawUnionValue does not contain a setter
> for field unionTag
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - Class class
> org.apache.beam.sdk.transforms.join.RawUnionValue cannot be used as a POJO
> type because not all fields are valid POJO fields, and must be processed as
> GenericType. Please read the Flink documentation on "Data Types &
> Serialization" for details of the effect on performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type 

Re: How to use "PortableRunner" in Python SDK?

2019-01-22 Thread Ankur Goenka
Hi Jun,

Thanks for reporting the error.
This seems to be because of a mismatch between the SDK harness (Docker image)
and the Python SDK. As we are actively developing, this can happen.

Can you please retry after rebuilding the Docker images and the Python SDK
from master and installing the Python SDK into your virtual environment?
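
A sketch of those rebuild steps from a Beam checkout (the Gradle task name for
the Python SDK harness container is an assumption based on the current project
layout; check settings.gradle if it differs):

# Rebuild the Python SDK harness container image
./gradlew :beam-sdks-python-container:docker

# Rebuild the Python SDK and install it into the active virtualenv
cd sdks/python
python setup.py sdist
pip install --upgrade dist/apache-beam-*.tar.gz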

Thanks,
Ankur

On Tue, Jan 22, 2019 at 11:44 AM junwa...@gmail.com 
wrote:

> I downgraded the Flink from 1.7.1 to 1.5.6, and was able to go further,
> but still fails, here is the latest error from Flink. Thanks!
>
> the job cmd I launched : python -m apache_beam.examples.wordcount
> --input=/etc/profile --output=/tmp/py-wordcount-direct
> --runner=PortableRunner --job_endpoint=localhost:8099 --parallelism=1
> --OPTIONALflink_master=localhost:8081 --streaming
> --experiments=worker_threads=100 --execution_mode_for_batch=BATCH_FORCED
> --experiments=beam_fn_api
>
> Jun
>
>  log starts 
> [flink-runner-job-server] INFO
> org.apache.beam.runners.flink.FlinkExecutionEnvironments - Running remotely
> at localhost:8081
> [flink-runner-job-server] WARN
> org.apache.flink.configuration.Configuration - Config uses deprecated
> configuration key 'jobmanager.rpc.address' instead of proper key
> 'rest.address'
> [flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient -
> Rest client endpoint started.
> [flink-runner-job-server] INFO
> org.apache.flink.client.program.rest.RestClusterClient - Submitting job
> 4ecb5e5cfd4718de440f48cbfaf7216a (detached: false).
> [flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient -
> Shutting down rest endpoint.
> [flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient -
> Rest endpoint shutdown complete.
> [flink-runner-job-server] ERROR
> org.apache.beam.runners.flink.FlinkJobInvocation - Error during job
> invocation
> BeamApp-jwan-012125-328178bb_d2dadedb-6dbf-4c1e-82d4-208a2d3177e9.
> org.apache.flink.client.program.ProgramInvocationException: Job
> 4ecb5e5cfd4718de440f48cbfaf7216a failed.
> at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
> at
> org.apache.beam.runners.flink.FlinkExecutionEnvironments$BeamFlinkRemoteStreamEnvironment.executeRemotely(FlinkExecutionEnvironments.java:355)
> at
> org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:179)
> at
> org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator$StreamingTranslationContext.execute(FlinkStreamingPortablePipelineTranslator.java:158)
> at
> org.apache.beam.runners.flink.FlinkJobInvocation.runPipelineWithTranslator(FlinkJobInvocation.java:142)
> at
> org.apache.beam.runners.flink.FlinkJobInvocation.runPipeline(FlinkJobInvocation.java:112)
> at org.apache.beam.repackaged.beam_runners_flink_2.11.com
> .google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)
> at org.apache.beam.repackaged.beam_runners_flink_2.11.com
> .google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)
> at org.apache.beam.repackaged.beam_runners_flink_2.11.com
> .google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
> ... 12 more
> Caused by: java.lang.RuntimeException: Exception occurred while processing
> valve output watermark:
> at org.apache.flink.streaming.runtime.io
> .StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
> at
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
> at
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> at org.apache.flink.streaming.runtime.io
> .StreamInputProcessor.processInput(StreamInputProcessor.java:184)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
> ... 1 more
> Caused by:
> 

Re: How to use "PortableRunner" in Python SDK?

2019-01-22 Thread junwan01
I downgraded the Flink from 1.7.1 to 1.5.6, and was able to go further, but 
still fails, here is the latest error from Flink. Thanks!

the job cmd I launched : python -m apache_beam.examples.wordcount 
--input=/etc/profile --output=/tmp/py-wordcount-direct --runner=PortableRunner 
--job_endpoint=localhost:8099 --parallelism=1 
--OPTIONALflink_master=localhost:8081 --streaming 
--experiments=worker_threads=100 --execution_mode_for_batch=BATCH_FORCED 
--experiments=beam_fn_api

Jun
 
 log starts 
[flink-runner-job-server] INFO 
org.apache.beam.runners.flink.FlinkExecutionEnvironments - Running remotely at 
localhost:8081
[flink-runner-job-server] WARN org.apache.flink.configuration.Configuration - 
Config uses deprecated configuration key 'jobmanager.rpc.address' instead of 
proper key 'rest.address'
[flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient - Rest 
client endpoint started.
[flink-runner-job-server] INFO 
org.apache.flink.client.program.rest.RestClusterClient - Submitting job 
4ecb5e5cfd4718de440f48cbfaf7216a (detached: false).
[flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient - 
Shutting down rest endpoint.
[flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient - Rest 
endpoint shutdown complete.
[flink-runner-job-server] ERROR 
org.apache.beam.runners.flink.FlinkJobInvocation - Error during job invocation 
BeamApp-jwan-012125-328178bb_d2dadedb-6dbf-4c1e-82d4-208a2d3177e9.
org.apache.flink.client.program.ProgramInvocationException: Job 
4ecb5e5cfd4718de440f48cbfaf7216a failed.
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
at 
org.apache.beam.runners.flink.FlinkExecutionEnvironments$BeamFlinkRemoteStreamEnvironment.executeRemotely(FlinkExecutionEnvironments.java:355)
at 
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:179)
at 
org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator$StreamingTranslationContext.execute(FlinkStreamingPortablePipelineTranslator.java:158)
at 
org.apache.beam.runners.flink.FlinkJobInvocation.runPipelineWithTranslator(FlinkJobInvocation.java:142)
at 
org.apache.beam.runners.flink.FlinkJobInvocation.runPipeline(FlinkJobInvocation.java:112)
at 
org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)
at 
org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)
at 
org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
... 12 more
Caused by: java.lang.RuntimeException: Exception occurred while processing 
valve output watermark:
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
... 1 more
Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:483)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:694)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.emitWatermark(DoFnOperator.java:591)
at 

Re: How to use "PortableRunner" in Python SDK?

2019-01-22 Thread junwan01
Hello,

I tried to follow the instructions at 
https://beam.apache.org/roadmap/portability/#python-on-flink, 

1. I installed Flink local cluster, and followed their SocketWindowWordCount 
example and confirmed  the cluster works properly.

2. Start Flink job server:
./gradlew :beam-runners-flink_2.11-job-server:runShadow 
-PflinkMasterUrl=localhost:8081

3. Submit the job as suggested by an earlier thread:
python -m apache_beam.examples.wordcount --input=/etc/profile 
--output=/tmp/py-wordcount-direct --runner=PortableRunner 
--job_endpoint=localhost:8099 --parallelism=1 
--OPTIONALflink_master=localhost:8081 --streaming

But got the following NullPointerException error (sorry for the long text 
below), any ideas? Thanks

Jun Wan

 log starts 
[grpc-default-executor-2] INFO org.apache.beam.runners.flink.FlinkJobInvoker - 
Invoking job 
BeamApp-jwan-0121192804-387b3baa_1d32eea3-d71a-45a9-afa8-edbc66bc1d6b
[grpc-default-executor-2] INFO org.apache.beam.runners.flink.FlinkJobInvocation 
- Starting job invocation 
BeamApp-jwan-0121192804-387b3baa_1d32eea3-d71a-45a9-afa8-edbc66bc1d6b
[flink-runner-job-server] INFO org.apache.beam.runners.flink.FlinkJobInvocation 
- Translating pipeline to Flink program.
[flink-runner-job-server] INFO 
org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Streaming 
Environment.
[flink-runner-job-server] INFO 
org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink Master 
URL localhost:8081.
[flink-runner-job-server] INFO 
org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for 
class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO 
type and must be processed as GenericType. Please read the Flink documentation 
on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO 
org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for 
class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO 
type and must be processed as GenericType. Please read the Flink documentation 
on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO 
org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for 
class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO 
type and must be processed as GenericType. Please read the Flink documentation 
on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO 
org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for 
class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO 
type and must be processed as GenericType. Please read the Flink documentation 
on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO 
org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for 
class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO 
type and must be processed as GenericType. Please read the Flink documentation 
on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO 
org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for 
class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO 
type and must be processed as GenericType. Please read the Flink documentation 
on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO 
org.apache.flink.api.java.typeutils.TypeExtractor - class 
org.apache.beam.sdk.transforms.join.RawUnionValue does not contain a setter for 
field unionTag
[flink-runner-job-server] INFO 
org.apache.flink.api.java.typeutils.TypeExtractor - Class class 
org.apache.beam.sdk.transforms.join.RawUnionValue cannot be used as a POJO type 
because not all fields are valid POJO fields, and must be processed as 
GenericType. Please read the Flink documentation on "Data Types & 
Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO 
org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for 
class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO 
type and must be processed as GenericType. Please read the Flink documentation 
on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO 
org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for 
class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO 
type and must be processed as GenericType. Please read the Flink documentation 
on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO 
org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for 
class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO 
type and 

Re: How to use "PortableRunner" in Python SDK?

2018-11-15 Thread Maximilian Michels

1) The default behavior, where PortableRunner starts a flink server. It is 
confusing to new users
It does that only if no JobServer endpoint is specified. AFAIK there are 
problems with the bootstrapping; it can definitely be improved.



2) All the related docs and inline comments.  Similarly, it could be very 
confusing connecting PortableRunner to Flink server.

+1 We definitely need to improve docs and usability.

3) [Probably no longer an issue].   I couldn't make the flink server example work.  And I could not make the example work on Java-ULR either. 

AFAIK the Java ULR hasn't received love for a long time.

-Max

On 14.11.18 20:57, Ruoyun Huang wrote:

To answer Maximilian's question.

I am using Linux, debian distribution.

It probably sounded too much when I used the word 'planned merge'. What 
I really meant entails less change than it sounds. More specifically:


1) The default behavior, where PortableRunner starts a flink server.  It 
is confusing to new users.
2) All the related docs and inline comments.  Similarly, it could be 
very confusing connecting PortableRunner to Flink server.
3) [Probably no longer an issue].   I couldn't make the flink server 
example work.  And I could not make the example work on Java-ULR 
either.  Both will require debugging for resolutions.  Thus I figured 
maybe let us only focus on one single thing: the java-ULR part, without 
worrying about Flink-server.   Again, looks like this may not be a valid 
concern, given flink part is most likely due to my setup.



On Wed, Nov 14, 2018 at 3:30 AM Maximilian Michels > wrote:


Hi Ruoyun,

I just ran the wordcount locally using the instructions on the page.
I've tried the local file system and GCS. Both times it ran
successfully
and produced valid output.

I'm assuming there is some problem with your setup. Which platform are
you using? I'm on MacOS.

Could you expand on the planned merge? From my understanding we will
always need PortableRunner in Python to be able to submit against the
Beam JobServer.

Thanks,
Max

On 14.11.18 00:39, Ruoyun Huang wrote:
 > A quick follow-up on using current PortableRunner.
 >
 > I followed the exact three steps as Ankur and Maximilian shared in
 > https://beam.apache.org/roadmap/portability/#python-on-flink   The

 > wordcount example keeps hanging after 10 minutes.  I also tried
 > specifying explicit input/output args, either using gcs folder or
local
 > file system, but none of them works.
 >
 > Spent some time looking into it but no conclusion yet.  At this point
 > though, I guess it does not matter much any more, given we
already have
 > the plan of merging PortableRunner into using java reference runner
 > (i.e. :beam-runners-reference-job-server).
 >
 > Still appreciated if someone can try out the python-on-flink
 > instructions
 > in case it is just due to my local machine setup.  Thanks!
 >
 >
 >
 > On Thu, Nov 8, 2018 at 5:04 PM Ruoyun Huang wrote:
 >
 >     Thanks Maximilian!
 >
 >     I am working on migrating existing PortableRunner to using
java ULR
 >     (Link to Notes).

 >     If this issue is non-trivial to solve, I would vote for removing
 >     this default behavior as part of the consolidation.
 >
 >     On Thu, Nov 8, 2018 at 2:58 AM Maximilian Michels wrote:
 >
 >         In the long run, we should get rid of the
Docker-inside-Docker
 >         approach,
 >         which was only intended for testing anyways. It would be
cleaner to
 >         start the SDK harness container alongside with JobServer
container.
 >
 >         Short term, I think it should be easy to either fix the
 >         permissions of
 >         the mounted "docker" executable or use a Docker image for the
 >         JobServer
 >         which comes with Docker pre-installed.
 >
 >         JIRA: https://issues.apache.org/jira/browse/BEAM-6020
 >
 >         Thanks for reporting this Ruoyun!
 >
 >         -Max
 >
 >         On 08.11.18 00:10, Ruoyun Huang wrote:
 >          > Thanks Ankur and Maximilian.
 >          >
 >          > Just for reference in case other people encountering
the same
 >         error
 >          > message, the "permission denied" error in my original
email
 >         is exactly
 >          > due to dockerinsidedocker issue that Ankur mentioned.
 >         Thanks Ankur!
 >          > 

Re: How to use "PortableRunner" in Python SDK?

2018-11-14 Thread Ruoyun Huang
To answer Maximilian's question.

I am using Linux, debian distribution.

It probably sounded too much when I used the word 'planned merge'. What I
really meant entails less change than it sounds. More specifically:

1) The default behavior, where PortableRunner starts a flink server.  It is
confusing to new users.
2) All the related docs and inline comments.  Similarly, it could be very
confusing connecting PortableRunner to Flink server.
3) [Probably no longer an issue].   I couldn't make the flink server
example work.  And I could not make the example work on Java-ULR either.
Both will require debugging for resolutions.  Thus I figured maybe let us
only focus on one single thing: the java-ULR part, without worrying about
Flink-server.   Again, looks like this may not be a valid concern, given
flink part is most likely due to my setup.


On Wed, Nov 14, 2018 at 3:30 AM Maximilian Michels  wrote:

> Hi Ruoyun,
>
> I just ran the wordcount locally using the instructions on the page.
> I've tried the local file system and GCS. Both times it ran successfully
> and produced valid output.
>
> I'm assuming there is some problem with your setup. Which platform are
> you using? I'm on MacOS.
>
> Could you expand on the planned merge? From my understanding we will
> always need PortableRunner in Python to be able to submit against the
> Beam JobServer.
>
> Thanks,
> Max
>
> On 14.11.18 00:39, Ruoyun Huang wrote:
> > A quick follow-up on using current PortableRunner.
> >
> > I followed the exact three steps as Ankur and Maximilian shared in
> > https://beam.apache.org/roadmap/portability/#python-on-flink   The
> > wordcount example keeps hanging after 10 minutes.  I also tried
> > specifying explicit input/output args, either using gcs folder or local
> > file system, but none of them works.
> >
> > Spent some time looking into it but no conclusion yet.  At this point
> > though, I guess it does not matter much any more, given we already have
> > the plan of merging PortableRunner into using java reference runner
> > (i.e. :beam-runners-reference-job-server).
> >
> > Still appreciated if someone can try out the python-on-flink
> > instructions
>
> > in case it is just due to my local machine setup.  Thanks!
> >
> >
> >
> > On Thu, Nov 8, 2018 at 5:04 PM Ruoyun Huang  > > wrote:
> >
> > Thanks Maximilian!
> >
> > I am working on migrating existing PortableRunner to using java ULR
> > (Link to Notes
> > <
> https://docs.google.com/document/d/1S86saZqiDaE_M5wxO0zOQ_rwC6QHv7sp1BmGTm0dLNE/edit#
> >).
> > If this issue is non-trivial to solve, I would vote for removing
> > this default behavior as part of the consolidation.
> >
> > On Thu, Nov 8, 2018 at 2:58 AM Maximilian Michels  > > wrote:
> >
> > In the long run, we should get rid of the Docker-inside-Docker
> > approach,
> > which was only intended for testing anyways. It would be cleaner
> to
> > start the SDK harness container alongside with JobServer
> container.
> >
> > Short term, I think it should be easy to either fix the
> > permissions of
> > the mounted "docker" executable or use a Docker image for the
> > JobServer
> > which comes with Docker pre-installed.
> >
> > JIRA: https://issues.apache.org/jira/browse/BEAM-6020
> >
> > Thanks for reporting this Ruoyun!
> >
> > -Max
> >
> > On 08.11.18 00:10, Ruoyun Huang wrote:
> >  > Thanks Ankur and Maximilian.
> >  >
> >  > Just for reference in case other people encountering the same
> > error
> >  > message, the "permission denied" error in my original email
> > is exactly
> >  > due to the docker-inside-docker issue that Ankur mentioned.
> > Thanks Ankur!
> >  > Didn't make the link when you said it, had to discover that
> > in a hard
> >  > way (I thought it is due to my docker installation messed up).
> >  >
> >  > On Tue, Nov 6, 2018 at 1:53 AM Maximilian Michels wrote:
> >  >
> >  > Hi,
> >  >
> >  > Please follow
> >  > https://beam.apache.org/roadmap/portability/#python-on-flink
> >  >
> >  > Cheers,
> >  > Max
> >  >
> >  > On 06.11.18 01:14, Ankur Goenka wrote:
> >  >  > Hi,
> >  >  >
> >  >  > The Portable Runner requires a job server uri to work
> > with. The
> >  > current
> >  >  > default job server docker image is broken because of
> > docker inside
> >  >  > docker issue.
> >  >  >
> >  >  > Please refer to
> >  >  >
> > 

Re: How to use "PortableRunner" in Python SDK?

2018-11-14 Thread Ruoyun Huang
Thanks Thomas!

My desktop runs Linux.  I was using gradle to run wordcount, and that was
how I got the job hanging. Since both of you got it working, I guess it is more
likely that something is wrong with my setup.


By using Thomas's python command line exactly as is, I am able to see the
job run succeed; however, two questions:

1)  Did you check whether the output file "/tmp/py-wordcount-direct" exists or
not?  I expect there should be a text output, but I don't see this file
afterwards.   (I am still building confidence in telling what a successful run
looks like; see the sketch after these questions.  Maybe I will try
DataflowRunner and cross-check outputs).

2)  Why does it need a "--streaming" arg?  Isn't this a static batch input,
since it is fed a txt file?  In fact, I got a failure message if I remove
'--streaming'; not sure if it is due to my setup again.
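
(Sketch for question 1: the text sink writes sharded files rather than a single
file named exactly /tmp/py-wordcount-direct, so after a successful batch run I
would expect something like the following.)

# Sharded output, e.g. /tmp/py-wordcount-direct-00000-of-00001
ls /tmp/py-wordcount-direct*
head /tmp/py-wordcount-direct-00000-of-*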


On Wed, Nov 14, 2018 at 7:51 AM Thomas Weise  wrote:

> Works for me on macOS as well.
>
> In case you don't launch the pipeline through Gradle, this would be the
> command:
>
> python -m apache_beam.examples.wordcount \
>   --input=/etc/profile \
>   --output=/tmp/py-wordcount-direct \
>   --runner=PortableRunner \
>   --job_endpoint=localhost:8099 \
>   --parallelism=1 \
>   --OPTIONALflink_master=localhost:8081 \
>   --streaming
>
> We talked about adding the wordcount to pre-commit..
>
> Regarding using ULR vs. Flink runner: There seems to be confusion between
> PortableRunner using the user supplied endpoint vs. trying to launch a job
> server. I commented in the doc.
>
> Thomas
>
>
>
> On Wed, Nov 14, 2018 at 3:30 AM Maximilian Michels  wrote:
>
>> Hi Ruoyun,
>>
>> I just ran the wordcount locally using the instructions on the page.
>> I've tried the local file system and GCS. Both times it ran successfully
>> and produced valid output.
>>
>> I'm assuming there is some problem with your setup. Which platform are
>> you using? I'm on MacOS.
>>
>> Could you expand on the planned merge? From my understanding we will
>> always need PortableRunner in Python to be able to submit against the
>> Beam JobServer.
>>
>> Thanks,
>> Max
>>
>> On 14.11.18 00:39, Ruoyun Huang wrote:
>> > A quick follow-up on using current PortableRunner.
>> >
>> > I followed the exact three steps as Ankur and Maximilian shared in
>> > https://beam.apache.org/roadmap/portability/#python-on-flink   The
>> > wordcount example keeps hanging after 10 minutes.  I also tried
>> > specifying explicit input/output args, either using gcs folder or local
>> > file system, but none of them works.
>> >
>> > Spent some time looking into it but no conclusion yet.  At this point
>> > though, I guess it does not matter much any more, given we already have
>> > the plan of merging PortableRunner into using java reference runner
>> > (i.e. :beam-runners-reference-job-server).
>> >
>> > Still appreciated if someone can try out the python-on-flink
>> > instructions
>>
>> > in case it is just due to my local machine setup.  Thanks!
>> >
>> >
>> >
>> > On Thu, Nov 8, 2018 at 5:04 PM Ruoyun Huang > > > wrote:
>> >
>> > Thanks Maximilian!
>> >
>> > I am working on migrating existing PortableRunner to using java ULR
>> > (Link to Notes
>> > <
>> https://docs.google.com/document/d/1S86saZqiDaE_M5wxO0zOQ_rwC6QHv7sp1BmGTm0dLNE/edit#
>> >).
>> > If this issue is non-trivial to solve, I would vote for removing
>> > this default behavior as part of the consolidation.
>> >
>> > On Thu, Nov 8, 2018 at 2:58 AM Maximilian Michels > > > wrote:
>> >
>> > In the long run, we should get rid of the Docker-inside-Docker
>> > approach,
>> > which was only intended for testing anyways. It would be
>> cleaner to
>> > start the SDK harness container alongside with JobServer
>> container.
>> >
>> > Short term, I think it should be easy to either fix the
>> > permissions of
>> > the mounted "docker" executable or use a Docker image for the
>> > JobServer
>> > which comes with Docker pre-installed.
>> >
>> > JIRA: https://issues.apache.org/jira/browse/BEAM-6020
>> >
>> > Thanks for reporting this Ruoyun!
>> >
>> > -Max
>> >
>> > On 08.11.18 00:10, Ruoyun Huang wrote:
>> >  > Thanks Ankur and Maximilian.
>> >  >
>> >  > Just for reference in case other people encountering the same
>> > error
>> >  > message, the "permission denied" error in my original email
>> > is exactly
>> >  > due to the docker-inside-docker issue that Ankur mentioned.
>> > Thanks Ankur!
>> >  > Didn't make the link when you said it, had to discover that
>> > in a hard
>> >  > way (I thought it is due to my docker installation messed
>> up).
>> >  >
>> >  > On Tue, Nov 6, 2018 at 1:53 AM Maximilian Michels
>> > mailto:m...@apache.org>
>> >  > 

Re: How to use "PortableRunner" in Python SDK?

2018-11-14 Thread Thomas Weise
Works for me on macOS as well.

In case you don't launch the pipeline through Gradle, this would be the
command:

python -m apache_beam.examples.wordcount \
  --input=/etc/profile \
  --output=/tmp/py-wordcount-direct \
  --runner=PortableRunner \
  --job_endpoint=localhost:8099 \
  --parallelism=1 \
  --OPTIONALflink_master=localhost:8081 \
  --streaming

We talked about adding the wordcount to pre-commit..

Regarding using ULR vs. Flink runner: There seems to be confusion between
PortableRunner using the user supplied endpoint vs. trying to launch a job
server. I commented in the doc.
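
To spell out the distinction (a sketch; both commands are the wordcount example
from this thread):

# Endpoint supplied: PortableRunner submits to the already-running job server
python -m apache_beam.examples.wordcount --runner=PortableRunner \
  --job_endpoint=localhost:8099 --input=/etc/profile --output=/tmp/py-wordcount-direct

# No endpoint supplied: PortableRunner falls back to launching a job server
# itself (the Docker-based default discussed earlier in this thread)
python -m apache_beam.examples.wordcount --runner=PortableRunner \
  --input=/etc/profile --output=/tmp/py-wordcount-direct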

Thomas



On Wed, Nov 14, 2018 at 3:30 AM Maximilian Michels  wrote:

> Hi Ruoyun,
>
> I just ran the wordcount locally using the instructions on the page.
> I've tried the local file system and GCS. Both times it ran successfully
> and produced valid output.
>
> I'm assuming there is some problem with your setup. Which platform are
> you using? I'm on MacOS.
>
> Could you expand on the planned merge? From my understanding we will
> always need PortableRunner in Python to be able to submit against the
> Beam JobServer.
>
> Thanks,
> Max
>
> On 14.11.18 00:39, Ruoyun Huang wrote:
> > A quick follow-up on using current PortableRunner.
> >
> > I followed the exact three steps as Ankur and Maximilian shared in
> > https://beam.apache.org/roadmap/portability/#python-on-flink   The
> > wordcount example keeps hanging after 10 minutes.  I also tried
> > specifying explicit input/output args, either using gcs folder or local
> > file system, but none of them works.
> >
> > Spent some time looking into it but no conclusion yet.  At this point
> > though, I guess it does not matter much any more, given we already have
> > the plan of merging PortableRunner into using java reference runner
> > (i.e. :beam-runners-reference-job-server).
> >
> > Still appreciated if someone can try out the python-on-flink
> > instructions
>
> > in case it is just due to my local machine setup.  Thanks!
> >
> >
> >
> > On Thu, Nov 8, 2018 at 5:04 PM Ruoyun Huang  > > wrote:
> >
> > Thanks Maximilian!
> >
> > I am working on migrating existing PortableRunner to using java ULR
> > (Link to Notes
> > <
> https://docs.google.com/document/d/1S86saZqiDaE_M5wxO0zOQ_rwC6QHv7sp1BmGTm0dLNE/edit#
> >).
> > If this issue is non-trivial to solve, I would vote for removing
> > this default behavior as part of the consolidation.
> >
> > On Thu, Nov 8, 2018 at 2:58 AM Maximilian Michels  > > wrote:
> >
> > In the long run, we should get rid of the Docker-inside-Docker
> > approach,
> > which was only intended for testing anyways. It would be cleaner
> to
> > start the SDK harness container alongside with JobServer
> container.
> >
> > Short term, I think it should be easy to either fix the
> > permissions of
> > the mounted "docker" executable or use a Docker image for the
> > JobServer
> > which comes with Docker pre-installed.
> >
> > JIRA: https://issues.apache.org/jira/browse/BEAM-6020
> >
> > Thanks for reporting this Ruoyun!
> >
> > -Max
> >
> > On 08.11.18 00:10, Ruoyun Huang wrote:
> >  > Thanks Ankur and Maximilian.
> >  >
> >  > Just for reference in case other people encountering the same
> > error
> >  > message, the "permission denied" error in my original email
> > is exactly
> >  > due to the docker-inside-docker issue that Ankur mentioned.
> > Thanks Ankur!
> >  > Didn't make the link when you said it, had to discover that
> > in a hard
> >  > way (I thought it is due to my docker installation messed up).
> >  >
> >  > On Tue, Nov 6, 2018 at 1:53 AM Maximilian Michels wrote:
> >  >
> >  > Hi,
> >  >
> >  > Please follow
> >  > https://beam.apache.org/roadmap/portability/#python-on-flink
> >  >
> >  > Cheers,
> >  > Max
> >  >
> >  > On 06.11.18 01:14, Ankur Goenka wrote:
> >  >  > Hi,
> >  >  >
> >  >  > The Portable Runner requires a job server uri to work
> > with. The
> >  > current
> >  >  > default job server docker image is broken because of
> > docker inside
> >  >  > docker issue.
> >  >  >
> >  >  > Please refer to
> >  >  >
> > https://beam.apache.org/roadmap/portability/#python-on-flink for
> >  > how to
> >  >  > run a wordcount using Portable Flink Runner.
> >  >  >
> >  >  > Thanks,
> >  >  > Ankur
> >  > 

Re: How to use "PortableRunner" in Python SDK?

2018-11-14 Thread Maximilian Michels

Hi Ruoyun,

I just ran the wordcount locally using the instructions on the page. 
I've tried the local file system and GCS. Both times it ran successfully 
and produced valid output.


I'm assuming there is some problem with your setup. Which platform are 
you using? I'm on MacOS.


Could you expand on the planned merge? From my understanding we will 
always need PortableRunner in Python to be able to submit against the 
Beam JobServer.


Thanks,
Max

On 14.11.18 00:39, Ruoyun Huang wrote:

A quick follow-up on using current PortableRunner.

I followed the exact three steps as Ankur and Maximilian shared in 
https://beam.apache.org/roadmap/portability/#python-on-flink   The 
wordcount example keeps hanging after 10 minutes.  I also tried 
specifying explicit input/output args, either using gcs folder or local 
file system, but none of them works.


Spent some time looking into it but no conclusion yet.  At this point 
though, I guess it does not matter much any more, given we already have 
the plan of merging PortableRunner into using java reference runner 
(i.e. :beam-runners-reference-job-server).


Still appreciated if someone can try out the python-on-flink 
instructions 
in case it is just due to my local machine setup.  Thanks!




On Thu, Nov 8, 2018 at 5:04 PM Ruoyun Huang > wrote:


Thanks Maximilian!

I am working on migrating existing PortableRunner to using java ULR
(Link to Notes).
If this issue is non-trivial to solve, I would vote for removing
this default behavior as part of the consolidation.

On Thu, Nov 8, 2018 at 2:58 AM Maximilian Michels wrote:

In the long run, we should get rid of the Docker-inside-Docker
approach,
which was only intended for testing anyways. It would be cleaner to
start the SDK harness container alongside with JobServer container.

Short term, I think it should be easy to either fix the
permissions of
the mounted "docker" executable or use a Docker image for the
JobServer
which comes with Docker pre-installed.

JIRA: https://issues.apache.org/jira/browse/BEAM-6020

Thanks for reporting this Ruoyun!

-Max

On 08.11.18 00:10, Ruoyun Huang wrote:
 > Thanks Ankur and Maximilian.
 >
 > Just for reference in case other people encountering the same
error
 > message, the "permission denied" error in my original email
is exactly
 >  > due to the docker-inside-docker issue that Ankur mentioned. 
Thanks Ankur!

 > Didn't make the link when you said it, had to discover that
in a hard
 > way (I thought it is due to my docker installation messed up).
 >
 > On Tue, Nov 6, 2018 at 1:53 AM Maximilian Michels wrote:
 >
 >     Hi,
 >
 >     Please follow
 > https://beam.apache.org/roadmap/portability/#python-on-flink
 >
 >     Cheers,
 >     Max
 >
 >     On 06.11.18 01:14, Ankur Goenka wrote:
 >      > Hi,
 >      >
 >      > The Portable Runner requires a job server uri to work
with. The
 >     current
 >      > default job server docker image is broken because of
docker inside
 >      > docker issue.
 >      >
 >      > Please refer to
 >      >
https://beam.apache.org/roadmap/portability/#python-on-flink for
 >     how to
 >      > run a wordcount using Portable Flink Runner.
 >      >
 >      > Thanks,
 >      > Ankur
 >      >
 >      > On Mon, Nov 5, 2018 at 3:41 PM Ruoyun Huang wrote:
 >      >     Hi, Folks,
 >      >
 >      >           I want to try out Python PortableRunner, by
using following
 >      >     command:
 >      >
 >      >     *sdk/python: python -m apache_beam.examples.wordcount
 >      >       --output=/tmp/test_output   --runner PortableRunner*
 >      >
 >      >           It complains with following error message:
 >      >
 >      >     Caused by: java.lang.Exception: The user defined
'open()' method
 >      >     caused an exception: java.io.IOException: Cannot
run program
 >      >     "docker": error=13, Permission denied
  

Re: How to use "PortableRunner" in Python SDK?

2018-11-13 Thread Ruoyun Huang
A quick follow-up on using current PortableRunner.

I followed the exact three steps as Ankur and Maximilian shared in
https://beam.apache.org/roadmap/portability/#python-on-flink   The
wordcount example keeps hanging after 10 minutes.  I also tried specifying
explicit input/output args, either using gcs folder or local file system,
but none of them works.

Spent some time looking into it but no conclusion yet.  At this point though,
I guess it does not matter much any more, given we already have the plan of
merging PortableRunner into using java reference runner (i.e.
:beam-runners-reference-job-server).

Still appreciated if someone can try out the python-on-flink
instructions
in case it is just due to my local machine setup.  Thanks!



On Thu, Nov 8, 2018 at 5:04 PM Ruoyun Huang  wrote:

> Thanks Maximilian!
>
> I am working on migrating existing PortableRunner to using java ULR (Link
> to Notes
> ).
> If this issue is non-trivial to solve, I would vote for removing this
> default behavior as part of the consolidation.
>
> On Thu, Nov 8, 2018 at 2:58 AM Maximilian Michels  wrote:
>
>> In the long run, we should get rid of the Docker-inside-Docker approach,
>> which was only intended for testing anyways. It would be cleaner to
>> start the SDK harness container alongside with JobServer container.
>>
>> Short term, I think it should be easy to either fix the permissions of
>> the mounted "docker" executable or use a Docker image for the JobServer
>> which comes with Docker pre-installed.
>>
>> JIRA: https://issues.apache.org/jira/browse/BEAM-6020
>>
>> Thanks for reporting this Ruoyun!
>>
>> -Max
>>
>> On 08.11.18 00:10, Ruoyun Huang wrote:
>> > Thanks Ankur and Maximilian.
>> >
>> > Just for reference in case other people encountering the same error
>> > message, the "permission denied" error in my original email is exactly
>> > due to the docker-inside-docker issue that Ankur mentioned.  Thanks
>> Ankur!
>> > Didn't make the link when you said it, had to discover that in a hard
>> > way (I thought it is due to my docker installation messed up).
>> >
>> > On Tue, Nov 6, 2018 at 1:53 AM Maximilian Michels > > > wrote:
>> >
>> > Hi,
>> >
>> > Please follow
>> > https://beam.apache.org/roadmap/portability/#python-on-flink
>> >
>> > Cheers,
>> > Max
>> >
>> > On 06.11.18 01:14, Ankur Goenka wrote:
>> >  > Hi,
>> >  >
>> >  > The Portable Runner requires a job server uri to work with. The
>> > current
>> >  > default job server docker image is broken because of docker
>> inside
>> >  > docker issue.
>> >  >
>> >  > Please refer to
>> >  > https://beam.apache.org/roadmap/portability/#python-on-flink for
>> > how to
>> >  > run a wordcount using Portable Flink Runner.
>> >  >
>> >  > Thanks,
>> >  > Ankur
>> >  >
>> >  > On Mon, Nov 5, 2018 at 3:41 PM Ruoyun Huang wrote:
>> >  >
>> >  > Hi, Folks,
>> >  >
>> >  >   I want to try out Python PortableRunner, by using
>> following
>> >  > command:
>> >  >
>> >  > *sdk/python: python -m apache_beam.examples.wordcount
>> >  >   --output=/tmp/test_output   --runner PortableRunner*
>> >  >
>> >  >   It complains with following error message:
>> >  >
>> >  > Caused by: java.lang.Exception: The user defined 'open()'
>> method
>> >  > caused an exception: java.io.IOException: Cannot run program
>> >  > "docker": error=13, Permission denied
>> >  > at
>> > org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498)
>> >  > at
>> >  >
>> >
>>  org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>> >  > at
>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
>> >  > ... 1 more
>> >  > Caused by:
>> >  >
>> >
>>  
>> org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.util.concurrent.UncheckedExecutionException:
>> >  > java.io.IOException: Cannot run program "docker": error=13,
>> >  > Permission denied
>> >  > at
>> >  >
>> >
>>  
>> org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4994)
>> >  >
>> >  > ... 7 more
>> >  >
>> >  >
>> >  >
>> >  > My py2 environment is properly configured, because
>> DirectRunner
>> >  > works.  Also I tested my docker installation by 'docker run
>> >  > hello-world ', no issue.
>> >  >
>> >  >
>> >  > Thanks.
>> >  > --
>> >  > 
>> >  > Ruoyun  Huang
>> > 

Re: How to use "PortableRunner" in Python SDK?

2018-11-08 Thread Ruoyun Huang
Thanks Maximilian!

I am working on migrating existing PortableRunner to using java ULR (Link
to Notes
).
If this issue is non-trivial to solve, I would vote for removing this
default behavior as part of the consolidation.

On Thu, Nov 8, 2018 at 2:58 AM Maximilian Michels  wrote:

> In the long run, we should get rid of the Docker-inside-Docker approach,
> which was only intended for testing anyways. It would be cleaner to
> start the SDK harness container alongside with JobServer container.
>
> Short term, I think it should be easy to either fix the permissions of
> the mounted "docker" executable or use a Docker image for the JobServer
> which comes with Docker pre-installed.
>
> JIRA: https://issues.apache.org/jira/browse/BEAM-6020
>
> Thanks for reporting this Ruoyun!
>
> -Max
>
> On 08.11.18 00:10, Ruoyun Huang wrote:
> > Thanks Ankur and Maximilian.
> >
> > Just for reference in case other people encountering the same error
> > message, the "permission denied" error in my original email is exactly
> > due to the docker-inside-docker issue that Ankur mentioned.  Thanks Ankur!
> > Didn't make the link when you said it, had to discover that in a hard
> > way (I thought it is due to my docker installation messed up).
> >
> > On Tue, Nov 6, 2018 at 1:53 AM Maximilian Michels  wrote:
> >
> > Hi,
> >
> > Please follow
> > https://beam.apache.org/roadmap/portability/#python-on-flink
> >
> > Cheers,
> > Max
> >
> > On 06.11.18 01:14, Ankur Goenka wrote:
> >  > Hi,
> >  >
> >  > The Portable Runner requires a job server uri to work with. The
> > current
> >  > default job server docker image is broken because of docker inside
> >  > docker issue.
> >  >
> >  > Please refer to
> >  > https://beam.apache.org/roadmap/portability/#python-on-flink for
> > how to
> >  > run a wordcount using Portable Flink Runner.
> >  >
> >  > Thanks,
> >  > Ankur
> >  >
> >  > On Mon, Nov 5, 2018 at 3:41 PM Ruoyun Huang  wrote:
> >  >
> >  > Hi, Folks,
> >  >
> >  >   I want to try out Python PortableRunner, by using
> following
> >  > command:
> >  >
> >  > *sdk/python: python -m apache_beam.examples.wordcount
> >  >   --output=/tmp/test_output   --runner PortableRunner*
> >  >
> >  >   It complains with following error message:
> >  >
> >  > Caused by: java.lang.Exception: The user defined 'open()'
> method
> >  > caused an exception: java.io.IOException: Cannot run program
> >  > "docker": error=13, Permission denied
> >  > at
> > org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498)
> >  > at
> >  >
> >
>  org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> >  > at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
> >  > ... 1 more
> >  > Caused by:
> >  >
> >
>  
> org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.util.concurrent.UncheckedExecutionException:
> >  > java.io.IOException: Cannot run program "docker": error=13,
> >  > Permission denied
> >  > at
> >  >
> >
>  
> org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4994)
> >  >
> >  > ... 7 more
> >  >
> >  >
> >  >
> >  > My py2 environment is properly configured, because
> DirectRunner
> >  > works.  Also I tested my docker installation by 'docker run
> >  > hello-world ', no issue.
> >  >
> >  >
> >  > Thanks.
> >  > --
> >  > 
> >  > Ruoyun  Huang
> >  >
> >
> >
> >
> > --
> > 
> > Ruoyun  Huang
> >
>


-- 

Ruoyun  Huang


Re: How to use "PortableRunner" in Python SDK?

2018-11-08 Thread Maximilian Michels
In the long run, we should get rid of the Docker-inside-Docker approach,
which was only intended for testing anyway. It would be cleaner to
start the SDK harness container alongside the JobServer container.


Short term, I think it should be easy to either fix the permissions of 
the mounted "docker" executable or use a Docker image for the JobServer 
which comes with Docker pre-installed.
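To make the permissions option concrete, here is a minimal Python sketch (not
from the Beam codebase) that checks whether the mounted docker binary is
executable by the user running the Flink TaskManager. The /usr/bin/docker path
is an assumption; adjust it to wherever your JobServer image mounts the host
binary:

    import os
    import stat

    # Assumed mount point of the host docker binary inside the JobServer
    # container; change this if your setup mounts it somewhere else.
    docker_path = '/usr/bin/docker'

    st = os.stat(docker_path)
    print('permission bits:', oct(stat.S_IMODE(st.st_mode)))
    print('executable by current user:', os.access(docker_path, os.X_OK))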


JIRA: https://issues.apache.org/jira/browse/BEAM-6020

Thanks for reporting this Ruoyun!

-Max

On 08.11.18 00:10, Ruoyun Huang wrote:

Thanks Ankur and Maximilian.

Just for reference, in case other people encounter the same error
message: the "permission denied" error in my original email is exactly
due to the docker-inside-docker issue that Ankur mentioned.  Thanks Ankur!
I didn't make the connection when you said it and had to discover it the
hard way (I thought my Docker installation was messed up).


On Tue, Nov 6, 2018 at 1:53 AM Maximilian Michels  wrote:


Hi,

Please follow
https://beam.apache.org/roadmap/portability/#python-on-flink

Cheers,
Max

On 06.11.18 01:14, Ankur Goenka wrote:
 > Hi,
 >
 > The Portable Runner requires a job server uri to work with. The
current
 > default job server docker image is broken because of docker inside
 > docker issue.
 >
 > Please refer to
 > https://beam.apache.org/roadmap/portability/#python-on-flink for
how to
 > run a wordcount using Portable Flink Runner.
 >
 > Thanks,
 > Ankur
 >
 > On Mon, Nov 5, 2018 at 3:41 PM Ruoyun Huang  wrote:
 >
 >     Hi, Folks,
 >
 >           I want to try out Python PortableRunner, by using following
 >     command:
 >
 >     *sdk/python: python -m apache_beam.examples.wordcount
 >       --output=/tmp/test_output   --runner PortableRunner*
 >
 >           It complains with following error message:
 >
 >     Caused by: java.lang.Exception: The user defined 'open()' method
 >     caused an exception: java.io.IOException: Cannot run program
 >     "docker": error=13, Permission denied
 >     at
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498)
 >     at
 >   
  org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)

 >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
 >     ... 1 more
 >     Caused by:
 >   
  org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.util.concurrent.UncheckedExecutionException:

 >     java.io.IOException: Cannot run program "docker": error=13,
 >     Permission denied
 >     at
 >   
  org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4994)

 >
 >     ... 7 more
 >
 >
 >
 >     My py2 environment is properly configured, because DirectRunner
 >     works.  Also I tested my docker installation by 'docker run
 >     hello-world ', no issue.
 >
 >
 >     Thanks.
 >     --
 >     
 >     Ruoyun  Huang
 >



--

Ruoyun  Huang



Re: How to use "PortableRunner" in Python SDK?

2018-11-07 Thread Ruoyun Huang
Thanks Ankur and Maximilian.

Just for reference, in case other people encounter the same error
message: the "permission denied" error in my original email is exactly due
to the docker-inside-docker issue that Ankur mentioned.  Thanks Ankur!
I didn't make the connection when you said it and had to discover it the
hard way (I thought my Docker installation was messed up).
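For anyone who wants to rule out their own machine before blaming the job
server image, a quick diagnostic is to invoke docker from a Python subprocess,
roughly the way the SDK harness launcher does. This is only a sketch under
that assumption, not code from Beam:

    import subprocess

    try:
        # Roughly what the container manager does when it boots the SDK
        # harness; if this fails here, it will also fail inside Flink.
        out = subprocess.check_output(
            ['docker', 'version', '--format', '{{.Server.Version}}'],
            stderr=subprocess.STDOUT)
        print('docker is usable, server version:', out.decode().strip())
    except OSError as e:
        print('docker binary not runnable (permissions or PATH?):', e)
    except subprocess.CalledProcessError as e:
        print('docker ran but the daemon is unreachable:', e.output.decode())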

On Tue, Nov 6, 2018 at 1:53 AM Maximilian Michels  wrote:

> Hi,
>
> Please follow https://beam.apache.org/roadmap/portability/#python-on-flink
>
> Cheers,
> Max
>
> On 06.11.18 01:14, Ankur Goenka wrote:
> > Hi,
> >
> > The Portable Runner requires a job server uri to work with. The current
> > default job server docker image is broken because of docker inside
> > docker issue.
> >
> > Please refer to
> > https://beam.apache.org/roadmap/portability/#python-on-flink for how to
> > run a wordcount using Portable Flink Runner.
> >
> > Thanks,
> > Ankur
> >
> > On Mon, Nov 5, 2018 at 3:41 PM Ruoyun Huang  wrote:
> >
> > Hi, Folks,
> >
> >   I want to try out Python PortableRunner, by using following
> > command:
> >
> > *sdk/python: python -m apache_beam.examples.wordcount
> >   --output=/tmp/test_output   --runner PortableRunner*
> >
> >   It complains with following error message:
> >
> > Caused by: java.lang.Exception: The user defined 'open()' method
> > caused an exception: java.io.IOException: Cannot run program
> > "docker": error=13, Permission denied
> > at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498)
> > at
> >
>  org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
> > ... 1 more
> > Caused by:
> >
>  
> org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.util.concurrent.UncheckedExecutionException:
> > java.io.IOException: Cannot run program "docker": error=13,
> > Permission denied
> > at
> >
>  
> org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4994)
> >
> > ... 7 more
> >
> >
> >
> > My py2 environment is properly configured, because DirectRunner
> > works.  Also I tested my docker installation by 'docker run
> > hello-world ', no issue.
> >
> >
> > Thanks.
> > --
> > 
> > Ruoyun  Huang
> >
>


-- 

Ruoyun  Huang


Re: How to use "PortableRunner" in Python SDK?

2018-11-06 Thread Maximilian Michels

Hi,

Please follow https://beam.apache.org/roadmap/portability/#python-on-flink

Cheers,
Max

On 06.11.18 01:14, Ankur Goenka wrote:

Hi,

The Portable Runner requires a job server uri to work with. The current 
default job server docker image is broken because of docker inside 
docker issue.


Please refer to 
https://beam.apache.org/roadmap/portability/#python-on-flink for how to 
run a wordcount using Portable Flink Runner.


Thanks,
Ankur

On Mon, Nov 5, 2018 at 3:41 PM Ruoyun Huang  wrote:


Hi, Folks,

      I want to try out Python PortableRunner, by using following
command:

*sdk/python: python -m apache_beam.examples.wordcount 
  --output=/tmp/test_output   --runner PortableRunner*


      It complains with following error message:

Caused by: java.lang.Exception: The user defined 'open()' method
caused an exception: java.io.IOException: Cannot run program
"docker": error=13, Permission denied
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498)
at
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
... 1 more
Caused by:

org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.util.concurrent.UncheckedExecutionException:
java.io.IOException: Cannot run program "docker": error=13,
Permission denied
at

org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4994)

... 7 more



My py2 environment is properly configured, because DirectRunner
works.  Also I tested my docker installation by 'docker run
hello-world ', no issue.


Thanks.
-- 


Ruoyun  Huang



Re: How to use "PortableRunner" in Python SDK?

2018-11-05 Thread Ankur Goenka
Hi,

The Portable Runner requires a job server URI to work with. The current
default job server Docker image is broken because of a docker-inside-docker
issue.

Please refer to https://beam.apache.org/roadmap/portability/#python-on-flink
for how to run a wordcount using Portable Flink Runner.
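
As a minimal sketch of what pointing the runner at a job server looks like
from Python (the localhost:8099 endpoint and the toy pipeline below are
assumptions; use whatever endpoint your job server actually exposes):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Assumed address of an already-running Beam JobServer (for example the
    # Flink job server); replace with the host:port of your own setup.
    options = PipelineOptions([
        '--runner=PortableRunner',
        '--job_endpoint=localhost:8099',
    ])

    with beam.Pipeline(options=options) as p:
        (p
         | 'Create' >> beam.Create(['to be or not to be'])
         | 'Split' >> beam.FlatMap(lambda line: line.split())
         | 'Pair' >> beam.Map(lambda word: (word, 1))
         | 'Count' >> beam.CombinePerKey(sum)
         | 'Format' >> beam.Map(lambda kv: '%s: %d' % kv)
         | 'Write' >> beam.io.WriteToText('/tmp/test_output'))

With an endpoint supplied explicitly, the runner submits to the job server you
started yourself instead of bringing up the broken default image mentioned
above.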

Thanks,
Ankur

On Mon, Nov 5, 2018 at 3:41 PM Ruoyun Huang  wrote:

> Hi, Folks,
>
>  I want to try out Python PortableRunner, by using following command:
>
> *sdk/python: python -m apache_beam.examples.wordcount
>  --output=/tmp/test_output   --runner PortableRunner*
>
>  It complains with following error message:
>
> Caused by: java.lang.Exception: The user defined 'open()' method caused an
> exception: java.io.IOException: Cannot run program "docker": error=13,
> Permission denied
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
> ... 1 more
> Caused by:
> org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.util.concurrent.UncheckedExecutionException:
> java.io.IOException: Cannot run program "docker": error=13, Permission
> denied
> at
> org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4994)
>
> ... 7 more
>
>
>
> My py2 environment is properly configured, because DirectRunner works.
> Also I tested my docker installation by 'docker run hello-world ', no
> issue.
>
>
> Thanks.
> --
> 
> Ruoyun  Huang
>
>