Re: Aggregate-and-join in Beam

2020-02-14 Thread Paweł Kordek
I've just read this message again and realized that the requirement "in an hour
preceding the timestamp" is neither feasible nor necessary; for this part I will
use sliding windows, say sliding by 5 minutes. The rest of the question still
stands, as I still haven't wrapped my head around it. The important part is that
the emitted aggregate takes the current element's own value into account.
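
Concretely, the aggregation side I have in mind is something like the sketch
below (Python SDK; the Create input and the constant timestamps are made-up
stand-ins for the real streaming source):

import apache_beam as beam
from apache_beam.transforms import trigger, window

with beam.Pipeline() as p:
    means = (
        p
        # Made-up bounded input standing in for the streaming source.
        | beam.Create([('sensor-1', 10), ('sensor-1', 20), ('sensor-2', 5)])
        # Attach event timestamps (placeholder values here).
        | beam.Map(lambda kv: window.TimestampedValue(kv, 0))
        # One-hour windows sliding every five minutes, emitting on every
        # element as described above.
        | beam.WindowInto(
            window.SlidingWindows(size=3600, period=300),
            trigger=trigger.Repeatedly(trigger.AfterCount(1)),
            accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
        # Mean of the values per id within each window.
        | beam.combiners.Mean.PerKey()
    )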

____
From: Paweł Kordek
Sent: Friday, February 14, 2020 10:53
To: user@beam.apache.org 
Subject: Aggregate-and-join in Beam

Hello

I am working on a relatively simple streaming data pipeline. Imagine the
elements of a PCollection represent something like sensor readings (with event
timestamps):

Reading(id: String, value: Int)

For each incoming element I want to have at the end of the pipeline:

Reading(id: String, value: Int, meanLastHr: Double)

where meanLastHr is the mean of all the values for a given id in the hour
preceding the timestamp of this particular reading.

If I am to implement this in Beam, then to get the means I have to transform the
input into a PCollection of KVs (id, value), apply a 1-hour window, CombinePerKey,
and trigger on every element. My question is: what is the simplest/most idiomatic
way to join these values back with the original readings? This is a one-to-one
join on (event timestamp, id); I can assume that no two readings share the same
pair of values. One way I can think of is to use the PCollection with aggregated
values as a side input to a ParDo, but I'm not sure how the windows would be
mapped onto a globally-windowed main collection (what if I applied global
windowing to the side input?). If, for example, I used Flink directly, I could go
with KeyedCoProcessFunction, but I don't see any concept in Beam that maps to it
directly. Any help and suggestions would be appreciated.
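
To make the join part concrete, the closest I have come is re-keying both
collections by (id, timestamp) and co-grouping, roughly as below. This is a
sketch of the question rather than a working pipeline: 'keyed_readings' and
'keyed_means' are hypothetical PCollections, both keyed by (id, event
timestamp) and identically windowed, and I'm ignoring for now that with
sliding windows each reading would land in several windows.

import apache_beam as beam

# 'keyed_readings' and 'keyed_means' are hypothetical, identically windowed
# PCollections keyed by (id, event timestamp).
def merge(kv):
    (reading_id, ts), grouped = kv
    [value] = grouped['reading']  # exactly one reading per key, by assumption
    [mean] = grouped['mean']      # exactly one aggregate per key, by assumption
    return (reading_id, value, mean)

joined = (
    {'reading': keyed_readings, 'mean': keyed_means}
    | beam.CoGroupByKey()
    | beam.Map(merge)
)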

Best regards
Pawel


Fwd: Running a Beam Pipeline on GCP Dataproc Flink Cluster

2020-02-08 Thread Paweł Kordek


From: Paweł Kordek 
Sent: Saturday, February 8, 2020, 08:48
To: Xander Song
Subject: Re: Running a Beam Pipeline on GCP Dataproc Flink Cluster

It's because 1.9.1 is no longer hosted under this particular link; you can
change both occurrences of 1.9.1 in the URL to 1.9.2. By the way, this link was
just an example (sorry I wasn't clear enough); in fact you should go to the
Flink releases page and get a link from there, not necessarily for the same
mirror.

Cheers
Paweł


From: Xander Song 
Sent: Saturday, February 8, 2020 6:40:26 AM
To: Paweł Kordek 
Subject: Re: Running a Beam Pipeline on GCP Dataproc Flink Cluster

Thanks for your suggestion. I tried to add the suggested flag, but now cluster 
creation fails. I executed


REGION=us-west1
CLUSTER_NAME=test-cluster

gcloud dataproc clusters create ${CLUSTER_NAME} \
    --region ${REGION} \
    --initialization-actions gs://goog-dataproc-initialization-actions-${REGION}/flink/flink.sh \
    --metadata flink-snapshot-url=http://mirrors.up.pt/pub/apache/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz

at the command line. I received the following terminal output.

Waiting on operation
[projects/innate-life-265704/regions/us-west1/operations/b44911f9-3bca-3d8b-9cb7-897a24e1f3f6].
Waiting for cluster creation operation...⠶
WARNING: For PD-Standard without local SSDs, we strongly recommend provisioning
1TB or larger to ensure consistently high I/O performance. See
https://cloud.google.com/compute/docs/disks/performance for information on disk
I/O performance.
Waiting for cluster creation operation...⠶
WARNING: Cluster test-cluster failed to create. Beginning automated resource
cleanup process.
Waiting for cluster creation operation...done.
ERROR: (gcloud.dataproc.clusters.create) Operation
[projects/innate-life-265704/regions/us-west1/operations/b44911f9-3bca-3d8b-9cb7-897a24e1f3f6]
failed: Initialization action failed. Failed action
'gs://goog-dataproc-initialization-actions-us-west1/flink/flink.sh', see output
in:
gs://dataproc-bb4bc21b-9947-4fd7-bb15-f3e1a696483c-us-west1/google-cloud-dataproc-metainfo/696b98b6-afcd-4f7c-b566-4fdab6fe9374/test-cluster-m/dataproc-initialization-script-0_output.

The contents of the output file were:

-b566-4fdab6fe9374/test-cluster-m/dataproc-initialization-script-0_output
+ export PATH=/usr/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
+ PATH=/usr/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
+ readonly FLINK_INSTALL_DIR=/usr/lib/flink
+ FLINK_INSTALL_DIR=/usr/lib/flink
+ readonly FLINK_WORKING_DIR=/var/lib/flink
+ FLINK_WORKING_DIR=/var/lib/flink
+ readonly FLINK_YARN_SCRIPT=/usr/bin/flink-yarn-daemon
+ FLINK_YARN_SCRIPT=/usr/bin/flink-yarn-daemon
+ readonly FLINK_WORKING_USER=yarn
+ FLINK_WORKING_USER=yarn
+ readonly HADOOP_CONF_DIR=/etc/hadoop/conf
+ HADOOP_CONF_DIR=/etc/hadoop/conf
+ readonly FLINK_NETWORK_NUM_BUFFERS=2048
+ FLINK_NETWORK_NUM_BUFFERS=2048
+ readonly FLINK_JOBMANAGER_MEMORY_FRACTION=1.0
+ FLINK_JOBMANAGER_MEMORY_FRACTION=1.0
+ readonly FLINK_TASKMANAGER_MEMORY_FRACTION=1.0
+ FLINK_TASKMANAGER_MEMORY_FRACTION=1.0
+ readonly START_FLINK_YARN_SESSION_METADATA_KEY=flink-start-yarn-session
+ START_FLINK_YARN_SESSION_METADATA_KEY=flink-start-yarn-session
+ readonly START_FLINK_YARN_SESSION_DEFAULT=true
+ START_FLINK_YARN_SESSION_DEFAULT=true
+ readonly FLINK_SNAPSHOT_URL_METADATA_KEY=flink-snapshot-url
+ FLINK_SNAPSHOT_URL_METADATA_KEY=flink-snapshot-url
+ main
+ local role
++ /usr/share/google/get_metadata_value attributes/dataproc-role
+ role=Master
+ /usr/share/google/get_metadata_value attributes/flink-snapshot-url
http://mirrors.up.pt/pub/apache/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz
+ install_flink_snapshot
+ local work_dir
++ mktemp -d
+ work_dir=/tmp/tmp.6vPgP5mYq4
+ local flink_url
++ /usr/share/google/get_metadata_value attributes/flink-snapshot-url
+ flink_url=http://mirrors.up.pt/pub/apache/flink/fli

Re: Running a Beam Pipeline on GCP Dataproc Flink Cluster

2020-02-07 Thread Paweł Kordek
Hi

I had a similar use case recently, and adding a metadata key solved the issue:
https://github.com/GoogleCloudDataproc/initialization-actions/pull/334. You
keep the original initialization action and add, for example (using gcloud):
'--metadata
flink-snapshot-url=http://mirrors.up.pt/pub/apache/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz'

Cheers
Pawel

From: Ismaël Mejía 
Sent: Friday, February 7, 2020 2:24 PM
To: Xander Song ; user@beam.apache.org 

Cc: u...@flink.apache.org 
Subject: Re: Running a Beam Pipeline on GCP Dataproc Flink Cluster

+user@beam.apache.org


On Fri, Feb 7, 2020 at 12:54 AM Xander Song <iamuuriw...@gmail.com> wrote:
I am attempting to run a Beam pipeline on a GCP Dataproc Flink cluster. I have
followed the instructions at this repo to create a Flink cluster on Dataproc
using an initialization action. However, the resulting cluster uses version
1.5.6 of Flink, and my project requires a more recent version (version 1.7,
1.8, or 1.9) for compatibility with Beam.

Inside of the flink.sh script in the linked repo, there is a line for
installing Flink from a snapshot URL instead of apt. Is this the correct
mechanism for installing a different version of Flink using the
initialization script? If so, how is it meant to be used?

Thank you in advance.


Re: Python errors when using batch+windows+textio

2019-09-16 Thread Paweł Kordek
Hi Kyle

I'm on 2.15. Thanks for pointing me to the JIRA, I'll watch it and also try
to see what's causing the problem.
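
For reference, my own pipeline has roughly the shape below (all the names and
the record format are made up for illustration); it fails in the same way as
the Wikipedia example, with the error surfacing on the text write:

import apache_beam as beam
from apache_beam.transforms import window

def parse(line):
    # Made-up record format: "key,value,unix_ts".
    key, value, ts = line.split(',')
    return window.TimestampedValue((key, int(value)), float(ts))

with beam.Pipeline() as p:
    (
        p
        | beam.io.ReadFromText('input.csv')          # batch text source
        | beam.Map(parse)                            # key, value, event time
        | beam.WindowInto(window.FixedWindows(60))   # predefined fixed window
        | beam.CombinePerKey(sum)                    # sum values per key
        | beam.Map(lambda kv: '{},{}'.format(*kv))
        | beam.io.WriteToText('out')                 # error surfaces here
    )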

Best regards
Pawel

On Fri, 13 Sep 2019 at 01:43, Kyle Weaver  wrote:

> Hi Pawel, could you tell us which version of the Beam Python SDK you are
> using?
>
> For the record, this looks like a known issue:
> https://issues.apache.org/jira/browse/BEAM-6860
>
> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com

Python errors when using batch+windows+textio

2019-09-11 Thread Paweł Kordek
Hi

I was developing a simple pipeline where I aggregate records by key and sum
values for a predefined window. I was getting some errors, and after checking,
I am getting exactly the same issues when running the Wikipedia example from
the Beam repo. The output is as follows:
---
INFO:root:Missing pipeline option (runner). Executing pipeline using the
default runner: DirectRunner.
[a run of INFO:root lines followed here, each printing a <function ...>
reference for one of the runner's pipeline-optimization phases; the function
names were stripped by the mail archive]
INFO:root:Running
((ref_AppliedPTransform_ReadFromText/Read_3)+(ref_AppliedPTransform_ComputeTopSessions/ExtractUserAndTimestamp_5))+(ref_AppliedPTransform_ComputeTopSessions/Filter(<lambda at top_wikipedia_sessions.py:127>)_6))+(ref_AppliedPTransform_ComputeTopSessions/ComputeSessions/ComputeSessionsWindow_8))+(ref_AppliedPTransform_ComputeTopSessions/ComputeSessions/PerElement/PerElement:PairWithVoid_10))+(ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/Precombine))+(ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/Group/Write)
INFO:root:Running
(((ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/Group/Read)+(ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/Merge))+(ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/ExtractOutputs))+(ref_AppliedPTransform_ComputeTopSessions/SessionsToStrings_18))+(ref_AppliedPTransform_ComputeTopSessions/TopPerMonth/TopPerMonthWindow_20))+(ref_AppliedPTransform_ComputeTopSessions/TopPerMonth/Top/KeyWithVoid_22))+(ComputeTopSessions/TopPerMonth/Top/CombinePerKey/Precombine))+(ComputeTopSessions/TopPerMonth/Top/CombinePerKey/Group/Write)
INFO:root:Running
(((ref_AppliedPTransform_WriteToText/Write/WriteImpl/DoOnce/Read_36)+(ref_AppliedPTransform_WriteToText/Write/WriteImpl/InitializeWrite_37))+(ref_PCollection_PCollection_19/Write))+(ref_PCollection_PCollection_20/Write)
INFO:root:Running
ComputeTopSessions/TopPerMonth/Top/CombinePerKey/Group/Read)+(ComputeTopSessions/TopPerMonth/Top/CombinePerKey/Merge))+(ComputeTopSessions/TopPerMonth/Top/CombinePerKey/ExtractOutputs))+(ref_AppliedPTransform_ComputeTopSessions/TopPerMonth/Top/UnKey_30))+(ref_AppliedPTransform_ComputeTopSessions/FormatOutput_31))+(ref_AppliedPTransform_WriteToText/Write/WriteImpl/WriteBundles_38))+(ref_AppliedPTransform_WriteToText/Write/WriteImpl/Pair_39))+(ref_AppliedPTransform_WriteToText/Write/WriteImpl/WindowInto(WindowIntoFn)_40))+(WriteToText/Write/WriteImpl/GroupByKey/Write)
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 829, in
apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 403, in
apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 406, in
apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 982, in
apache_beam.runners.common._OutputProcessor.finish_bundle_outputs
  File "apache_beam/runners/worker/operations.py", line 142, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 122, in
apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
  File "apache_beam/runners/worker/opcounters.py", line 196, in
apache_beam.runners.worker.opcounters.OperationCounters.update_from
  File "apache_beam/runners/worker/opcounters.py", line 214, in
apache_beam.runners.worker.opcounters.OperationCounters.do_sample
  File "apache_beam/coders/coder_impl.py", line 1014, in
apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 1030, in
apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 814, in
apache_beam.coders.coder_impl.SequenceCoderImpl.estimate_size
  File "apache_beam/coders/coder_impl.py", line 828, in
apache_beam.coders.coder_impl.SequenceCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 145, in
apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 494, in
apache_beam.coders.coder_impl.IntervalWindowCoderImpl.estimate_size
TypeError: Cannot convert GlobalWindow to