Re: Looping in Dataflow(Creating multiple jobs for a while loop)

2019-09-16 Thread Juan Carlos Garcia
If you want a single pipeline, as Robert mentioned, you need a streaming
pipeline, which requires an unbounded source (like Kafka or GCP Pub/Sub).
In your example you are creating your source from a fixed list, which by
definition is a bounded source.

JC
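
For illustration, a minimal sketch of that streaming variant, assuming a
Pub/Sub subscription as the unbounded source. The subscription path is a
placeholder, get_api_data and send_to_api are the helpers from the original
post, and the from_date/to_date handling would need to move into the pipeline
(e.g. be derived per element or per window) rather than a driver loop:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=options) as p:
    _ = (p
         # Unbounded source: ids arrive continuously instead of from a fixed list.
         | 'read ids' >> beam.io.ReadFromPubSub(
             subscription='projects/MY_PROJECT/subscriptions/MY_SUBSCRIPTION')
         | 'decode' >> beam.Map(lambda b: b.decode('utf-8'))
         # Date-range handling is left open here; in the original post
         # from_date/to_date were computed in the driver loop.
         | 'get data' >> beam.FlatMap(lambda id: get_api_data(id, from_date, to_date))
         | 'send to output' >> beam.FlatMap(send_to_api))

Run this way, Dataflow keeps a single streaming job alive and processes new
ids as they arrive, instead of launching a new batch job every 30 seconds.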

Robert Bradshaw  wrote on Mon., 16 Sep. 2019, 21:38:

> An external scheduler would also create a new job every time. The only
> way I see to continuously process results in a single job is to have a
> streaming job.
>
> On Mon, Sep 16, 2019 at 12:22 PM Anjana Pydi
>  wrote:
> >
> > Hi Juan,
> >
> > Thanks for the reply! I want to know if there is any way in Dataflow to
> > achieve this before trying an external scheduler.
> >
> > Regards,
> > Anjana
> > 
> > From: Juan Carlos Garcia [jcgarc...@gmail.com]
> > Sent: Monday, September 16, 2019 11:23 AM
> > To: user@beam.apache.org
> > Cc: Richard Amrith Lourdu
> > Subject: Re: Looping in Dataflow(Creating multiple jobs for a while loop)
> >
> > Hi Anjana,
> >
> > You need to separate your line of thought between the pipeline definition
> > and what happens when you call *run* on the pipeline; given that, you need
> > to externalize the scheduling using something like a crontab, Jenkins, or
> > another mechanism.
> >
> > Best regards,
> > JC
> >
> > On Mon, Sep 16, 2019 at 7:57 PM Anjana Pydi 
> wrote:
> >>
> >> Hi,
> >>
> >> I am trying to run a task using an infinite while loop with a change in
> >> input parameters as below, but it creates a new job every time. (I expect
> >> it to use the same pipeline that is created before the while loop and to
> >> run as a single job.)
> >>
> >> p = beam.Pipeline(options=PipelineOptions())
> >>
> >> while True:
> >>     to_date = time.time()
> >>
> >>     (p
> >>      | 'create surveys' >> beam.Create(id_list)
> >>      | 'get data' >> beam.FlatMap(lambda id: get_api_data(id, from_date, to_date))
> >>      | 'send to output' >> beam.FlatMap(lambda input: (send_to_api(input)))
> >>     )
> >>     from_date = to_date + timedelta(microseconds=1)
> >>     time.sleep(30)
> >>
> >> p.run().wait_until_finish()
> >>
> >> It works properly (in only one job) when there is no while loop, as below:
> >>
> >> p = beam.Pipeline(options=PipelineOptions())
> >>
> >> (p
> >>  | 'create surveys' >> beam.Create(id_list)
> >>  | 'get data' >> beam.FlatMap(lambda id: get_api_data(id, from_date, to_date))
> >>  | 'send to output' >> beam.FlatMap(lambda input: (send_to_api(input)))
> >> )
> >>
> >> p.run().wait_until_finish()
> >>
> >> Could someone please suggest how to make the task run in the same job
> >> instead of creating multiple jobs.
> >>
> >> Please let me know if any additional information is needed.
> >>
> >> Thanks,
> >> Anjana
> >>
> >>
> >>
> >
> >
> >
> > --
> >
> > JC
> >
> >
>


Re: How to use the loopback?

2019-09-16 Thread Benjamin Tan
Here you go! 

builder@dnnserver2:~/beam (release-2.16.0) $ ./gradlew 
:runners:spark:job-server:runShadow -PsparkMasterUrl=spark://dnnserver2:7077
Configuration on demand is an incubating feature.

> Task :runners:spark:job-server:runShadow
Listening for transport dt_socket at address: 5005
log4j:WARN No appenders could be found for logger 
(org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/09/17 12:57:06 INFO SparkContext: Running Spark version 2.4.4
19/09/17 12:57:06 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
19/09/17 12:57:06 INFO SparkContext: Submitted application: 
BeamApp-builder-0917045705-aa3391a3_96f998c3-3c0d-42f1-885c-f1e2538310bc
19/09/17 12:57:06 INFO SecurityManager: Changing view acls to: builder
19/09/17 12:57:06 INFO SecurityManager: Changing modify acls to: builder
19/09/17 12:57:06 INFO SecurityManager: Changing view acls groups to:
19/09/17 12:57:06 INFO SecurityManager: Changing modify acls groups to:
19/09/17 12:57:06 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users  with view permissions: Set(builder); groups 
with view permissions: Set(); users  with modify permissions: Set(builder); 
groups with modify permissions: Set()
19/09/17 12:57:07 INFO Utils: Successfully started service 'sparkDriver' on 
port 36069.
19/09/17 12:57:07 INFO SparkEnv: Registering MapOutputTracker
19/09/17 12:57:07 INFO SparkEnv: Registering BlockManagerMaster
19/09/17 12:57:07 INFO BlockManagerMasterEndpoint: Using 
org.apache.spark.storage.DefaultTopologyMapper for getting topology information
19/09/17 12:57:07 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
19/09/17 12:57:07 INFO DiskBlockManager: Created local directory at 
/tmp/blockmgr-92f6079e-4a85-4b09-b48b-5d58ddf304a6
19/09/17 12:57:07 INFO MemoryStore: MemoryStore started with capacity 1949.1 MB
19/09/17 12:57:07 INFO SparkEnv: Registering OutputCommitCoordinator
19/09/17 12:57:07 INFO Utils: Successfully started service 'SparkUI' on port 
4040.
19/09/17 12:57:07 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at 
http://dnnserver2:4040
19/09/17 12:57:07 INFO SparkContext: Added JAR 
/home/builder/beam/runners/spark/job-server/build/install/job-server-shadow/lib/beam-runners-spark-job-server-2.16.0-SNAPSHOT.jar
 at 
spark://dnnserver2:36069/jars/beam-runners-spark-job-server-2.16.0-SNAPSHOT.jar 
with timestamp 1568696227623
19/09/17 12:57:07 INFO StandaloneAppClient$ClientEndpoint: Connecting to master 
spark://dnnserver2:7077...
19/09/17 12:57:07 INFO TransportClientFactory: Successfully created connection 
to dnnserver2/10.64.1.208:7077 after 40 ms (0 ms spent in bootstraps)
19/09/17 12:57:07 INFO StandaloneSchedulerBackend: Connected to Spark cluster 
with app ID app-20190917125707-0066
19/09/17 12:57:07 INFO StandaloneAppClient$ClientEndpoint: Executor added: 
app-20190917125707-0066/0 on worker-20190916143324-10.64.1.208-41823 
(10.64.1.208:41823) with 12 core(s)
19/09/17 12:57:07 INFO StandaloneSchedulerBackend: Granted executor ID 
app-20190917125707-0066/0 on hostPort 10.64.1.208:41823 with 12 core(s), 1024.0 
MB RAM
19/09/17 12:57:07 INFO Utils: Successfully started service 
'org.apache.spark.network.netty.NettyBlockTransferService' on port 37069.
19/09/17 12:57:07 INFO NettyBlockTransferService: Server created on 
dnnserver2:37069
19/09/17 12:57:07 INFO BlockManager: Using 
org.apache.spark.storage.RandomBlockReplicationPolicy for block replication 
policy
19/09/17 12:57:07 INFO StandaloneAppClient$ClientEndpoint: Executor updated: 
app-20190917125707-0066/0 is now RUNNING
19/09/17 12:57:07 INFO BlockManagerMaster: Registering BlockManager 
BlockManagerId(driver, dnnserver2, 37069, None)
19/09/17 12:57:07 INFO BlockManagerMasterEndpoint: Registering block manager 
dnnserver2:37069 with 1949.1 MB RAM, BlockManagerId(driver, dnnserver2, 37069, 
None)
19/09/17 12:57:07 INFO BlockManagerMaster: Registered BlockManager 
BlockManagerId(driver, dnnserver2, 37069, None)
19/09/17 12:57:07 INFO BlockManager: Initialized BlockManager: 
BlockManagerId(driver, dnnserver2, 37069, None)
19/09/17 12:57:07 INFO StandaloneSchedulerBackend: SchedulerBackend is ready 
for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
19/09/17 12:57:07 INFO SparkPipelineRunner: Running job 
BeamApp-builder-0917045705-aa3391a3_96f998c3-3c0d-42f1-885c-f1e2538310bc on 
Spark master spark://dnnserver2:7077
19/09/17 12:57:07 INFO AggregatorsAccumulator: Instantiated aggregators 
accumulator:
19/09/17 12:57:08 INFO MetricsAccumulator: Instantiated metrics accumulator: 
MetricQueryResults()
19/09/17 12:57:08 WARN GroupNonMergingWindowsFunctions: Either coder 

Re: How to use the loopback?

2019-09-16 Thread Kyle Weaver
Could you share more of the stack trace?

Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com


On Mon, Sep 16, 2019 at 7:49 PM Benjamin Tan 
wrote:

> I'm trying to use the loopback via the environment_type option:
>
> options = PipelineOptions(["--runner=PortableRunner",
>                            "--environment_config=-apachebeam/python3.7_sdk ",
>                            "--environment_type=LOOPBACK",
>                            "--job_endpoint=dnnserver2:8099"])
>
> Previously, I've done:
>
> ./gradlew -p sdks/python/container buildAll
>
> And ran the Spark job server:
>
> ./gradlew :runners:spark:job-server:runShadow
> -PsparkMasterUrl=spark://dnnserver2:7077
>
> However, I get a pretty cryptic error message:
>
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.StatusRuntimeException:
> UNIMPLEMENTED: Method not found!
>
> Any ideas?
>
>
>


How to use the loopback?

2019-09-16 Thread Benjamin Tan
I'm trying to use the loopback via the environment_type option:

options = PipelineOptions(["--runner=PortableRunner",
                           "--environment_config=-apachebeam/python3.7_sdk ",
                           "--environment_type=LOOPBACK",
                           "--job_endpoint=dnnserver2:8099"])

Previously, I've done:

./gradlew -p sdks/python/container buildAll

And ran the Spark job server:

./gradlew :runners:spark:job-server:runShadow 
-PsparkMasterUrl=spark://dnnserver2:7077

However, I get a pretty cryptic error message:

org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
 org.apache.beam.vendor.grpc.v1p21p0.io.grpc.StatusRuntimeException: 
UNIMPLEMENTED: Method not found!

Any ideas?
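
For comparison, a minimal sketch of a LOOPBACK setup (not from the thread; the
job endpoint is the one used above). With --environment_type=LOOPBACK the SDK
harness runs inside the submitting Python process, so the docker-image
--environment_config entry should not be needed:

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=dnnserver2:8099",
    "--environment_type=LOOPBACK",
])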




Re: How to use google container registry for FlinkRunner ?

2019-09-16 Thread Benjamin Tan
Something like this:

options = PipelineOptions(["--runner=PortableRunner",
                           "--environment_config=apachebeam/python3.7_sdk ",  # <---
                           "--job_endpoint=dnnserver2:8099"])

On 2019/09/17 02:14:00, Kyle Weaver  wrote: 
> I think environment_config is the option you are looking for.
> 
> On Mon, Sep 16, 2019 at 7:06 PM Yu Watanabe  wrote:
> 
> > Hello.
> >
> > For testing I would like to use image uploaded to google container
> > registry.
> > How can I use images pulled from other than bintray.io ?
> >
> >
> > 
> > admin@ip-172-31-9-89:~$ gcloud container images list --repository
> > asia.gcr.io/[PROJECTNAME] 
> > NAME
> > asia.gcr.io/[PROJECTNAME]/beam 
> >
> > 
> >
> > Looks like FlinkRunner (2.15.0)  uses bintray repository as a default
> > behavior. As a result I am not able to use
> >
> > Caused by: java.lang.Exception: The user defined 'open()' method caused an
> > exception: java.io.IOException: Received exit code 125 for command 'docker
> > run -d --network=host --env=DOCKER_MAC_CONTAINER=null --rm
> > ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=1
> > --logging_endpoint=localhost:33787 --artifact_endpoint=localhost:41681
> > --provision_endpoint=localhost:45089 --control_endpoint=localhost:37045'.
> > stderr: Unable to find image '
> > ywatanabe-docker-apache.bintray.io/beam/python3:latest' locallydocker:
> > Error response from daemon: manifest for
> > ywatanabe-docker-apache.bintray.io/beam/python3:latest not found:
> > manifest unknown: The named manifest is not known to the registry.See
> > 'docker run --
> >
> > According to online doc, there is not parameter which controls which image
> > to use .
> >
> > https://beam.apache.org/documentation/runners/flink/
> >
> > Pipeline options I am using is below.
> >
> >
> > 
> > options = PipelineOptions([
> >   "--runner=FlinkRunner",
> >   "--flink_version=1.8",
> >   "--flink_master_url=localhost:8081",
> >   "--worker_harness_container_image=
> > asia.gcr.io/PROJECTNAME/beam/python3",
> >   "--experiments=beam_fn_api"
> >   ])
> >
> > 
> >
> > Perhaps is there any  environment variable to specify which image to use ?
> >
> > Best Regards,
> > Yu Watanabe
> >
> > --
> > Yu Watanabe
> > Weekend Freelancer who loves to challenge building data platform
> > yu.w.ten...@gmail.com
> >
> -- 
> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
> 


Re: How to use google container registry for FlinkRunner ?

2019-09-16 Thread Kyle Weaver
I think environment_config is the option you are looking for.

On Mon, Sep 16, 2019 at 7:06 PM Yu Watanabe  wrote:

> Hello.
>
> For testing I would like to use image uploaded to google container
> registry.
> How can I use images pulled from other than bintray.io ?
>
>
> 
> admin@ip-172-31-9-89:~$ gcloud container images list --repository
> asia.gcr.io/[PROJECTNAME] 
> NAME
> asia.gcr.io/[PROJECTNAME]/beam 
>
> 
>
> Looks like FlinkRunner (2.15.0)  uses bintray repository as a default
> behavior. As a result I am not able to use
>
> Caused by: java.lang.Exception: The user defined 'open()' method caused an
> exception: java.io.IOException: Received exit code 125 for command 'docker
> run -d --network=host --env=DOCKER_MAC_CONTAINER=null --rm
> ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=1
> --logging_endpoint=localhost:33787 --artifact_endpoint=localhost:41681
> --provision_endpoint=localhost:45089 --control_endpoint=localhost:37045'.
> stderr: Unable to find image '
> ywatanabe-docker-apache.bintray.io/beam/python3:latest' locallydocker:
> Error response from daemon: manifest for
> ywatanabe-docker-apache.bintray.io/beam/python3:latest not found:
> manifest unknown: The named manifest is not known to the registry.See
> 'docker run --
>
> According to online doc, there is not parameter which controls which image
> to use .
>
> https://beam.apache.org/documentation/runners/flink/
>
> Pipeline options I am using is below.
>
>
> 
> options = PipelineOptions([
>   "--runner=FlinkRunner",
>   "--flink_version=1.8",
>   "--flink_master_url=localhost:8081",
>   "--worker_harness_container_image=
> asia.gcr.io/PROJECTNAME/beam/python3",
>   "--experiments=beam_fn_api"
>   ])
>
> 
>
> Perhaps is there any  environment variable to specify which image to use ?
>
> Best Regards,
> Yu Watanabe
>
> --
> Yu Watanabe
> Weekend Freelancer who loves to challenge building data platform
> yu.w.ten...@gmail.com
>
-- 
Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com


How to use google container registry for FlinkRunner ?

2019-09-16 Thread Yu Watanabe
Hello.

For testing I would like to use an image uploaded to Google Container Registry.
How can I use images pulled from somewhere other than bintray.io?


admin@ip-172-31-9-89:~$ gcloud container images list --repository
asia.gcr.io/[PROJECTNAME] 
NAME
asia.gcr.io/[PROJECTNAME]/beam 


Looks like FlinkRunner (2.15.0) uses the bintray repository by default. As a
result I am not able to use my own image.

Caused by: java.lang.Exception: The user defined 'open()' method caused an
exception: java.io.IOException: Received exit code 125 for command 'docker
run -d --network=host --env=DOCKER_MAC_CONTAINER=null --rm
ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=1
--logging_endpoint=localhost:33787 --artifact_endpoint=localhost:41681
--provision_endpoint=localhost:45089 --control_endpoint=localhost:37045'.
stderr: Unable to find image '
ywatanabe-docker-apache.bintray.io/beam/python3:latest' locallydocker:
Error response from daemon: manifest for
ywatanabe-docker-apache.bintray.io/beam/python3:latest not found: manifest
unknown: The named manifest is not known to the registry.See 'docker run --

According to the online doc, there is no parameter which controls which image
to use.

https://beam.apache.org/documentation/runners/flink/

Pipeline options I am using is below.



options = PipelineOptions([
  "--runner=FlinkRunner",
  "--flink_version=1.8",
  "--flink_master_url=localhost:8081",
  "--worker_harness_container_image=asia.gcr.io/PROJECTNAME/beam/python3",
  "--experiments=beam_fn_api"
  ])



Perhaps there is an environment variable to specify which image to use?

Best Regards,
Yu Watanabe

-- 
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com


Re: The state of external transforms in Beam

2019-09-16 Thread Chamikara Jayalath
Thanks for the nice write up Chad.

On Mon, Sep 16, 2019 at 12:17 PM Robert Bradshaw 
wrote:

> Thanks for bringing this up again. My thoughts on the open questions below.
>
> On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova  wrote:
> > That commit solves 2 problems:
> >
> > Adds the pubsub Java deps so that they’re available in our portable
> pipeline
> > Makes the coder for the PubsubIO message-holder type, PubsubMessage,
> available as a standard coder. This is required because both PubsubIO.Read
> and PubsubIO.Write expand to ParDos which pass along these PubsubMessage
> objects, but only “standard” (i.e. portable) coders can be used, so we have
> to hack it to make PubsubMessage appear as a standard coder.
> >
> > More details:
> >
> > There’s a similar magic commit required for Kafka external transforms
> > The Jira issue for this problem is here:
> https://jira.apache.org/jira/browse/BEAM-7870
> > For problem #2 above there seems to be some consensus forming around
> using Avro or schema/row coders to send compound types in a portable way.
> Here’s the PR for making row coders portable
> > https://github.com/apache/beam/pull/9188
>
> +1. Note that this doesn't mean that the IO itself must produce rows;
> part of the Schema work in Java is to make it easy to automatically
> convert from various Java classes to schemas transparently, so this
> same logic that would allow one to apply an SQL filter directly to a
> Kafka/PubSub read would allow cross-language. Even if that doesn't
> work, we need not uglify the Java API; we can have an
> option/alternative transform that appends the convert-to-Row DoFn for
> easier use by external (though the goal of the former work is to make
> this step unnecessary).
>

Updating all IO connectors / transforms to have a version that
produces/consumes a PCollection of Rows is infeasible, so I agree that we need
an automatic conversion to/from Rows, possibly by injecting PTransforms
during ExternalTransform expansion.

>
> > I don’t really have any ideas for problem #1
>
> The crux of the issue here is that the jobs API was not designed with
> cross-language in mind, and so the artifact API ties artifacts to jobs
> rather than to environments. To solve this we need to augment the
> notion of environment to allow the specification of additional
> dependencies (e.g. jar files in this specific case, or better as
> maven/pypi/... dependencies (with version ranges) such that
> environment merging and dependency resolution can be sanely done), and
> a way for the expansion service to provide such dependencies.
>
> Max wrote up a summary of the prior discussions at
>
> https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit#heading=h.900gc947qrw8
>
> In the short term, one can build a custom docker image that has all
> the requisite dependencies installed.
>
> This touches on a related but separable issue that one may want to run
> some of these transforms "natively" in the same process as the runner
> (e.g. a Java IO in the Flink Java Runner) rather than via docker.
> (Similarly with subprocess.) Exactly how that works with environment
> specifications is also a bit TBD, but my proposal has been that these
> are best viewed as runner-specific substitutions of standard
> environments.
>

We need a permanent solution for this but for now we have a temporary
solution where additional jar files can be specified through an experiment
when running a Python pipeline:
https://github.com/apache/beam/blob/9678149872de2799ea1643f834f2bec88d346af8/sdks/python/apache_beam/io/external/xlang_parquetio_test.py#L55

Thanks,
Cham
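
To make the stub-transform idea concrete, here is a rough sketch of what the
Python side of a cross-language Kafka read looked like around the 2.16 era.
The module path, parameters, and addresses are illustrative assumptions, not
a tested recipe:

import apache_beam as beam
from apache_beam.io.external.kafka import ReadFromKafka  # assumed 2.16-era module path
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(["--runner=PortableRunner",
                           "--job_endpoint=localhost:8099"])

with beam.Pipeline(options=options) as p:
    _ = (p
         # The Python stub serializes these arguments and sends them to the Java
         # expansion service, which builds and expands the real KafkaIO.Read and
         # returns the expanded transform as a proto.
         | ReadFromKafka(
             consumer_config={'bootstrap.servers': 'localhost:9092'},
             topics=['beam-input'],
             expansion_service='localhost:8097')
         | beam.Map(print))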


>
> > So the portability expansion system works, and now it’s time to sand off
> some of the rough corners. I’d love to hear others’ thoughts on how to
> resolve some of these remaining issues.
>
> +1
>
>
> On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova  wrote:
> >
> > Hi all,
> > There was some interest in this topic at the Beam Summit this week (btw,
> great job to everyone involved!), so I thought I’d try to summarize the
> current state of things.
> > First, let me explain the idea behind an external transforms for the
> uninitiated.
> >
> > Problem:
> >
> > there’s a transform that you want to use, but it’s not available in your
> desired language. IO connectors are a good example: there are many
> available in the Java SDK, but not so much in Python or Go.
> >
> > Solution:
> >
> > Create a stub transform in your desired language (e.g. Python) whose
> primary role is to serialize the parameters passed to that transform
> > When you run your portable pipeline, just prior to it being sent to the
> Job Service for execution, your stub transform’s payload is first sent to
> the “Expansion Service” that’s running in the native language (Java), where
> the payload is used to construct an instance of the native transform, which
> is then expanded and converted to a protobuf and sent back to the calling
> process (Python).
> > The protobuf 

Re: Running Python Wordcount issues

2019-09-16 Thread Tom Barber
That error went away for me on 2.16 from Git.

Tom


On 17 September 2019 at 00:47:27, Robert Bradshaw (rober...@google.com)
wrote:

What errors did you get with the release-2.16.0 branch?

On Mon, Sep 16, 2019 at 4:43 PM Benjamin Tan
 wrote:
>
> I can confirm. I used the master branch to get this to work. I tried the
"release-2.16.0" branch but that didn't work either.
>
> On 2019/09/16 23:30:54, Tom Barber  wrote:
> > Hello folks,
> >
> > Trying to get started running the python word count example against the
> > portable runner using the beam 2.15 download.
> >
> > In one terminal I have:
> >
> > gradle :runners:spark:job-server:runShadow
> >
> > And another:
> >
> > gradle portableWordCount -PjobEndpoint=localhost:8099
> > -PenvironmentType=LOOPBACK
> >
> >
> > But when it starts it says:
> >
> > Traceback (most recent call last):
> > File "/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main
> > "__main__", fname, loader, pkg_name)
> > File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
> > exec code in run_globals
> > File
> >
"/home/ubuntu/beam-release-2.15.0/sdks/python/apache_beam/examples/wordcount.py",

> > line 135, in 
> > run()
> > File
> >
"/home/ubuntu/beam-release-2.15.0/sdks/python/apache_beam/examples/wordcount.py",

> > line 115, in run
> > result.wait_until_finish()
> > File
> >
"/home/ubuntu/beam-release-2.15.0/sdks/python/apache_beam/runners/portability/portable_runner.py",

> > line 446, in wait_until_finish
> > self._job_id, self._state, self._last_error_message()))
> > RuntimeError: Pipeline
> > BeamApp-ubuntu-0916232552-dd0af6d7_90675fc0-c39c-4ca7-93fc-c7c7e980f546
> > failed in state FAILED: java.lang.ClassCastException:
> > org.apache.beam.sdk.coders.LengthPrefixCoder cannot be cast to
> > org.apache.beam.sdk.coders.KvCoder
> >
> > And I have no clue where to start looking to fix that as I’d assume the
> > demos should just run?
> >
> > Tom
> >
> > --
> >
> >
> >

-- 




Running against Spark 2.2.3

2019-09-16 Thread Tom Barber
Hello folks,

Spark question: both Python and Java get sad against the portable runner
when trying to run against an existing Spark 2.2.3 server, which the docs
say is supported.

The Spark logs say:

19/09/16 23:59:48 ERROR TransportRequestHandler: Error while invoking
RpcHandler#receive() for one-way message.
java.io.InvalidClassException: org.apache.spark.rpc.RpcEndpointRef; local
class incompatible: stream classdesc serialVersionUID =
-1329125091869941550, local class serialVersionUID = 1835832137613908542

Which from previous experience is generally a library/Scala
incompatibility, but I’m not sure what, if anything, I can do to the
portable runner to make it happy.

Thanks

Tom

-- 




Re: Running Python Wordcount issues

2019-09-16 Thread Robert Bradshaw
What errors did you get with the release-2.16.0 branch?

On Mon, Sep 16, 2019 at 4:43 PM Benjamin Tan
 wrote:
>
> I can confirm. I used the master branch to get this to work. I tried the 
> "release-2.16.0" branch but that didn't work either.
>
> On 2019/09/16 23:30:54, Tom Barber  wrote:
> > Hello folks,
> >
> > Trying to get started running the python word count example against the
> > portable runner using the beam 2.15 download.
> >
> > In one terminal I have:
> >
> > gradle :runners:spark:job-server:runShadow
> >
> > And another:
> >
> > gradle portableWordCount -PjobEndpoint=localhost:8099
> > -PenvironmentType=LOOPBACK
> >
> >
> > But when it starts it says:
> >
> > Traceback (most recent call last):
> >   File "/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main
> > "__main__", fname, loader, pkg_name)
> >   File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
> > exec code in run_globals
> >   File
> > "/home/ubuntu/beam-release-2.15.0/sdks/python/apache_beam/examples/wordcount.py",
> > line 135, in 
> > run()
> >   File
> > "/home/ubuntu/beam-release-2.15.0/sdks/python/apache_beam/examples/wordcount.py",
> > line 115, in run
> > result.wait_until_finish()
> >   File
> > "/home/ubuntu/beam-release-2.15.0/sdks/python/apache_beam/runners/portability/portable_runner.py",
> > line 446, in wait_until_finish
> > self._job_id, self._state, self._last_error_message()))
> > RuntimeError: Pipeline
> > BeamApp-ubuntu-0916232552-dd0af6d7_90675fc0-c39c-4ca7-93fc-c7c7e980f546
> > failed in state FAILED: java.lang.ClassCastException:
> > org.apache.beam.sdk.coders.LengthPrefixCoder cannot be cast to
> > org.apache.beam.sdk.coders.KvCoder
> >
> > And I have no clue where to start looking to fix that as I’d assume the
> > demos should just run?
> >
> > Tom
> >
> > --
> >
> >
> >


Re: Running Python Wordcount issues

2019-09-16 Thread Tom Barber
Alright well I’ve got 2.16 checked out and building so I’ll test it and
feedback.

Thanks!


On 17 September 2019 at 00:43:57, Benjamin Tan (benjamintanwei...@gmail.com)
wrote:

I can confirm. I used the master branch to get this to work. I tried the
"release-2.16.0" branch but that didn't work either.

On 2019/09/16 23:30:54, Tom Barber  wrote:
> Hello folks,
>
> Trying to get started running the python word count example against the
> portable runner using the beam 2.15 download.
>
> In one terminal I have:
>
> gradle :runners:spark:job-server:runShadow
>
> And another:
>
> gradle portableWordCount -PjobEndpoint=localhost:8099
> -PenvironmentType=LOOPBACK
>
>
> But when it starts it says:
>
> Traceback (most recent call last):
> File "/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main
> "__main__", fname, loader, pkg_name)
> File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
> exec code in run_globals
> File
>
"/home/ubuntu/beam-release-2.15.0/sdks/python/apache_beam/examples/wordcount.py",

> line 135, in 
> run()
> File
>
"/home/ubuntu/beam-release-2.15.0/sdks/python/apache_beam/examples/wordcount.py",

> line 115, in run
> result.wait_until_finish()
> File
>
"/home/ubuntu/beam-release-2.15.0/sdks/python/apache_beam/runners/portability/portable_runner.py",

> line 446, in wait_until_finish
> self._job_id, self._state, self._last_error_message()))
> RuntimeError: Pipeline
> BeamApp-ubuntu-0916232552-dd0af6d7_90675fc0-c39c-4ca7-93fc-c7c7e980f546
> failed in state FAILED: java.lang.ClassCastException:
> org.apache.beam.sdk.coders.LengthPrefixCoder cannot be cast to
> org.apache.beam.sdk.coders.KvCoder
>
> And I have no clue where to start looking to fix that as I’d assume the
> demos should just run?
>
> Tom
>
> --
>
>
>

-- 




Re: Running Python Wordcount issues

2019-09-16 Thread Benjamin Tan
I can confirm. I used the master branch to get this to work. I tried the 
"release-2.16.0" branch but that didn't work either.

On 2019/09/16 23:30:54, Tom Barber  wrote: 
> Hello folks,
> 
> Trying to get started running the python word count example against the
> portable runner using the beam 2.15 download.
> 
> In one terminal I have:
> 
> gradle :runners:spark:job-server:runShadow
> 
> And another:
> 
> gradle portableWordCount -PjobEndpoint=localhost:8099
> -PenvironmentType=LOOPBACK
> 
> 
> But when it starts it says:
> 
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main
> "__main__", fname, loader, pkg_name)
>   File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
> exec code in run_globals
>   File
> "/home/ubuntu/beam-release-2.15.0/sdks/python/apache_beam/examples/wordcount.py",
> line 135, in 
> run()
>   File
> "/home/ubuntu/beam-release-2.15.0/sdks/python/apache_beam/examples/wordcount.py",
> line 115, in run
> result.wait_until_finish()
>   File
> "/home/ubuntu/beam-release-2.15.0/sdks/python/apache_beam/runners/portability/portable_runner.py",
> line 446, in wait_until_finish
> self._job_id, self._state, self._last_error_message()))
> RuntimeError: Pipeline
> BeamApp-ubuntu-0916232552-dd0af6d7_90675fc0-c39c-4ca7-93fc-c7c7e980f546
> failed in state FAILED: java.lang.ClassCastException:
> org.apache.beam.sdk.coders.LengthPrefixCoder cannot be cast to
> org.apache.beam.sdk.coders.KvCoder
> 
> And I have no clue where to start looking to fix that as I’d assume the
> demos should just run?
> 
> Tom
> 
> -- 
> 
> 
> 


Re: Running Python Wordcount issues

2019-09-16 Thread Tom Barber
Ah I found https://issues.apache.org/jira/browse/BEAM-7864 which I guess is
related. I’ll try a Git based version.


On 17 September 2019 at 00:30:54, Tom Barber (t...@spicule.co.uk) wrote:

Hello folks,

Trying to get started running the python word count example against the
portable runner using the beam 2.15 download.

In one terminal I have:

gradle :runners:spark:job-server:runShadow

And another:

gradle portableWordCount -PjobEndpoint=localhost:8099
-PenvironmentType=LOOPBACK


But when it starts it says:

Traceback (most recent call last):
  File "/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main
"__main__", fname, loader, pkg_name)
  File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
exec code in run_globals
  File
"/home/ubuntu/beam-release-2.15.0/sdks/python/apache_beam/examples/wordcount.py",
line 135, in 
run()
  File
"/home/ubuntu/beam-release-2.15.0/sdks/python/apache_beam/examples/wordcount.py",
line 115, in run
result.wait_until_finish()
  File
"/home/ubuntu/beam-release-2.15.0/sdks/python/apache_beam/runners/portability/portable_runner.py",
line 446, in wait_until_finish
self._job_id, self._state, self._last_error_message()))
RuntimeError: Pipeline
BeamApp-ubuntu-0916232552-dd0af6d7_90675fc0-c39c-4ca7-93fc-c7c7e980f546
failed in state FAILED: java.lang.ClassCastException:
org.apache.beam.sdk.coders.LengthPrefixCoder cannot be cast to
org.apache.beam.sdk.coders.KvCoder

And I have no clue where to start looking to fix that as I’d assume the
demos should just run?

Tom

-- 




Re: Running Python Wordcount issues

2019-09-16 Thread Kyle Weaver
This is a known issue with Beam 2.15. Using master or the 2.16 branch
should fix the problem.

Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com


On Mon, Sep 16, 2019 at 4:31 PM Tom Barber  wrote:

> Hello folks,
>
> Trying to get started running the python word count example against the
> portable runner using the beam 2.15 download.
>
> In one terminal I have:
>
> gradle :runners:spark:job-server:runShadow
>
> And another:
>
> gradle portableWordCount -PjobEndpoint=localhost:8099
> -PenvironmentType=LOOPBACK
>
>
> But when it starts it says:
>
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main
> "__main__", fname, loader, pkg_name)
>   File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
> exec code in run_globals
>   File
> "/home/ubuntu/beam-release-2.15.0/sdks/python/apache_beam/examples/wordcount.py",
> line 135, in 
> run()
>   File
> "/home/ubuntu/beam-release-2.15.0/sdks/python/apache_beam/examples/wordcount.py",
> line 115, in run
> result.wait_until_finish()
>   File
> "/home/ubuntu/beam-release-2.15.0/sdks/python/apache_beam/runners/portability/portable_runner.py",
> line 446, in wait_until_finish
> self._job_id, self._state, self._last_error_message()))
> RuntimeError: Pipeline
> BeamApp-ubuntu-0916232552-dd0af6d7_90675fc0-c39c-4ca7-93fc-c7c7e980f546
> failed in state FAILED: java.lang.ClassCastException:
> org.apache.beam.sdk.coders.LengthPrefixCoder cannot be cast to
> org.apache.beam.sdk.coders.KvCoder
>
> And I have no clue where to start looking to fix that as I’d assume the
> demos should just run?
>
> Tom
>
>


Running Python Wordcount issues

2019-09-16 Thread Tom Barber
Hello folks,

Trying to get started running the python word count example against the
portable runner using the beam 2.15 download.

In one terminal I have:

gradle :runners:spark:job-server:runShadow

And another:

gradle portableWordCount -PjobEndpoint=localhost:8099
-PenvironmentType=LOOPBACK


But when it starts it says:

Traceback (most recent call last):
  File "/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main
"__main__", fname, loader, pkg_name)
  File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
exec code in run_globals
  File
"/home/ubuntu/beam-release-2.15.0/sdks/python/apache_beam/examples/wordcount.py",
line 135, in 
run()
  File
"/home/ubuntu/beam-release-2.15.0/sdks/python/apache_beam/examples/wordcount.py",
line 115, in run
result.wait_until_finish()
  File
"/home/ubuntu/beam-release-2.15.0/sdks/python/apache_beam/runners/portability/portable_runner.py",
line 446, in wait_until_finish
self._job_id, self._state, self._last_error_message()))
RuntimeError: Pipeline
BeamApp-ubuntu-0916232552-dd0af6d7_90675fc0-c39c-4ca7-93fc-c7c7e980f546
failed in state FAILED: java.lang.ClassCastException:
org.apache.beam.sdk.coders.LengthPrefixCoder cannot be cast to
org.apache.beam.sdk.coders.KvCoder

And I have no clue where to start looking to fix that as I’d assume the
demos should just run?

Tom

-- 




Re: Looping in Dataflow(Creating multiple jobs for a while loop)

2019-09-16 Thread Robert Bradshaw
An external scheduler would also create a new job every time. The only
way I see to continuously process results in a single job is to have a
streaming job.

On Mon, Sep 16, 2019 at 12:22 PM Anjana Pydi
 wrote:
>
> Hi Juan,
>
> Thanks for the reply ! I want to know if there is any way in dataflow to 
> achieve this before trying external scheduler.
>
> Regards,
> Anjana
> 
> From: Juan Carlos Garcia [jcgarc...@gmail.com]
> Sent: Monday, September 16, 2019 11:23 AM
> To: user@beam.apache.org
> Cc: Richard Amrith Lourdu
> Subject: Re: Looping in Dataflow(Creating multiple jobs for a while loop)
>
> Hi Anjana,
>
> You need to separate your line of thought between the pipeline definition and
> what happens when you call *run* on the pipeline; given that, you need to
> externalize the scheduling using something like a crontab, Jenkins, or
> another mechanism.
>
> Best regards,
> JC
>
> On Mon, Sep 16, 2019 at 7:57 PM Anjana Pydi  
> wrote:
>>
>> Hi,
>>
>> I am trying to run a task using an infinite while loop with change in input 
>> parameters as below but it creates a new job for every time. (I expect it to 
>> use same pipeline that is created before while loop and should be in a 
>> single job)
>>
>> p = beam.Pipeline(options=PipelineOptions())
>>
>> while True:
>> to_date = time.time()
>>
>> (p
>> | 'create surveys' >> beam.Create(id_list)
>>| 'get data' >> beam.FlatMap(lambda id: get_api_data(id, 
>> from_date, to_date))
>>| 'send to output' >> beam.FlatMap(lambda input: 
>> (send_to_api(input)))
>>)
>> from_date = to_date + timedelta(microseconds=1)
>> time.sleep(30)
>>
>> p.run().wait_until_finish()
>>
>> It works properly(in only one job) when there is no while loop as below:
>>
>> p = beam.Pipeline(options=PipelineOptions())
>>
>> (p
>> | 'create surveys' >> beam.Create(id_list)
>>| 'get data' >> beam.FlatMap(lambda id: get_api_data(id, 
>> from_date, to_date))
>>| 'send to output' >> beam.FlatMap(lambda input: 
>> (send_to_api(input)))
>>)
>>
>> p.run().wait_until_finish()
>>
>> Could someone please suggest how to make the task run in same job instead of 
>> creating multiple jobs.
>>
>> Please let me know in case if any additional information needed.
>>
>> Thanks,
>> Anjana
>>
>>
>
>
>
> --
>
> JC
>


RE: Looping in Dataflow(Creating multiple jobs for a while loop)

2019-09-16 Thread Anjana Pydi
Hi Juan,

Thanks for the reply! I want to know if there is any way in Dataflow to
achieve this before trying an external scheduler.

Regards,
Anjana

From: Juan Carlos Garcia [jcgarc...@gmail.com]
Sent: Monday, September 16, 2019 11:23 AM
To: user@beam.apache.org
Cc: Richard Amrith Lourdu
Subject: Re: Looping in Dataflow(Creating multiple jobs for a while loop)

Hi Anjana,

You need to separate your line of thought between the pipeline definition and
what happens when you call *run* on the pipeline; given that, you need to
externalize the scheduling using something like a crontab, Jenkins, or another
mechanism.

Best regards,
JC

On Mon, Sep 16, 2019 at 7:57 PM Anjana Pydi <anjan...@bahwancybertek.com> wrote:
Hi,

I am trying to run a task using an infinite while loop with a change in input
parameters as below, but it creates a new job every time. (I expect it to use
the same pipeline that is created before the while loop and to run as a
single job.)

p = beam.Pipeline(options=PipelineOptions())

while True:
    to_date = time.time()

    (p
     | 'create surveys' >> beam.Create(id_list)
     | 'get data' >> beam.FlatMap(lambda id: get_api_data(id, from_date, to_date))
     | 'send to output' >> beam.FlatMap(lambda input: (send_to_api(input)))
    )
    from_date = to_date + timedelta(microseconds=1)
    time.sleep(30)

p.run().wait_until_finish()

It works properly (in only one job) when there is no while loop, as below:

p = beam.Pipeline(options=PipelineOptions())

(p
 | 'create surveys' >> beam.Create(id_list)
 | 'get data' >> beam.FlatMap(lambda id: get_api_data(id, from_date, to_date))
 | 'send to output' >> beam.FlatMap(lambda input: (send_to_api(input)))
)

p.run().wait_until_finish()

Could someone please suggest how to make the task run in the same job instead
of creating multiple jobs.

Please let me know if any additional information is needed.

Thanks,
Anjana




--

JC



RE: Looping in Dataflow(Creating multiple jobs for a while loop)

2019-09-16 Thread Anjana Pydi
Hi Robert,

Thanks for reply !

Yes, I want a single pipeline instead of a new job every 30 seconds. In the
approach you mentioned, a new job is created every time.

Regards,
Anjana

From: Robert Bradshaw [rober...@google.com]
Sent: Monday, September 16, 2019 11:51 AM
To: user
Cc: Richard Amrith Lourdu
Subject: Re: Looping in Dataflow(Creating multiple jobs for a while loop)

You can put your pipeline definition inside your while loop, i.e.

while True:
    to_date = time.time()

    p = beam.Pipeline(options=PipelineOptions())
    (p
     | 'create surveys' >> beam.Create(id_list)
     | 'get data' >> beam.FlatMap(lambda id: get_api_data(id, from_date, to_date))
     | 'send to output' >> beam.FlatMap(lambda input: (send_to_api(input)))
    )
    from_date = to_date + timedelta(microseconds=1)
    time.sleep(30)
    p.run().wait_until_finish()

which will run pipelines until your script terminates, but this will
fire off a new batch job every 30+ seconds. It sounds like what you'd
really want to do here is have a single streaming pipeline.

On Mon, Sep 16, 2019 at 11:24 AM Juan Carlos Garcia  wrote:
>
> Hi Anjana,
>
> You need to separate your line of thought between the pipeline definition and
> what happens when you call *run* on the pipeline; given that, you need to
> externalize the scheduling using something like a crontab, Jenkins, or
> another mechanism.
>
> Best regards,
> JC
>
> On Mon, Sep 16, 2019 at 7:57 PM Anjana Pydi  
> wrote:
>>
>> Hi,
>>
>> I am trying to run a task using an infinite while loop with change in input 
>> parameters as below but it creates a new job for every time. (I expect it to 
>> use same pipeline that is created before while loop and should be in a 
>> single job)
>>
>> p = beam.Pipeline(options=PipelineOptions())
>>
>> while True:
>> to_date = time.time()
>>
>> (p
>> | 'create surveys' >> beam.Create(id_list)
>>| 'get data' >> beam.FlatMap(lambda id: get_api_data(id, 
>> from_date, to_date))
>>| 'send to output' >> beam.FlatMap(lambda input: 
>> (send_to_api(input)))
>>)
>> from_date = to_date + timedelta(microseconds=1)
>> time.sleep(30)
>>
>> p.run().wait_until_finish()
>>
>> It works properly(in only one job) when there is no while loop as below:
>>
>> p = beam.Pipeline(options=PipelineOptions())
>>
>> (p
>> | 'create surveys' >> beam.Create(id_list)
>>| 'get data' >> beam.FlatMap(lambda id: get_api_data(id, 
>> from_date, to_date))
>>| 'send to output' >> beam.FlatMap(lambda input: 
>> (send_to_api(input)))
>>)
>>
>> p.run().wait_until_finish()
>>
>> Could someone please suggest how to make the task run in same job instead of 
>> creating multiple jobs.
>>
>> Please let me know in case if any additional information needed.
>>
>> Thanks,
>> Anjana
>>
>>
>
>
>
> --
>
> JC
>


Re: The state of external transforms in Beam

2019-09-16 Thread Robert Bradshaw
Thanks for bringing this up again. My thoughts on the open questions below.

On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova  wrote:
> That commit solves 2 problems:
>
> Adds the pubsub Java deps so that they’re available in our portable pipeline
> Makes the coder for the PubsubIO message-holder type, PubsubMessage, 
> available as a standard coder. This is required because both PubsubIO.Read 
> and PubsubIO.Write expand to ParDos which pass along these PubsubMessage 
> objects, but only “standard” (i.e. portable) coders can be used, so we have 
> to hack it to make PubsubMessage appear as a standard coder.
>
> More details:
>
> There’s a similar magic commit required for Kafka external transforms
> The Jira issue for this problem is here: 
> https://jira.apache.org/jira/browse/BEAM-7870
> For problem #2 above there seems to be some consensus forming around using 
> Avro or schema/row coders to send compound types in a portable way. Here’s 
> the PR for making row coders portable
> https://github.com/apache/beam/pull/9188

+1. Note that this doesn't mean that the IO itself must produce rows;
part of the Schema work in Java is to make it easy to automatically
convert from various Java classes to schemas transparently, so the
same logic that would allow one to apply an SQL filter directly to a
Kafka/PubSub read would also enable cross-language use. Even if that
doesn't work, we need not uglify the Java API; we can have an
option/alternative transform that appends the convert-to-Row DoFn for
easier use by external SDKs (though the goal of the former work is to
make this step unnecessary).

> I don’t really have any ideas for problem #1

The crux of the issue here is that the jobs API was not designed with
cross-language in mind, and so the artifact API ties artifacts to jobs
rather than to environments. To solve this we need to augment the
notion of environment to allow the specification of additional
dependencies (e.g. jar files in this specific case, or better as
maven/pypi/... dependencies (with version ranges) such that
environment merging and dependency resolution can be sanely done), and
a way for the expansion service to provide such dependencies.

Max wrote up a summary of the prior discussions at
https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit#heading=h.900gc947qrw8

In the short term, one can build a custom docker image that has all
the requisite dependencies installed.

This touches on a related but separable issue that one may want to run
some of these transforms "natively" in the same process as the runner
(e.g. a Java IO in the Flink Java Runner) rather than via docker.
(Similarly with subprocess.) Exactly how that works with environment
specifications is also a bit TBD, but my proposal has been that these
are best viewed as runner-specific substitutions of standard
environments.

> So the portability expansion system works, and now it’s time to sand off some 
> of the rough corners. I’d love to hear others’ thoughts on how to resolve 
> some of these remaining issues.

+1



The state of external transforms in Beam

2019-09-16 Thread Chad Dombrova
Hi all,
There was some interest in this topic at the Beam Summit this week (btw,
great job to everyone involved!), so I thought I’d try to summarize the
current state of things.
First, let me explain the idea behind an external transform for the
uninitiated.

Problem:

   - there’s a transform that you want to use, but it’s not available in
   your desired language. IO connectors are a good example: there are many
   available in the Java SDK, but not so much in Python or Go.

Solution:

   1. Create a stub transform in your desired language (e.g. Python) whose
   primary role is to serialize the parameters passed to that transform
   2. When you run your portable pipeline, just prior to it being sent to
   the Job Service for execution, your stub transform’s payload is first sent
   to the “Expansion Service” that’s running in the native language (Java),
   where the payload is used to construct an instance of the native transform,
   which is then expanded and converted to a protobuf and sent back to the
   calling process (Python).
   3. The protobuf representation of the expanded transform gets integrated
   back into the pipeline that you’re submitting
   4. Steps 2-3 are repeated for each external transform in your pipeline
   5. Then the whole pipeline gets sent to the Job Service to be invoked on
   Flink/Spark/etc
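
As a rough illustration of steps 1 and 2 above (this is a sketch, not code
from the PRs discussed below; the URN, the payload fields, and the expansion
service address are all made up for the example):

import apache_beam as beam
from apache_beam.transforms.external import (
    ExternalTransform, ImplicitSchemaPayloadBuilder)

with beam.Pipeline() as p:
    (p
     | beam.Create(['a', 'b'])
     | ExternalTransform(
         'beam:external:fakeurn:v1',                         # URN the Java transform is registered under
         ImplicitSchemaPayloadBuilder({'config_value': 1}),  # parameters serialized for the expansion service
         'localhost:8097'))                                  # address of the running expansion service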

--

Now on to my journey to get PubsubIO working in python on Flink.

The first issue I encountered was that there was a lot of boilerplate
involved in serializing the stub python transform’s parameters so they can
be sent to the expansion service.

I created a PR to make this simpler, which has just been merged to master:
https://github.com/apache/beam/pull/9098

With this feature in place, if you’re using python 3.7 you can use a
dataclass and the typing module to create your transform and describe your
schema in one go. For example:

@dataclasses.dataclass
class MyAwesomeTransform(beam.ExternalTransform):
  URN = 'beam:external:fakeurn:v1'

  integer_example: int
  string_example: str
  list_of_strings: List[str]
  optional_kv: Optional[Tuple[str, float]] = None
  optional_integer: Optional[int] = None
  expansion_service: dataclasses.InitVar[Optional[str]] = None
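
Applying that stub would then look roughly like applying any other transform;
this usage is hypothetical (beam is assumed to be imported, the expansion
service address is invented, and the details may differ from the merged PR):

with beam.Pipeline() as p:
    (p
     | 'read' >> beam.Create(['x'])
     | 'external step' >> MyAwesomeTransform(
           integer_example=1,
           string_example='hello',
           list_of_strings=['a', 'b'],
           expansion_service='localhost:8097'))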

For earlier versions of python, you can use typing.NamedTuple to declare
your schema.

MyAwesomeSchema = typing.NamedTuple(
    'MyAwesomeSchema',
    [
        ('integer_example', int),
        ('string_example', unicode),
        ('list_of_strings', List[unicode]),
        ('optional_kv', Optional[Tuple[unicode, float]]),
        ('optional_integer', Optional[int]),
    ]
)

There’s also an option to generate the schema implicitly based on the
value(s) you wish to serialize.

Implementing this feature involved a slight tangent: requesting a coder
for typing.List resulted in a pickle coder instead of an IterableCoder.
That's bad because only standard/portable coders can be used for expansion
in Java (for obvious reasons), so as a convenience that was solved here:
https://github.com/apache/beam/pull/9344
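
A quick way to see the effect, as a hedged sketch (it assumes an SDK version
that includes that change):

from apache_beam import coders, typehints

# After the fix this should print an IterableCoder wrapping a string coder,
# rather than a pickle-based fallback coder.
print(coders.registry.get_coder(typehints.List[str]))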

The next issue that I encountered was that python did not track the
boundedness of PCollections, which made it impossible to use the expansion
service to create unbounded writes. That’s been solved and merged here:
https://github.com/apache/beam/pull/9426

So that brings us to the actual PR for adding external transform support
for PubsubIO: https://github.com/apache/beam/pull/9268

The PR works, but with one big caveat: in order to use it you must build
your Java containers with this special commit:
https://github.com/chadrik/beam/commit/d12b99084809ec34fcf0be616e94301d3aca4870

That commit solves 2 problems:

   1. Adds the pubsub Java deps so that they’re available in our portable
   pipeline
   2. Makes the coder for the PubsubIO message-holder type, PubsubMessage,
   available as a standard coder. This is required because both PubsubIO.Read
   and PubsubIO.Write expand to ParDos which pass along these PubsubMessage
   objects, but only “standard” (i.e. portable) coders can be used, so we have
   to hack it to make PubsubMessage appear as a standard coder.

More details:

   - There’s a similar magic commit required for Kafka external transforms
   - The Jira issue for this problem is here:
   https://jira.apache.org/jira/browse/BEAM-7870
   - For problem #2 above there seems to be some consensus forming around
   using Avro or schema/row coders to send compound types in a portable way.
   Here’s the PR for making row coders portable
   https://github.com/apache/beam/pull/9188
   - I don’t really have any ideas for problem #1

So the portability expansion system works, and now it’s time to sand off
some of the rough corners. I’d love to hear others’ thoughts on how to
resolve some of these remaining issues.

-chad


Looping in Dataflow(Creating multiple jobs for a while loop)

2019-09-16 Thread Anjana Pydi
Hi,

I am trying to run a task using an infinite while loop with changing input
parameters, as below, but it creates a new job every time. (I expect it to
reuse the same pipeline that is created before the while loop, so everything
runs in a single job.)

p = beam.Pipeline(options=PipelineOptions())

while True:
    to_date = time.time()

    (p
     | 'create surveys' >> beam.Create(id_list)
     | 'get data' >> beam.FlatMap(lambda id: get_api_data(id, from_date, to_date))
     | 'send to output' >> beam.FlatMap(lambda input: send_to_api(input))
    )
    from_date = to_date + timedelta(microseconds=1)
    time.sleep(30)

p.run().wait_until_finish()

It works properly (in only one job) when there is no while loop, as below:

p = beam.Pipeline(options=PipelineOptions())

(p
 | 'create surveys' >> beam.Create(id_list)
 | 'get data' >> beam.FlatMap(lambda id: get_api_data(id, from_date, to_date))
 | 'send to output' >> beam.FlatMap(lambda input: send_to_api(input))
)

p.run().wait_until_finish()

Could someone please suggest how to make the task run in the same job instead
of creating multiple jobs.

Please let me know if any additional information is needed.

Thanks,
Anjana




Re: How can I work with multiple pcollections?

2019-09-16 Thread Steve973
Lukasz,

It has been a few days since your reply, but I wanted to thank you for
pointing me toward the "additional outputs" portion of the documentation.
I had already read through that (if not completely thoroughly), although at
the time I did not quite know the requirements of what I would be doing, so
I did not really remember that part.  I have some more work to do on my
code before I can begin to use Beam (to make it much better!) but I think
this should help quite a bit.

Thanks again!
Steve
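
For anyone following along, here is a minimal sketch of that
additional-outputs pattern; the tag names and the splitting rule are invented
for illustration, not taken from the real code:

import apache_beam as beam

class SplitByType(beam.DoFn):
    def process(self, record):
        # Route each parsed record to one of three tagged outputs.
        kind = record['kind']
        if kind == 'a':
            yield record                                      # main output
        elif kind == 'b':
            yield beam.pvalue.TaggedOutput('type_b', record)
        else:
            yield beam.pvalue.TaggedOutput('type_c', record)

with beam.Pipeline() as p:
    parsed = p | beam.Create([{'kind': 'a'}, {'kind': 'b'}, {'kind': 'c'}])
    results = parsed | beam.ParDo(SplitByType()).with_outputs(
        'type_b', 'type_c', main='type_a')
    # Three separate PCollections, each usable in its own downstream steps.
    type_a, type_b, type_c = results.type_a, results.type_b, results.type_c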

On Thu, Sep 12, 2019 at 5:31 PM Lukasz Cwik  wrote:

> Yes you can create multiple output PCollections using a ParDo with
> multiple outputs instead of inserting them into Mongo.
>
> It could be useful to read through the programming guide related to
> PCollections[1] and PTransforms with multiple outputs[2] and feel free to
> return with more questions.
>
> 1: https://beam.apache.org/documentation/programming-guide/#pcollections
> 2:
> https://beam.apache.org/documentation/programming-guide/#additional-outputs
>
> On Thu, Sep 12, 2019 at 2:24 PM Steve973  wrote:
>
>> I am new to Beam, and I am pretty excited to get started.  I have been
>> doing quite a bit of research and playing around with the API.  But my
>> use case, unless I am approaching it incorrectly, suggests that I will
>> need to process multiple PCollections in some parts of my pipeline.
>>
>> I am working out some of my business logic without a parallelization
>> framework to get the solution working.  Then I will convert the workflow to
>> Beam.  What I am doing is reading millions of files from the file system,
>> and I am processing parts of the file into three different output types,
>> and storing them in MongoDB in three collections.  After this initial
>> extraction (mapping), I modify some of the data which will result in
>> duplicates.  So the next step is a reduction step to eliminate the
>> duplicates (based on a number of fields) and aggregate the references to
>> the other 2 data types, so the reduced object contains the dedupe fields,
>> and a list of references to documents in the other 2 collections.  I'm not
>> touching either of these two collections at this time, but this is where my
>> question comes in.  If I map this data, can I create three separate
>> PCollections instead of inserting them into Mongo?  After the
>> deduplication, I will need to combine data in two of the streams, and I
>> need to store the results of that combination into mongo.  Then I need to
>> process the third collection, which will go into its own mongo collection.
>>
>> I hope my description was at least enough to get the conversation
>> started.  Is my approach reasonable, and can I create multiple PCollections
>> and use them at different phases of my pipeline?  Or is there another way
>> that I should be looking at this?
>>
>> Thanks in advance!
>> Steve
>>
>


Re: Python errors when using batch+windows+textio

2019-09-16 Thread Paweł Kordek
Hi Kyle

I'm on 2.15. Thanks for pointing me to the JIRA, I'll watch it and also try
to see what's causing the problem.

Best regards
Pawel
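
For context, the pattern described further down (keyed sums over fixed
windows, written out with textio in a batch pipeline) looks roughly like the
sketch below; the elements, window size, and output path are invented, and on
affected SDK versions this combination is what can trigger the reported
error (BEAM-6860):

import apache_beam as beam
from apache_beam.transforms import window

with beam.Pipeline() as p:
    (p
     | beam.Create([('a', 1), ('a', 2), ('b', 3)])
     # Attach event timestamps so the elements can be windowed.
     | beam.Map(lambda kv: window.TimestampedValue(kv, 0))
     | beam.WindowInto(window.FixedWindows(60))
     | beam.CombinePerKey(sum)
     | beam.io.WriteToText('/tmp/out'))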

On Fri, 13 Sep 2019 at 01:43, Kyle Weaver  wrote:

> Hi Pawel, could you tell us which version of the Beam Python SDK you are
> using?
>
> For the record, this looks like a known issue:
> https://issues.apache.org/jira/browse/BEAM-6860
>
> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>
>
> On Wed, Sep 11, 2019 at 6:33 AM Paweł Kordek 
> wrote:
>
>> Hi
>>
>> I was developing a simple pipeline where I aggregate records by key and
>> sum values for a predefined window. I was getting some errors, and after
>> checking, I am getting exactly the same issues when running the Wikipedia
>> example from the Beam repo. The output is as follows:
>> ---
>> INFO:root:Missing pipeline option (runner). Executing pipeline using the
>> default runner: DirectRunner.
>> INFO:root: > at 0x7f333fc1fe60> 
>> INFO:root: > 0x7f333fc1ff80> 
>> INFO:root: > 0x7f333fc1d050> 
>> INFO:root: 
>> 
>> INFO:root: 
>> 
>> INFO:root: 
>> 
>> INFO:root: 
>> 
>> INFO:root: > 0x7f333fc1d3b0> 
>> INFO:root: > 0x7f333fc1d440> 
>> INFO:root: > 0x7f333fc1d5f0> 
>> INFO:root: 
>> 
>> INFO:root: > 0x7f333fc1d710> 
>> INFO:root:Running
>> ((ref_AppliedPTransform_ReadFromText/Read_3)+(ref_AppliedPTransform_ComputeTopSessions/ExtractUserAndTimestamp_5))+(ref_AppliedPTransform_ComputeTopSessions/Filter(> at
>> top_wikipedia_sessions.py:127>)_6))+(ref_AppliedPTransform_ComputeTopSessions/ComputeSessions/ComputeSessionsWindow_8))+(ref_AppliedPTransform_ComputeTopSessions/ComputeSessions/PerElement/PerElement:PairWithVoid_10))+(ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/Precombine))+(ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/Group/Write)
>> INFO:root:Running
>> (((ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/Group/Read)+(ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/Merge))+(ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/ExtractOutputs))+(ref_AppliedPTransform_ComputeTopSessions/SessionsToStrings_18))+(ref_AppliedPTransform_ComputeTopSessions/TopPerMonth/TopPerMonthWindow_20))+(ref_AppliedPTransform_ComputeTopSessions/TopPerMonth/Top/KeyWithVoid_22))+(ComputeTopSessions/TopPerMonth/Top/CombinePerKey/Precombine))+(ComputeTopSessions/TopPerMonth/Top/CombinePerKey/Group/Write)
>> INFO:root:Running
>> (((ref_AppliedPTransform_WriteToText/Write/WriteImpl/DoOnce/Read_36)+(ref_AppliedPTransform_WriteToText/Write/WriteImpl/InitializeWrite_37))+(ref_PCollection_PCollection_19/Write))+(ref_PCollection_PCollection_20/Write)
>> INFO:root:Running
>> ComputeTopSessions/TopPerMonth/Top/CombinePerKey/Group/Read)+(ComputeTopSessions/TopPerMonth/Top/CombinePerKey/Merge))+(ComputeTopSessions/TopPerMonth/Top/CombinePerKey/ExtractOutputs))+(ref_AppliedPTransform_ComputeTopSessions/TopPerMonth/Top/UnKey_30))+(ref_AppliedPTransform_ComputeTopSessions/FormatOutput_31))+(ref_AppliedPTransform_WriteToText/Write/WriteImpl/WriteBundles_38))+(ref_AppliedPTransform_WriteToText/Write/WriteImpl/Pair_39))+(ref_AppliedPTransform_WriteToText/Write/WriteImpl/WindowInto(WindowIntoFn)_40))+(WriteToText/Write/WriteImpl/GroupByKey/Write)
>> Traceback (most recent call last):
>>   File "apache_beam/runners/common.py", line 829, in
>> apache_beam.runners.common.DoFnRunner._invoke_bundle_method
>>   File "apache_beam/runners/common.py", line 403, in
>> apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
>>   File "apache_beam/runners/common.py", line 406, in
>> apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
>>   File "apache_beam/runners/common.py", line 982, in
>> apache_beam.runners.common._OutputProcessor.finish_bundle_outputs
>>   File "apache_beam/runners/worker/operations.py", line 142, in
>> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>>   File "apache_beam/runners/worker/operations.py", line 122, in
>> apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
>>   File "apache_beam/runners/worker/opcounters.py", line 196, in
>> apache_beam.runners.worker.opcounters.OperationCounters.update_from
>>   File "apache_beam/runners/worker/opcounters.py", line 214, in
>> apache_beam.runners.worker.opcounters.OperationCounters.do_sample
>>   File "apache_beam/coders/coder_impl.py", line 1014, in
>>