Re: [DISCUSS] Drop Euphoria extension

2023-10-19 Thread Kenneth Knowles
Makes sense to me. Let's deprecate for the 2.52.0 release unless there is
some objection. You can also look at the Maven Central downloads (I believe
all PMC and maybe all committers can view this) compared to other Beam jars.

Kenn

On Mon, Oct 16, 2023 at 9:28 AM Jan Lukavský wrote:

> Sure, that would probably be the preferred way to go. For now, I'm
> trying to get some feedback on whether there are any real-world users
> who might miss the API. Currently, the only value I see is that Euphoria
> adds an additional level of indirection for user code. The expansion
> goes like this:
>
>   Euphoria Pipeline -> runtime provided translators -> vanilla Beam
> Pipeline -> runner
>
> Hence code written using Euphoria extension can be modified at runtime
> (Pipeline construction time) using dependency injection, which brings
> the value that users can modify (typically optimize) Pipelines without
> actually modifying the business logic. On the other hand I'm not sure if
> this justifies the complexity of the extension. Were this the only
> value, it should be possible to implement such dynamic expansion either
> into Java SDK core or as a different light-weight extension.
>
>   Jan
>
> On 10/16/23 15:10, Alexey Romanenko wrote:
> > Can we just deprecate it for a while and then remove completely?
> >
> > —
> > Alexey
> >
> >> On 13 Oct 2023, at 18:59, Jan Lukavský wrote:
> >>
> >> Hi,
> >>
> >> it has been some time since the Euphoria extension [1] was adopted by
> Beam as a possible "Java 8 API". Beam has evolved a lot since then, the
> current API actually seems more elegant than the original Euphoria one and,
> last but not least, the extension has no maintainers and no known users. If
> there are any users, please speak up!
> >>
> >> Otherwise I'd like to propose dropping it from the codebase. I'll start a
> vote thread next week if there are no objections.
> >>
> >> Best,
> >>
> >>   Jan
> >>
> >> [1] https://beam.apache.org/documentation/sdks/java/euphoria/
> >>
>


[ANNOUNCE] Apache Beam 2.51.0 Released

2023-10-18 Thread Kenneth Knowles
The Apache Beam Team is pleased to announce the release of version 2.51.0.

You can download the release here:

https://beam.apache.org/get-started/downloads/

This release includes bug fixes, features, and improvements detailed
on the Beam Blog: https://beam.apache.org/blog/beam-2.51.0/ and the
GitHub release page
https://github.com/apache/beam/releases/tag/v2.51.0

Thanks to everyone who contributed to this release, and we hope you
enjoy using Beam 2.51.0.

Kenn, on behalf of the Apache Beam Team.


Re: simplest way to do exponential moving average?

2023-10-02 Thread Kenneth Knowles
Just to be pedantic about it: Jan's approach is preferred because it would
be much more _parallel_. Any actual computation that depends on everything
being in order is by definition not parallel (nothing to do with Beam).

Kenn

On Mon, Oct 2, 2023 at 5:00 AM Jan Lukavský wrote:

> Hi,
>
> this depends on how exactly you plan to calculate the average. The
> original definition is based on exponentially decreasing weight of more
> distant (older if time is on the x-axis) data points. This (technically)
> means that this average at any point X1 depends on all values X0 <= X1.
> This would therefore require buffering (using GroupByKey) all elements in
> the global window, doing the sorting manually and then computing the new
> value of the average, triggering after each element. This is probably the
> technically correct, but most computationally intensive, variant.
>
> If the average is done over time intervals, then another option could be
> to define a cut-off interval T, i.e. set the exponentially vanishing weight
> of data points to zero at some T0 < T1 - T. If the data points
> come at some discrete time intervals (minutes, hours, days), then this
> could mean you can split the data into sliding time windows (window
> size being the cut-off interval, and slide being the update interval) and
> assign a weight to each data point in the particular time interval, i.e.
> how much weight the data point has at the end of the sliding
> window. With this you could then use a CombineFn to count and sum the
> weighted averages, which would be much more efficient.
>
> Best,
>
>  Jan
> On 9/30/23 17:08, Balogh, György wrote:
>
> Hi,
> I want to calculate the exponential moving average of a signal using beam
> in java.
> I understand there is no time order guarantee on incoming data. What would
> be the simplest solution for this?
> Thank you,
>
> --
>
> György Balogh
>
>
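
A minimal sketch of the cut-off variant Jan describes above, in plain Python
rather than Beam, to show why it parallelizes. Everything here is an
assumption made for illustration: the names `Acc`, `add_input`,
`merge_accumulators`, `extract_output` and `windowed_ema`, the decay constant
`tau`, and the choice of `exp(-age / tau)` as the weight. The accumulator only
holds two sums, so partial results can be combined in any order, which is the
property a CombineFn relies on.

```python
import math
from dataclasses import dataclass


@dataclass
class Acc:
    """Accumulator: sum of weight * value, and sum of weights."""
    weighted_sum: float = 0.0
    weight_total: float = 0.0


def add_input(acc, value, age, tau):
    # Weight decays exponentially with the age of the point at window end.
    w = math.exp(-age / tau)
    return Acc(acc.weighted_sum + w * value, acc.weight_total + w)


def merge_accumulators(a, b):
    # Sums are associative and commutative, so merge order does not matter.
    return Acc(a.weighted_sum + b.weighted_sum, a.weight_total + b.weight_total)


def extract_output(acc):
    # Normalized weighted average; NaN when the window is empty.
    if acc.weight_total == 0.0:
        return float("nan")
    return acc.weighted_sum / acc.weight_total


def windowed_ema(points, window_end, cutoff, tau):
    """points: iterable of (timestamp, value); only points younger than
    `cutoff` at `window_end` contribute (the cut-off interval T)."""
    acc = Acc()
    for ts, value in points:
        age = window_end - ts
        if 0 <= age < cutoff:
            acc = add_input(acc, value, age, tau)
    return extract_output(acc)
```

In a real pipeline the loop in `windowed_ema` would be replaced by sliding
windows plus a combiner; the sketch only shows that the result does not depend
on arrival order.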


Re: Seeking Assistance to Resolve Issues/bug with Flink Runner on Kubernetes

2023-08-14 Thread Kenneth Knowles
There is a Slack channel linked from
https://beam.apache.org/community/contact-us/ (it is #beam on
the-asf.slack.com).

(you find this via beam.apache.org -> Community -> Contact Us)

It sounds like an issue with running a multi-language pipeline on the
portable Flink runner (something which I am not equipped to help with in
detail).

Kenn

On Wed, Aug 9, 2023 at 2:51 PM kapil singh wrote:

> Hey,
>
> I've been grappling with this issue for the past five days and, despite my
> continuous efforts, I haven't found a resolution. Additionally, I've been
> unable to locate a Slack channel for Beam where I might seek assistance.
>
> The issue:
>
> *RuntimeError: Pipeline construction environment and pipeline runtime
> environment are not compatible. If you use a custom container image, check
> that the Python interpreter minor version and the Apache Beam version in
> your image match the versions used at pipeline construction time.
> Submission environment: beam:version:sdk_base:apache/beam_java8_sdk:2.48.0.
> Runtime environment:
> beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0.*
>
>
> Here is what I am trying to do:
>
> I am running the job from a Kubernetes container that submits to the job
> server, and then to the job manager and task manager.
> The task manager and job manager are one container.
>
> Here is my custom Dockerfile (image name: custom-flink):
>
> # Starting with the base Flink image
> FROM apache/flink:1.16-java11
> ARG FLINK_VERSION=1.16
> ARG KAFKA_VERSION=2.8.0
>
> # Install python3.8 and its associated dependencies, followed by pyflink
> RUN set -ex; \
> apt-get update && \
> apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev \
> libffi-dev lzma liblzma-dev && \
> wget https://www.python.org/ftp/python/3.8.0/Python-3.8.0.tgz && \
> tar -xvf Python-3.8.0.tgz && \
> cd Python-3.8.0 && \
> ./configure --without-tests --enable-shared && \
> make -j4 && \
> make install && \
> ldconfig /usr/local/lib && \
> cd .. && rm -f Python-3.8.0.tgz && rm -rf Python-3.8.0 && \
> ln -s /usr/local/bin/python3.8 /usr/local/bin/python && \
> ln -s /usr/local/bin/pip3.8 /usr/local/bin/pip && \
> apt-get clean && \
> rm -rf /var/lib/apt/lists/* && \
> python -m pip install --upgrade pip; \
> pip install apache-flink==${FLINK_VERSION}; \
> pip install kafka-python
>
> RUN pip install --no-cache-dir apache-beam[gcp]==2.48.0
>
> # Copy files from official SDK image, including script/dependencies.
> COPY --from=apache/beam_python3.8_sdk:2.48.0 /opt/apache/beam/ \
> /opt/apache/beam/
>
> # java SDK
> COPY --from=apache/beam_java11_sdk:2.48.0 /opt/apache/beam/ \
> /opt/apache/beam_java/
>
> RUN apt-get update && apt-get install -y python3-venv && rm -rf \
> /var/lib/apt/lists/*
>
> # Give permissions to the /opt/apache/beam-venv directory
> RUN mkdir -p /opt/apache/beam-venv && chown -R : \
> /opt/apache/beam-venv
>
> Here is my Deployment file for the job manager, the task manager plus
> worker pool, and the job server:
>
>
> apiVersion: v1
> kind: Service
> metadata:
>   name: flink-jobmanager
>   namespace: flink
> spec:
>   type: ClusterIP
>   ports:
>     - name: rpc
>       port: 6123
>     - name: blob-server
>       port: 6124
>     - name: webui
>       port: 8081
>   selector:
>     app: flink
>     component: jobmanager
> ---
> apiVersion: v1
> kind: Service
> metadata:
>   name: beam-worker-pool
>   namespace: flink
> spec:
>   selector:
>     app: flink
>     component: taskmanager
>   ports:
>     - protocol: TCP
>       port: 5
>       targetPort: 5
>       name: pool
> ---
> apiVersion: apps/v1
> kind: Deployment
> metadata:
>   name: flink-jobmanager
>   namespace: flink
> spec:
>   replicas: 1
>   selector:
>     matchLabels:
>       app: flink
>       component: jobmanager
>   template:
>     metadata:
>       labels:
>         app: flink
>         component: jobmanager
>     spec:
>       containers:
>         - name: jobmanager
>           image: custom-flink:latest
>           imagePullPolicy: IfNotPresent
>           args: ["jobmanager"]
>           ports:
>             - containerPort: 6123
>               name: rpc
>             - containerPort: 6124
>               name: blob-server
>             - containerPort: 8081
>               name: webui
>           livenessProbe:
>             tcpSocket:
>               port: 6123
>             initialDelaySeconds: 30
>             periodSeconds: 60
>           volumeMounts:
>             - name: flink-config-volume
>               mountPath: /opt/flink/conf
>             - name: flink-staging
>               mountPath: /tmp/beam-artifact-staging
>           securityContext:
>             runAsUser: 
>           resources:
>             requests:
>               memory: "1Gi"
>               cpu: "1"
>             limits:
>               memory: "1Gi"
>               cpu: "1"
>       volumes:
>         - name: flink-config-volume
>           configMap:
>             name: flink-config
>             items:
>               - key: flink-conf.yaml
>                 path: flink-conf.yaml
>               - key: log4j-console.properties
>                 path: log4j-console.properties
>         - name: flink-staging
>           persistentVolumeClaim:
>             claimName: staging-artifacts-claim
> ---
> apiVersion: apps/v1
> kind: Deployment
> metadata:
>   name: flink-taskmanager
>   namespace: flink
> spec:
>   replicas: 1
>   selector:
>     matchLabels:
>       app: flink
>       component: taskmanager
>   template:
>     metadata:
>       labels:
>         app: flink
>         component: taskmanager
>     spec:
>       containers:
>         - name: taskmanager-beam-worker
>           image: custom-flink:latest
>           imagePullPolicy: IfNotPresent
>           args:
>             - /bin/bash
>             - -c
>             - 

Re: Beam SQL found limitations

2023-07-10 Thread Kenneth Knowles
It would be interesting to see a design for this. You'll need to partition
or it won't scale because SQL "OVER" clause is linear and sorted in this
case. Other than that, it should be a pretty straightforward implementation
using state + timers + @RequiresTimeSortedInput. Sorting in any other way
would be a little more work, so I'd start with rejecting ORDER BY clauses
with other columns.

Kenn

On Fri, Jun 9, 2023 at 5:06 AM Wiśniowski Piotr <
contact.wisniowskipi...@gmail.com> wrote:

> Hi,
>
> BTW just found this on Calcite:
> https://calcite.apache.org/docs/stream.html#sliding-windows
>
> I think this is precisely what I was trying to do with Beam SQL and the
> syntax is also very intuitive.
>
> Could this be added to the SQL roadmap? How hard would it be to implement?
>
> Best
>
> Piotr
> On 31.05.2023 20:29, Kenneth Knowles wrote:
>
> 1. Yes, using state is a better fit than Beam windowing. You will want to
> use the new feature of annotating the DoFn with @RequiresTimeSortedInput.
> This will make it so you can be sure you are actually getting the
> "previous" event. They can arrive in any order without this annotation. You
> won't be able to do this in SQL. I don't think Beam SQL has implementations
> of analytic functions that have this ability.
>
> Kenn
>
> On Wed, May 31, 2023 at 4:17 AM Wiśniowski Piotr <
> contact.wisniowskipi...@gmail.com> wrote:
>
>> Hi Kenn,
>>
>> Thanks for clarification.
>>
>> 1. Just to put an example in front - for every event that comes in I need
>> to find corresponding previous event of same user_id and pass
>> previous_event_timestamp in the current event payload down (and also
>> current event becomes previous event for future events that come in for
>> same user). Question is how to do it with BeamSQL. I am aware that analytic
>> windowing (like last_value over etc.) might not be a way for streaming and
>> I am ok with this - it makes sense under the hood just as You mention.
>>
>> The task is to be able to keep a simple state in streaming SQL. What I
>> did come up with is using sliding window to have this state available for
>> each new event that comes in.
>>
>> ```
>>
>> WITH
>> unbounded_stream_initialized AS (
>> SELECT
>> user_id,
>> event_time
>> FROM unbounded_stream
>> GROUP BY
>> user_id,
>> event_time,
>> TUMBLE(event_time,INTERVAL '1' SECONDS)
>> UNION ALL
>> -- this is needed as first session window by default starts at first
>> -- element, while here we need to start it in the past
>> -- so that there is a window that ends just after first real element
>> SELECT
>> CAST(0 AS BIGINT) AS user_id,
>> CAST('2023-05-19 08:00:00' AS TIMESTAMP) AS event_time
>> FROM UNNEST(ARRAY[0]) AS arr -- this is dummy as syntax does not
>> -- allow to have GROUP BY just after SELECT
>> GROUP BY TUMBLE(CAST('2023-05-19 08:00:00' AS TIMESTAMP), INTERVAL
>> '1' SECONDS)
>> ),
>> test_data_1 AS (
>> SELECT
>> user_id,
>> MAX(event_time) AS prev_event_time,
>> HOP_END(event_time, INTERVAL '1' SECONDS, INTERVAL '7' DAYS) AS
>> window_end_at
>> FROM unbounded_stream_initialized
>> GROUP BY
>> user_id,
>> HOP(
>> -- first create a sliding window to aggregate state
>> event_time,
>> INTERVAL '1' SECONDS,
>> INTERVAL '7' DAYS -- The idea is to have this quite long
>> -- compared to interval
>> )
>> ),
>> test_data_1_lookup AS (
>> SELECT
>> user_id,
>> prev_event_time
>> FROM test_data_1
>> GROUP BY
>> user_id,
>> -- then re-window into windows suitable for joining main stream
>> TUMBLE(window_end_at, INTERVAL '1' SECONDS)
>> ),
>> enriched_info AS (
>> SELECT
>> unbounded_stream_initialized.event_timestamp AS event_timestamp,
>> unbounded_stream_initialized.user_id AS user_id,
>> test_data_1_lookup.prev_event_time AS prev_event_time
>> FROM unbounded_stream_initialized
>> LEFT JOIN test_data_1_lookup
>> ON unbounded_stream_initialized.user_id =
>> test_data_1_lookup.user_id
>> )
>> SELECT
>> *
>> FROM enriched_info
>>
>> ```
>>
>> The doubt that I have is whether above will not store too much redundant
>> data as `test_data_1` suggests it could duplicate and store each incoming
>

Re: Beam SQL found limitations

2023-05-31 Thread Kenneth Knowles
1. Yes, using state is a better fit than Beam windowing. You will want to
use the new feature of annotating the DoFn with @RequiresTimeSortedInput.
This will make it so you can be sure you are actually getting the
"previous" event. They can arrive in any order without this annotation. You
won't be able to do this in SQL. I don't think Beam SQL has implementations
of analytic functions that have this ability.
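
To make the "previous event" idea concrete, here is a small plain-Python
simulation of what a stateful step sees when its input is sorted by event
time. It is not Beam code: the function name `attach_previous_timestamp`, the
`(user_id, event_time)` tuple shape and the dictionary standing in for per-key
state are all assumptions for illustration. A single global sort is used only
to keep the sketch short; with @RequiresTimeSortedInput the ordering
guarantee is per key.

```python
def attach_previous_timestamp(events):
    """events: iterable of (user_id, event_time) in arbitrary arrival order.

    Returns (user_id, event_time, previous_event_time) tuples, where
    previous_event_time is None for the first event of a user.
    """
    last_seen = {}  # stand-in for per-key state: user_id -> last event_time
    enriched = []
    # Sorting by event time plays the role of the time-sorted-input guarantee.
    for user_id, event_time in sorted(events, key=lambda e: e[1]):
        enriched.append((user_id, event_time, last_seen.get(user_id)))
        last_seen[user_id] = event_time  # current event becomes "previous"
    return enriched
```

Without the sort, the value read from `last_seen` would simply be whichever
event happened to arrive earlier, which is the failure mode described above.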

Kenn

On Wed, May 31, 2023 at 4:17 AM Wiśniowski Piotr <
contact.wisniowskipi...@gmail.com> wrote:

> Hi Kenn,
>
> Thanks for clarification.
>
> 1. Just to put an example in front - for every event that comes in I need
> to find corresponding previous event of same user_id and pass
> previous_event_timestamp in the current event payload down (and also
> current event becomes previous event for future events that come in for
> same user). Question is how to do it with BeamSQL. I am aware that analytic
> windowing (like last_value over etc.) might not be a way for streaming and
> I am ok with this - it makes sense under the hood just as You mention.
>
> The task is to be able to keep a simple state in streaming SQL. What I did
> come up with is using sliding window to have this state available for each
> new event that comes in.
>
> ```
>
> WITH
> unbounded_stream_initialized AS (
> SELECT
> user_id,
> event_time
> FROM unbounded_stream
> GROUP BY
> user_id,
> event_time,
> TUMBLE(event_time,INTERVAL '1' SECONDS)
> UNION ALL
> -- this is needed as first session window by default starts at first
> -- element, while here we need to start it in the past
> -- so that there is a window that ends just after first real element
> SELECT
> CAST(0 AS BIGINT) AS user_id,
> CAST('2023-05-19 08:00:00' AS TIMESTAMP) AS event_time
> FROM UNNEST(ARRAY[0]) AS arr -- this is dummy as syntax does not allow
> -- to have GROUP BY just after SELECT
> GROUP BY TUMBLE(CAST('2023-05-19 08:00:00' AS TIMESTAMP), INTERVAL '1'
> SECONDS)
> ),
> test_data_1 AS (
> SELECT
> user_id,
> MAX(event_time) AS prev_event_time,
> HOP_END(event_time, INTERVAL '1' SECONDS, INTERVAL '7' DAYS) AS
> window_end_at
> FROM unbounded_stream_initialized
> GROUP BY
> user_id,
> HOP(
> -- first create a sliding window to aggregate state
> event_time,
> INTERVAL '1' SECONDS,
> INTERVAL '7' DAYS -- The idea is to have this quite long
> -- compared to interval
> )
> ),
> test_data_1_lookup AS (
> SELECT
> user_id,
> prev_event_time
> FROM test_data_1
> GROUP BY
> user_id,
> -- then re-window into windows suitable for joining main stream
> TUMBLE(window_end_at, INTERVAL '1' SECONDS)
> ),
> enriched_info AS (
> SELECT
> unbounded_stream_initialized.event_timestamp AS event_timestamp,
> unbounded_stream_initialized.user_id AS user_id,
> test_data_1_lookup.prev_event_time AS prev_event_time
> FROM unbounded_stream_initialized
> LEFT JOIN test_data_1_lookup
> ON unbounded_stream_initialized.user_id =
> test_data_1_lookup.user_id
> )
> SELECT
> *
> FROM enriched_info
>
> ```
>
> The doubt that I have is whether above will not store too much redundant
> data as `test_data_1` suggests it could duplicate and store each incoming
> msg into all windows there are in the sliding window definition (might be a
> lot in this case). Or actually resolving if a message belongs to a window
> is done later when evaluating `LEFT JOIN`? Target DataFlow. I am still
> learning Beam so there might be some core thing that I miss to understand
> how it is processed.
>
> 2. Any hints on implementing FirestoreIOTableProvider? just more or less
> how to do it where to look for important parts etc. It seems we would need
> this functionality.
>
> 3. I will try to report some more interesting findings. If possible please
> prioritize fixing this ROW error.
>
> Best
>
> Piotr
>
> On 26.05.2023 21:36, Kenneth Knowles wrote:
>
> Just want to clarify that Beam's concept of windowing is really an
> event-time based key, and they are all processed logically simultaneously.
> SQL's concept of windowing function is to sort rows and process them
> linearly. They are actually totally different. From your queries it seems
> you are interested in SQL's windowing functions (aka analytic functions).
>
> I am surprised by the problems with rows, since we have used them
> extensively. Hopefully it is not too hard to fix. Same with the UNION ALL
> problem.
>
>

Re: Beam SQL found limitations

2023-05-26 Thread Kenneth Knowles
Just want to clarify that Beam's concept of windowing is really an
event-time based key, and they are all processed logically simultaneously.
SQL's concept of windowing function is to sort rows and process them
linearly. They are actually totally different. From your queries it seems
you are interested in SQL's windowing functions (aka analytic functions).
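
As a rough illustration of the "window as an event-time based key" view, and
of the storage question in the quoted message below, the sketch here lists
which sliding windows a single timestamp falls into. It is plain Python, not
Beam; `sliding_window_starts` is a hypothetical helper, timestamps are integer
seconds, and windows are taken to be half-open intervals [start, start + size)
whose starts are aligned to multiples of the period.

```python
def sliding_window_starts(ts, size, period):
    """Start times of every sliding window [start, start + size) containing ts.

    Window starts are assumed to be aligned to multiples of `period`.
    """
    last_start = ts - (ts % period)  # latest aligned start not after ts
    # Walk backwards while the window still covers ts (start > ts - size).
    return list(range(last_start, ts - size, -period))


# One element, 7-day windows sliding every second.
fan_out = len(sliding_window_starts(1_000_000, 7 * 24 * 3600, 1))
```

Under these assumptions `fan_out` is size / period, i.e. 604800 window keys
for one element. How much of that is physically materialized depends on the
runner and on whether the aggregation can be combined incrementally, so this
is best read as an upper bound on the fan-out, not a measured cost.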

I am surprised by the problems with rows, since we have used them
extensively. Hopefully it is not too hard to fix. Same with the UNION ALL
problem.

And for the CROSS JOIN it would be a nice feature to allow in some cases it
seems. Should not be hard.

Thank you for reporting this! If you have time it would be really great to
get each of these reproducible problems into its own GitHub issue.

Kenn

On Fri, May 26, 2023 at 11:06 AM Wiśniowski Piotr <
contact.wisniowskipi...@gmail.com> wrote:

> Hi Alexey,
>
> Thank You for the reference to that discussion. I do actually have pretty
> similar thoughts on what Beam SQL needs.
>
> Update from my side:
>
> I actually did find a workaround for the issue with windowing functions on
> a stream. It basically boils down to using a sliding window to collect and
> aggregate the state. But I would need advice on whether this is actually a
> cost-efficient method (targeting the Dataflow runner). The doubt that I have
> is that this sliding window would need to have a sliding interval of less
> than 1s and a size of more than a week, and be fed with quite frequent data.
> If I understand this correctly, it would mean each input row would need to
> be duplicated for each window and stored, which could be a quite significant
> storage cost?
>
> Or actually Beam does not physically duplicate the record but just tracks
> to which windows the record currently belongs?
>
>
> And the real issue that BeamSQL needs at the moment in my opinion is
> fixing bugs.
>
> Some bugs that I found that prevent one from using it and would really
> appreciate fast fix:
>
> - UNNEST ARRAY with a nested ROW (described below, created ticket -
> https://github.com/apache/beam/issues/26911)
>
> - PubSub table provider actually requires all table properties to be there
> (with null in `timestampAttributeKey` it fails) - which essentially does
> not allow one to use pubsub publish timestamp as `timestampAttributeKey`.
>
> - it's not possible to cast VARCHAR to BYTES. And BYTES is needed for
> DataStoreV1TableProvider to provide a key for storage. Also consider
> updating `org.apache.beam.sdk.io.gcp.datastore.RowToEntity` so that it
> requires VARCHAR instead of BYTES - it's even easier to implement.
>
> - Any hints on how to implement `FireStoreIOTableProvider`? I am
> considering implementing and contributing it, depending on my team's
> decision, but would like to get an idea of how hard this task is.
>
> I will create tickets for the rest of the issues when I have some spare
> time.
>
> Best regards
>
> Wiśniowski Piotr
>
>
> On 22.05.2023 18:28, Alexey Romanenko wrote:
>
> Hi Piotr,
>
> Thanks for details! I cross-post this to dev@ as well since, I guess,
> people there can provide more insights on this.
>
> A while ago, I faced similar issues trying to run Beam SQL against
> the TPC-DS benchmark.
> We had a discussion around that [1]; please take a look since it can be
> helpful.
>
> [1] https://lists.apache.org/thread/tz8h1lycmob5vpkwznvc2g6ol2s6n99b
>
> —
> Alexey
>
> On 18 May 2023, at 11:36, Wiśniowski Piotr wrote:
>
> Hi,
>
> After experimenting with Beam SQL I did find some limitations. Testing on
> near latest main (precisely `5aad2467469fafd2ed2dd89012bc80c0cd76b168`)
> with Calcite, direct runner and openjdk version "11.0.19". Please let me
> know if some of them are known/ worked on/ have tickets or have estimated
> fix time. I believe most of them are low hanging fruits or just my thinking
> is not right for the problem. If this is the case please guide me to some
> working solution.
>
>  From my perspective it is ok to have a fix just on master - no need to
> wait for release. Priority order:
> - 7. Windowing function on a stream - in detail - How to get the previous
> message for a key? Setting an arbitrarily big expiration is ok, but access to
> the previous record must happen fairly quickly, not wait for the big window
> to finish and emit the expired keys. Ideally I would like to do it in a pure
> Beam pipeline, as saving to some external key/value store and then reading
> it back could potentially result in some race conditions which I would
> like to avoid, but if it's the only option - let it be.
> - 5. single UNION ALL possible
> - 4. UNNEST ARRAY with nested ROW
> - 3. Using * when there is Row type present in the schema
> - 1. `CROSS JOIN` between two unrelated tables is not supported - even if
> one is a static number table
> - 2. ROW construction not supported. It is not possible to nest data
>
> Below are the queries that I use to test these scenarios.
>
> Thank You for looking at these topics!
>
> Best
>
> Wiśniowski Piotr
> ---
> -- 1. `CROSS 

Re: OpenJDK8 / OpenJDK11 container deprecation

2023-02-14 Thread Kenneth Knowles
SGTM. I asked on the PR if this could impact users, but having read the
docker release calendar I am not concerned. The last update to the old
version was in 2019, and the introduction of compatible versions was 2020.

On Tue, Feb 14, 2023 at 3:01 PM Byron Ellis via user wrote:

> FWIW I am Team Upgrade Docker :-)
>
> On Tue, Feb 14, 2023 at 2:53 PM Luke Cwik via user wrote:
>
>> I made some progress in testing the container and did hit an issue where
>> Ubuntu 22.04 "Jammy" is dependent on the version of Docker installed. It
>> turns out that our boot.go crashes with "runtime/cgo: pthread_create
>> failed: Operation not permitted" because the Ubuntu 22.04 is using new
>> syscalls that Docker 18.09.4 doesn't have a seccomp policy for (and uses a
>> default of deny). We have a couple of choices here:
>> 1) upgrade the version of docker on Jenkins and require users to
>> similarly use a new enough version of Docker so that this isn't an issue
>> for them
>> 2) use Ubuntu 20.04 "Focal" as the docker container
>>
>> I was using Docker 20.10.21 which is why I didn't hit this issue when
>> testing the change locally.
>>
>> We could also do these but they seem strictly worse than either of the
>> two options discussed above:
>> A) disable the seccomp policy on Jenkins
>> B) use a custom seccomp policy on Jenkins
>>
>> My suggestion is to upgrade Docker versions on Jenkins and use Ubuntu
>> 22.04 as it will have LTS releases till 2027 and then security patches till
>> 2032 which gives everyone the longest runway till we need to swap OS
>> versions again for users of Apache Beam. Any concerns or ideas?
>>
>>
>>
>> On Thu, Feb 9, 2023 at 10:20 AM Luke Cwik wrote:
>>
>>> Our current Java 8 container is 262 MiB and layers on top of
>>> openjdk:8-bullseye, which is 226 MiB compressed, while eclipse-temurin:8 is
>>> 92 MiB compressed and eclipse-temurin:8-alpine is 65 MiB compressed.
>>>
>>> I would rather not get into issues with C library differences caused by
>>> the alpine project so I would stick with the safer option and let users
>>> choose alpine when building their custom container if they feel it provides
>>> a large win for them. We can always swap to alpine in the future as well if
>>> the C library differences become a non-issue.
>>>
>>> So swapping to eclipse-temurin will save us a bunch on the container
>>> size which should help with container transfer and hopefully for startup
>>> times as well.
>>>
>>> On Tue, Feb 7, 2023 at 5:41 PM Andrew Pilloud wrote:
>>>
>>>> This sounds reasonable to me as well.
>>>>
>>>> I've made swaps like this in the past, the base image of each is
>>>> probably a bigger factor than the JDK. The openjdk images were based on
>>>> Debian 11. The default eclipse-temurin images are based on Ubuntu 22.04
>>>> with an alpine option. Ubuntu is a Debian derivative but the versions and
>>>> package names aren't exact matches and Ubuntu tends to update a little
>>>> faster. For most users I don't think this will matter but users building
>>>> custom containers may need to make minor changes. The alpine option will be
>>>> much smaller (which could be a significant improvement) but would be a more
>>>> significant change to the environment.
>>>>
>>>> On Tue, Feb 7, 2023 at 5:18 PM Robert Bradshaw via dev <
>>>> d...@beam.apache.org> wrote:
>>>>
>>>>> Seems reasonable to me.
>>>>>
>>>>> On Tue, Feb 7, 2023 at 4:19 PM Luke Cwik via user <
>>>>> user@beam.apache.org> wrote:
>>>>> >
>>>>> > As per [1], the JDK8 and JDK11 containers that Apache Beam uses have
>>>>> stopped being built and supported since July 2022. I have filed [2] to
>>>>> track the resolution of this issue.
>>>>> >
>>>>> > Based upon [1], almost everyone is swapping to the eclipse-temurin
>>>>> container[3] as their base based upon the linked issues from the
>>>>> deprecation notice[1]. The eclipse-temurin container is released under
>>>>> these licenses:
>>>>> > Apache License, Version 2.0
>>>>> > Eclipse Distribution License 1.0 (BSD)
>>>>> > Eclipse Public License 2.0
>>>>> > - (Secondary) GNU General Public License, version 2 with OpenJDK
>>>>> Assembly Exception
>>>>> > - (Secondary) GNU General Public License, version 2 with the GNU
>>>>> Classpath Exception
>>>>> >
>>>>> > I propose that we swap all our containers to the eclipse-temurin
>>>>> containers[3].
>>>>> >
>>>>> > Open to other ideas and also would be great to hear about your
>>>>> experience in any other projects that you have had to make a similar
>>>>> decision.
>>>>> >
>>>>> > 1: https://github.com/docker-library/openjdk/issues/505
>>>>> > 2: https://github.com/apache/beam/issues/25371
>>>>> > 3: https://hub.docker.com/_/eclipse-temurin
>>>>>



Re: Beam SQL Alias issue while using With Clause

2023-01-23 Thread Kenneth Knowles
Looking at the code that turns a logical CalcRel into a BeamCalcRel I do
not see any obvious cause for this:
https://github.com/apache/beam/blob/b3aa2e89489898f8c760294ba4dba2310ac53e70/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamCalcRule.java#L69

I don't like to guess that upstream libraries have the bug, but in this
case I wonder if the alias is lost in the Calcite optimizer rule for
merging the projects and filters into a Calc.

Kenn

On Mon, Jan 23, 2023 at 10:13 AM Kenneth Knowles wrote:

> I am not sure I understand the question, but I do see an issue.
>
> Context: "CalcRel" is an optimized relational operation that is somewhat
> like ParDo, with a small snippet of a single-assignment DSL embedded in it.
> Calcite will choose to merge all the projects and filters into the node,
> and then generates Java bytecode to directly execute the DSL.
>
> Problem: it looks like the CalcRel has output columns with aliases "id"
> and "v" where it should have output columns with aliases "id" and "value".
>
> Kenn
>
> On Thu, Jan 19, 2023 at 6:01 PM Ahmet Altay wrote:
>
>> Adding: @Andrew Pilloud @Kenneth Knowles
>>
>> On Thu, Jan 12, 2023 at 12:31 PM Talat Uyarer via user <
>> user@beam.apache.org> wrote:
>>
>>> Hi All,
>>>
>>> I am using Beam 2.43 with Calcite SQL with Java.
>>>
>>> I have a query with a WITH clause and some aliasing. It looks like the Beam
>>> query optimizer drops the SELECT statement's aliases after optimizing my
>>> query. Can you help me identify where the problem is?
>>>
>>> This is my query
>>> INFO: SQL:
>>> WITH `tempTable` (`id`, `v`) AS (SELECT
>>> `PCOLLECTION`.`f_nestedRow`.`f_nestedInt` AS `id`,
>>> `PCOLLECTION`.`f_nestedRow`.`f_nestedString` AS `v`
>>> FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`) (SELECT `tempTable`.`id` AS
>>> `id`, `tempTable`.`v` AS `value`
>>> FROM `tempTable` AS `tempTable`
>>> WHERE `tempTable`.`v` <> '11')
>>>
>>> This is the Calcite plan; look at LogicalProject(id=[$0], value=[$1]) in
>>> the SQL plan.
>>>
>>> Jan 12, 2023 12:19:08 PM
>>> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
>>> INFO: SQLPlan>
>>> LogicalProject(id=[$0], value=[$1])
>>>   LogicalFilter(condition=[<>($1, '11')])
>>> LogicalProject(id=[$1.f_nestedInt], v=[$1.f_nestedString])
>>>   BeamIOSourceRel(table=[[beam, PCOLLECTION]])
>>>
>>> But the Beam plan does not have a LogicalProject(id=[$0], value=[$1]) or
>>> similar.
>>>
>>> Jan 12, 2023 12:19:08 PM
>>> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
>>> INFO: BEAMPlan>
>>> BeamCalcRel(expr#0..1=[{inputs}], expr#2=[$t1.f_nestedInt],
>>> expr#3=[$t1.f_nestedString], expr#4=['11':VARCHAR], expr#5=[<>($t3, $t4)],
>>> id=[$t2], v=[$t3], $condition=[$t5])
>>>   BeamIOSourceRel(table=[[beam, PCOLLECTION]])
>>>
>>>
>>> Thanks
>>>
>>


Re: Beam SQL Alias issue while using With Clause

2023-01-23 Thread Kenneth Knowles
I am not sure I understand the question, but I do see an issue.

Context: "CalcRel" is an optimized relational operation that is somewhat
like ParDo, with a small snippet of a single-assignment DSL embedded in it.
Calcite will choose to merge all the projects and filters into the node,
and then generates Java bytecode to directly execute the DSL.

Problem: it looks like the CalcRel has output columns with aliases "id" and
"v" where it should have output columns with aliases "id" and "value".

Kenn

On Thu, Jan 19, 2023 at 6:01 PM Ahmet Altay wrote:

> Adding: @Andrew Pilloud @Kenneth Knowles
>
> On Thu, Jan 12, 2023 at 12:31 PM Talat Uyarer via user <
> user@beam.apache.org> wrote:
>
>> Hi All,
>>
>> I am using Beam 2.43 with Calcite SQL with Java.
>>
>> I have a query with a WITH clause and some aliasing. It looks like the Beam
>> query optimizer drops the SELECT statement's aliases after optimizing my
>> query. Can you help me identify where the problem is?
>>
>> This is my query
>> INFO: SQL:
>> WITH `tempTable` (`id`, `v`) AS (SELECT
>> `PCOLLECTION`.`f_nestedRow`.`f_nestedInt` AS `id`,
>> `PCOLLECTION`.`f_nestedRow`.`f_nestedString` AS `v`
>> FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`) (SELECT `tempTable`.`id` AS
>> `id`, `tempTable`.`v` AS `value`
>> FROM `tempTable` AS `tempTable`
>> WHERE `tempTable`.`v` <> '11')
>>
>> This is the Calcite plan; look at LogicalProject(id=[$0], value=[$1]) in the
>> SQL plan.
>>
>> Jan 12, 2023 12:19:08 PM
>> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
>> INFO: SQLPlan>
>> LogicalProject(id=[$0], value=[$1])
>>   LogicalFilter(condition=[<>($1, '11')])
>>     LogicalProject(id=[$1.f_nestedInt], v=[$1.f_nestedString])
>>       BeamIOSourceRel(table=[[beam, PCOLLECTION]])
>>
>> But the Beam plan does not have a LogicalProject(id=[$0], value=[$1]) or
>> similar.
>>
>> Jan 12, 2023 12:19:08 PM
>> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
>> INFO: BEAMPlan>
>> BeamCalcRel(expr#0..1=[{inputs}], expr#2=[$t1.f_nestedInt],
>> expr#3=[$t1.f_nestedString], expr#4=['11':VARCHAR], expr#5=[<>($t3, $t4)],
>> id=[$t2], v=[$t3], $condition=[$t5])
>>   BeamIOSourceRel(table=[[beam, PCOLLECTION]])
>>
>>
>> Thanks
>>
>


Re: Checkpointing on Google Cloud Dataflow Runner

2022-08-29 Thread Kenneth Knowles
Hi Will, David,

I think you'll find the best source of answers for this sort of question on
the user@beam list. I've put that in the To: line with a BCC: to the
dev@beam list so everyone knows they can find the thread there. If I have
misunderstood, and your question has to do with building Beam itself, feel
free to move it back.

Kenn

On Mon, Aug 29, 2022 at 2:24 PM Will Baker  wrote:

> Hello!
>
> I am wondering about using checkpoints with Beam running on Google
> Cloud Dataflow.
>
> The docs indicate that checkpoints are not supported by Google Cloud
> Dataflow:
> https://beam.apache.org/documentation/runners/capability-matrix/additional-common-features-not-yet-part-of-the-beam-model/
>
> Is there a recommended approach to handling checkpointing on Google
> Cloud Dataflow when using streaming sources like Kinesis and Kafka, so
> that a pipeline could be resumed from where it left off if it needs to
> be stopped or crashes for some reason?
>
> Thanks!
> Will Baker
>


Re: Bazel based build

2022-06-09 Thread Kenneth Knowles
A big reason we chose Gradle over Bazel was that we had a hackathon to
implement both, and only one person* chose to work on Bazel while the
Gradle team attracted many contributors. Having a build system with more
widespread knowledge and interest is an important consideration. Basically
I suggest we keep the project as "typical" as possible for each language
ecosystem (hence using pip, pytest, tox, etc, go test, docker, etc for
different modules), while meeting the requirements of having incremental &
caching language-agnostic builds (hence using something like bazel, gradle,
make, etc, for the overall build orchestration).

Kenn

*that one person was me!

On Mon, Jun 6, 2022 at 9:31 AM Andrew Pilloud  wrote:

> Also I think this belongs on the dev@ list?
>
> On Mon, Jun 6, 2022 at 9:30 AM Andrew Pilloud  wrote:
>
>> I added Bazel support for building and releasing Java to the ZetaSQL
>> repository (based on existing Google internal Blaze configs). From the Java
>> side, migrating the build and test is the easy part. Bazel is lacking tools
>> to build, test, and ship maven release artifacts for Java. This is
>> supported by a gradle plugin today. The scripts that handle this for
>> ZetaSQL aren't public as they are brittle shell scripts that break with
>> every release. I don't think that approach would scale to Beam.
>>
>> I would be hesitant to accept changes adding BUILD files without deciding
>> we want to cut over to Bazel. Maintaining two build systems is difficult
>> and a bazel config that isn't run in precommits would quickly break.
>>
>> Andrew
>>
>> On Mon, Jun 6, 2022 at 9:06 AM Robert Bradshaw 
>> wrote:
>>
>>> I think it would be useful to see how far bazel has gotten, and it
>>> certainly would work better for other languages than java (and
>>> cross-language), but I would be careful not to underestimate how large
>>> of an undertaking this might be (both technically, and convincing
>>> everyone it's a good idea to switch again).
>>>
>>> Regardless, it would be good to see how much our build system could be
>>> streamlined and simplified. Moving stuff out of the build system is
>>> probably step one on a project like this.
>>>
>>> On Mon, Jun 6, 2022 at 12:54 AM Reuven Lax  wrote:
>>> >
>>> > It might be a difficult project!  We looked into Bazel some years ago,
>>> and it didn't quite work out for Beam's needs (though Bazel has presumably
>>> advanced in the interim). Creating Gradle build rules for Beam (migrating
>>> away from Maven) was an unexpectedly large project - it took multiple
>>> engineers a year to complete, and the build has only gotten larger since
>>> then.
>>> >
>>> > Reuven
>>> >
>>> > On Sun, Jun 5, 2022 at 6:26 PM Red Daly  wrote:
>>> >>
>>> >> Hi Beam authors,
>>> >>
>>> >> Would the project accept github pull requests that add Bazel building
>>> functionality? I have been struggling trying to understand how to do some
>>> basic things[1] in gradle and might want to just try a build system with
>>> which I'm familiar. I expect it might also make it easier to write
>>> cross-language and integration tests, though you're probably already
>>> achieving that somehow.
>>> >>
>>> >> Thanks,
>>> >> Red
>>> >>
>>> >> [1] I was trying to make changes to both the Beam Flink runner
>>> and Flink itself but couldn't figure out how to do that.
>>>
>>


Re: Potential Bug: Beam + Flink + AutoValue Builders

2021-11-17 Thread Kenneth Knowles
Noticing that you had another question on this thread. If I understand
correctly, the answer to your question is that Beam converting objects
to/from Row uses bytecode generation for performance, and automatically
figures out how to map the columns of the Row to builder methods. But I may
be misunderstanding which part of SchemaCoder is involved here. Mostly just
bumping this thread.

Kenn
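
A rough sketch of the column-to-builder mapping described above, assuming a hypothetical `FooBuilder` class and a `set_<column>` naming convention. Beam, per the note above, generates bytecode for this; the sketch swaps in plain runtime reflection (`getattr`) because it is easier to read. It also obtains the builder through a public factory rather than a generated class, which is the alternative raised in the question quoted below.

```python
class FooBuilder:
    """Hypothetical stand-in for an AutoValue-style builder."""

    def __init__(self):
        self._values = {}

    def set_dummy_prop(self, value):
        self._values["dummy_prop"] = value
        return self

    def build(self):
        # A real builder would return an immutable value object.
        return dict(self._values)


def build_from_row(builder_factory, row):
    """Create a value by calling one builder setter per row column.

    builder_factory: zero-argument callable returning a fresh builder
    row:             mapping of column name to value
    """
    builder = builder_factory()
    for column, value in row.items():
        # Look the setter up by name at runtime.
        setter = getattr(builder, "set_" + column, None)
        if setter is None:
            raise AttributeError(f"no builder setter for column {column!r}")
        setter(value)
    return builder.build()


foo = build_from_row(FooBuilder, {"dummy_prop": "dummy"})
```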

On Tue, Oct 26, 2021 at 11:29 AM Cristian Constantinescu 
wrote:

> Hi Reuven,
>
> Thanks for the quick reply.
>
> Could you elaborate why Beam needs to create the Builder dynamically
> through reflection (basically using reflection to create an instance of a
> package private class)? As far as AutoValue goes, it looks like an
> anti-pattern to try to create an instance of the generated builder by
> calling the AutoValue generated class (AutoValue_App_FooAutoValue in this
> case). I think that normally, the only place that can call the auto
> generated builder constructor is from the user code abstract class
> (FooAutoValue) through:
>
> public static Builder builder() {
> return new AutoValue_App_FooAutoValue.Builder();
> }
>
> In fact, this method is directly called when using the @SchemaCreate
> method, regardless if the create method is called through reflection or
> not. I guess what I'm asking is, could beam not call the
> FooAutoValue.builder() dynamically if directly is not possible?
>
>
> On Tue, Oct 26, 2021 at 2:15 PM Reuven Lax  wrote:
>
>> Beam needs to create these elements dynamically when decoding records,
>> so it can't easily call the builder directly.
>>
>> My first guess is that there's a classloader issue here. Flink does some
>> fancy classloader munging, and that might be breaking an assumption in this
>> code. Passing in the correct classloader should hopefully fix this.
>>
>> Reuven
>>
>>
>> On Tue, Oct 26, 2021 at 10:59 AM Cristian Constantinescu <
>> zei...@gmail.com> wrote:
>>
>>> Hi everyone,
>>>
>>> Not sure if anyone is using Beam with the Flink Runner and AutoValue
>>> builders. For me, it doesn't work. I have some questions and a workaround
>>> for anyone in the same boat.
>>>
>>> Beam 2.31, Flink 1.13, AutoValue 1.8.2
>>>
>>> Here's the code:
>>>
>>> package org.whatever.testing;
>>>
>>> import com.google.auto.value.AutoValue;
>>> import org.apache.beam.sdk.Pipeline;
>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>> import org.apache.beam.sdk.schemas.AutoValueSchema;
>>> import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
>>> import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
>>> import org.apache.beam.sdk.schemas.transforms.Convert;
>>> import org.apache.beam.sdk.transforms.Create;
>>> import org.apache.beam.sdk.transforms.MapElements;
>>> import org.apache.beam.sdk.values.TypeDescriptor;
>>>
>>> import java.util.Arrays;
>>>
>>> public class App {
>>>
>>>     public static void main(String[] args) {
>>>         var options =
>>>             PipelineOptionsFactory.fromArgs(args).withValidation().create();
>>>
>>>         var p = Pipeline.create(options);
>>>         p
>>>             .apply(Create.of(Arrays.asList(
>>>                 FooAutoValue.builder().setDummyProp("dummy").build())))
>>>             .apply(Convert.to(FooAutoValue.class))
>>>             .apply(MapElements.into(TypeDescriptor.of(FooAutoValue.class)).via(i -> {
>>>                 System.out.println(i.toString());
>>>                 return i;
>>>             }));
>>>         p.run().waitUntilFinish();
>>>     }
>>>
>>>     @AutoValue
>>>     @DefaultSchema(AutoValueSchema.class)
>>>     public static abstract class FooAutoValue {
>>>         public abstract String getDummyProp();
>>>
>>>         //@SchemaCreate
>>>         //public static FooAutoValue create(String dummyProp) {
>>>         //    return builder()
>>>         //        .setDummyProp(dummyProp)
>>>         //        .build();
>>>         //}
>>>
>>>         public static Builder builder() {
>>>             return new AutoValue_App_FooAutoValue.Builder();
>>>         }
>>>
>>>         @AutoValue.Builder
>>>         public abstract static class Builder {
>>>             public abstract Builder setDummyProp(String newDummyProp);
>>>
>>>             public abstract FooAutoValue build();
>>>         }
>>>     }
>>> }
>>>
>>> Note that it doesn't matter if FooAutoValue is an inner class or in its
>>> own file as a top level non static class. For simplicity here I'm
>>> converting the objects to the same class, in prod code the input is of
>>> another type with equivalent schema.
>>>
>>> And the stack trace:
>>>
>>> Caused by: java.lang.IllegalAccessError: failed to access class
>>> org.whatever.testing.AutoValue_App_FooAutoValue$Builder from class
>>> org.whatever.testing.SchemaUserTypeCreator$SchemaCodeGen$2thLkIj1
>>> (org.whatever.testing.AutoValue_App_FooAutoValue$Builder is in unnamed
>>> module of loader 'app';
>>> 

Re: Apache Beam Go SDK Quickstart bugs

2021-11-08 Thread Kenneth Knowles
Awesome! Just going to add a few colleagues (who are subscribed anyhow) to
make sure this hits the top of their inbox.

+Robert Burke +Chamikara Jayalath +Kyle Weaver

Kenn

On Thu, Nov 4, 2021 at 11:40 PM Jeff Rhyason  wrote:

> I'm interested to see the Go SDK work with the Spark runner. Based on the
> instructions at https://beam.apache.org/get-started/quickstart-go/, I run
> these commands and get the following failure:
>
> $ ./gradlew :runners:spark:2:job-server:runShadow
> in another window:
> $ cd sdks
> $ go1.16.4 run ./go/examples/wordcount/wordcount.go  --output foo --runner
> spark --endpoint localhost:8099
> 2021/11/04 22:06:46 No environment config specified. Using default config:
> 'apache/beam_go_sdk:2.35.0.dev'
> 2021/11/04 22:06:46 Failed to execute job:  generating model pipeline
> failed to add scope tree: &{{CountWords root/CountWords} [{main.extractFn
> 5: ParDo [In(Main): string <- {4: string/string GLO}] -> [Out: string ->
> {5: string/string GLO}]}] [0xc96cd0]}
> caused by:
> failed to add input kind: {main.extractFn 5: ParDo [In(Main): string <-
> {4: string/string GLO}] -> [Out: string -> {5: string/string GLO}]}
> caused by:
> failed to serialize 5: ParDo [In(Main): string <- {4: string/string GLO}]
> -> [Out: string -> {5: string/string GLO}]
> caused by:
> encoding userfn 5: ParDo [In(Main): string <- {4: string/string
> GLO}] -> [Out: string -> {5: string/string GLO}]
> bad userfn
> caused by:
> encoding structural DoFn &{ 0xc000460ae8 
> map[ProcessElement:0xc0004fcac0] map[]}
> receiver type *main.extractFn must be registered
> exit status 1
>
> I was able to register that type, like this:
>
> diff --git a/sdks/go/examples/wordcount/wordcount.go
> b/sdks/go/examples/wordcount/wordcount.go
> index 4d54db9a2d..6db99d6220 100644
> --- a/sdks/go/examples/wordcount/wordcount.go
> +++ b/sdks/go/examples/wordcount/wordcount.go
> @@ -60,6 +60,7 @@ import (
> "flag"
> "fmt"
> "log"
> +   "reflect"
> "regexp"
> "strings"
>
> @@ -107,6 +108,7 @@ var (
>  // by calling beam.RegisterFunction in an init() call.
>  func init() {
> beam.RegisterFunction(formatFn)
> +   beam.RegisterDoFn(reflect.TypeOf((*extractFn)(nil)).Elem())
>  }
>
>  var (
>
>
> Then I encountered:
>
> $ go1.16.4 run ./go/examples/wordcount/wordcount.go  --output foo --runner
> spark   --endpoint localhost:8099
> ...
> 2021/11/04 23:07:26  (): java.lang.IllegalArgumentException: Unsupported
> class file major version 55
> 2021/11/04 23:07:26 Job state: FAILED
> 2021/11/04 23:07:26 Failed to execute job: job
> go0job0101636092444228385176-jar-1105060726-caffd2f4_ef37c3a9-b2a8-47bd-b1c7-a6e2771263f2
> failed
> exit status 1
>
>
> Switching to the Spark 3.0 job server changed things:
> $ cd .. ;  ./gradlew :runners:spark:3:job-server:runShadow
> ...
> $ cd sdks ; go1.16.4 run ./go/examples/wordcount/wordcount.go  --output
> foo --runner spark   --endpoint localhost:8099
> ...
> 2021/11/04 23:12:04 Staged binary artifact with token:
> 2021/11/04 23:12:04 Submitted job:
> go0job0101636092722590274154-jar-1105061204-7f1d879e_28e28e06-1331-41c6-8288-4dcfa87afd13
> 2021/11/04 23:12:04 Job state: STOPPED
> 2021/11/04 23:12:04 Job state: STARTING
> 2021/11/04 23:12:04 Job state: RUNNING
> 2021/11/04 23:12:17 Job state: DONE
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>  payload:"\x01"  labels:{key:"PCOLLECTION"  value:"n1"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>  payload:"\x01"  labels:{key:"PCOLLECTION"  value:"n3"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>  payload:"\x81\xdc\x01"  labels:{key:"PCOLLECTION"  value:"n5"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>  payload:"\x01"  labels:{key:"PCOLLECTION"  value:"n2"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>  payload:"\x95+"  labels:{key:"PCOLLECTION"  value:"n4"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>  payload:"\x81\xdc\x01"  labels:{key:"PCOLLECTION"  value:"n6"}
> 2021/11/04 23:12:17 unknown metric type
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:sampled_byte_size:v1"
>  type:"beam:metrics:distribution_int64:v1"  payload:"\x01\x01\x01\x01"
>  labels:{key:"PCOLLECTION"  value:"n1"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:sampled_byte_size:v1"
>  type:"beam:metrics:distribution_int64:v1"  payload:"\x01222"
>  

Re: How to read data from bigtable

2021-07-12 Thread Kenneth Knowles
Hi Raja & all,

I'm moving the thread to user@beam.apache.org since I think many users may
be interested in the answer to the question, or have other experiences to
share.

Kenn

On Mon, Jul 12, 2021 at 12:58 PM Chamikara Jayalath 
wrote:

> There's also an external connector that is directly supported by the
> BigTable team: https://cloud.google.com/bigtable/docs/hbase-dataflow-java
>
> Thanks,
> Cham
>
> On Mon, Jul 12, 2021 at 12:04 PM Andrew Pilloud 
> wrote:
>
>> Yes, the BigTable IO:
>>
>> https://beam.apache.org/releases/javadoc/2.30.0/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.html
>>
>> Are you using Java or Python?
>>
>> On Mon, Jul 12, 2021 at 12:01 PM Raja Guda  wrote:
>> >
>> > Hi,
>> >
>> > Is there any IO connector to read data from Bigtable?
>> >
>> > Thanks,
>> > Raja
>>
>


Re: Help: Apache Beam Session Window with limit on number of events and time elapsed from window start

2021-07-07 Thread Kenneth Knowles
Hi Chandan,

I am moving this thread to user@beam.apache.org. I think that is the best
place to discuss.

Kenn

On Wed, Jul 7, 2021 at 9:32 AM Chandan Bhattad 
wrote:

> Hi Team,
>
> Hope you are doing well.
>
> I have a use case around session windowing with some customizations.
>
> We need to create user sessions based on *any* of the 3 conditions
> below
>
> 1. Session Window of 30 minutes (meaning, 30 minutes of inactivity i.e. no
> event for 30 minutes for a user)
> 2. Number of events in the session window reaches 20,000
> 3. 4 hours have elapsed since window start
>
> Below is what I have tried.
>
> beam.WindowInto(window.Sessions(session_timeout_seconds),
>     trigger=trigger.Repeatedly(
>         trigger.AfterAny(
>             trigger.AfterCount(2),
>             trigger.DefaultTrigger(),
>             TriggerWhenWindowStartPassesXHours(hours=0.2)
>         )
>     ),
>     timestamp_combiner=window.TimestampCombiner.OUTPUT_AT_EOW,
>     accumulation_mode=trigger.AccumulationMode.DISCARDING
> )
>
>
> # Custom Trigger Implementation
> from apache_beam.transforms.trigger import DefaultTrigger
> from apache_beam.utils.timestamp import Timestamp
>
>
> class TriggerWhenWindowStartPassesXHours(DefaultTrigger):
>
>     def __init__(self, hours=4):
>         super(TriggerWhenWindowStartPassesXHours, self).__init__()
>         self.hours = hours
>
>     def __repr__(self):
>         return 'TriggerWhenWindowStartPassesXHours()'
>
>     def should_fire(self, time_domain, watermark, window, context):
>         should_fire = (Timestamp.now() - window.start).micros >= 36 * self.hours
>         return should_fire
>
>     @staticmethod
>     def from_runner_api(proto, context):
>         return TriggerWhenWindowStartPassesXHours()
>
> The above works well, but there is an issue. When trigger No. 3 above
> fires, it does not create a new session window; the same window is
> continued.
> As a result, No. 3 keeps firing on every subsequent event once 4 hours
> have passed since the window start, because the should_fire condition is:
>
> should_fire = (Timestamp.now() - window.start).micros >= 36 * self.hours
>
> and since window.start is never updated after the first time the trigger
> fires, it will fire for every subsequent event after the first trigger.
>
> I have also posted this on stackoverflow:
> https://stackoverflow.com/questions/68250618/apache-beam-session-window-with-limit-on-number-of-events
>
> I would be very grateful for any help as to how to achieve this.
> Thanks a lot in advance.
>
> Regards,
> Chandan
>
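
A minimal sketch of the three closing conditions from the question above, written as plain Python over the already-collected, sorted timestamps of one user. It is not a Beam trigger or WindowFn and does not deal with streaming or late data; the function name and parameters are made up for illustration. The point it shows is the behaviour the question asks for: once any limit is hit, a new session starts and its start time resets.

```python
def split_sessions(timestamps, gap, max_events, max_duration):
    """Group event timestamps (in seconds) for one user into sessions.

    The current session is closed before adding an event when any of:
      1. the event arrives more than `gap` seconds after the previous one
      2. the session already holds `max_events` events
      3. the event is `max_duration` or more seconds after the session start
    """
    sessions = []
    current = []
    for ts in sorted(timestamps):
        if current and (
            ts - current[-1] > gap
            or len(current) >= max_events
            or ts - current[0] >= max_duration
        ):
            sessions.append(current)
            current = []  # a new session begins, so its start time resets
        current.append(ts)
    if current:
        sessions.append(current)
    return sessions


# The limits from the question: 30 minutes, 20,000 events, 4 hours.
sessions = split_sessions(
    [0, 600, 1200, 9000],
    gap=30 * 60,
    max_events=20_000,
    max_duration=4 * 3600,
)
```

Here the event at 9000 seconds starts a second session because it arrives more than 30 minutes after the one at 1200 seconds.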


Re: No data sinks have been created yet.

2021-06-10 Thread Kenneth Knowles
Beam doesn't use Flink's sink API. I recall from a very long time ago that
we attached a noop sink to each PCollection to avoid this error. +Kyle Weaver
might know something about how this applies to Python on Flink.

Kenn

On Wed, Jun 9, 2021 at 4:41 PM Trevor Kramer 
wrote:

> Hello Beam community,
>
> I am getting the following error running a Beam pipeline on Flink.
>
> RuntimeError: Pipeline BeamApp failed in state FAILED:
> java.lang.RuntimeException: No data sinks have been created yet. A program
> needs at least one sink that consumes data. Examples are writing the data
> set or printing it.
>
> Here is my pipeline which I believe has a sink at the end of it. What am
> I missing?
>
> with beam.Pipeline(options=options) as p:
>     (p
>      | 'Read SDF' >> ParseSDF('s3://some-path.sdf')
>      | 'Sample' >> beam.combiners.Sample.FixedSizeGlobally(1000)
>      | 'Flatten' >> beam.FlatMap(lambda x: x)
>      | 'Standardize' >> beam.Map(standardize)
>      | 'Make FPs' >> beam.Map(calculate_fps)
>      | 'Make Dict' >> beam.Map(lambda x: {'fp': x})
>      | 'Write Parquet' >> WriteToParquet('s3://some-path', pyarrow.schema(
>          [('fp', pyarrow.list_(pyarrow.int64(), 2048))]
>      ))
>      )
>
>
> Thanks,
>
>
> Trevor
>
>


Re: Allyship workshops for open source contributors

2021-06-07 Thread Kenneth Knowles
Yes please!

On Thu, Jun 3, 2021, 18:32 Ratnakar Malla  wrote:

> +1
>
>
> --
> *From:* Austin Bennett 
> *Sent:* Thursday, June 3, 2021 6:20:25 PM
> *To:* user@beam.apache.org 
> *Cc:* dev 
> *Subject:* Re: Allyship workshops for open source contributors
>
> +1, assuming timing can work.
>
> On Wed, Jun 2, 2021 at 2:07 PM Aizhamal Nurmamat kyzy 
> wrote:
>
> If we have a good number of people who express interest in this thread, I
> will set up training for the Airflow community.
>
>
> I meant Beam ^^' I am organizing it for the Airflow community as well.
>
>


Re: RenameFields behaves differently in DirectRunner

2021-06-03 Thread Kenneth Knowles
I still don't quite grok the details of how this succeeds or fails in
different situations. The invalid row succeeds in serialization because the
coder is not sensitive to the way in which it is invalid?

Kenn

On Wed, Jun 2, 2021 at 2:54 PM Brian Hulette  wrote:

> > One thing that's been on the back burner for a long time is making
> CoderProperties into a CoderTester like Guava's EqualityTester.
>
> Reuven's point still applies here though. This issue is not due to a bug
> in SchemaCoder, it's a problem with the Row we gave SchemaCoder to encode.
> I'm assuming a CoderTester would require manually generating inputs right?
> These input Rows represent an illegal state that we wouldn't test with.
> (That being said I like the idea of a CoderTester in general)
>
> Brian
>
> On Wed, Jun 2, 2021 at 12:11 PM Kenneth Knowles  wrote:
>
>> Mutability checking might catch that.
>>
>> I meant to suggest not putting the check in the pipeline, but offering a
>> testing discipline that will catch such issues. One thing that's been on
>> the back burner for a long time is making CoderProperties into a
>> CoderTester like Guava's EqualityTester. Then it can run through all the
>> properties without a user setting up test suites. Downside is that the test
>> failure signal gets aggregated.
>>
>> Kenn
>>
>> On Wed, Jun 2, 2021 at 12:09 PM Brian Hulette 
>> wrote:
>>
>>> Could the DirectRunner just do an equality check whenever it does an
>>> encode/decode? It sounds like it's already effectively performing
>>> a CoderProperties.coderDecodeEncodeEqual for every element, just omitting
>>> the equality check.
>>>
>>> On Wed, Jun 2, 2021 at 12:04 PM Reuven Lax  wrote:
>>>
>>>> There is no bug in the Coder itself, so that wouldn't catch it. We
>>>> could insert CoderProperties.coderDecodeEncodeEqual in a subsequent ParDo,
>>>> but if the Direct runner already does an encode/decode before that ParDo,
>>>> then that would have fixed the problem before we could see it.
>>>>
>>>> On Wed, Jun 2, 2021 at 11:53 AM Kenneth Knowles 
>>>> wrote:
>>>>
>>>>> Would it be caught by CoderProperties?
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Wed, Jun 2, 2021 at 8:16 AM Reuven Lax  wrote:
>>>>>
>>>>>> I don't think this bug is schema specific - we created a Java object
>>>>>> that is inconsistent with its encoded form, which could happen to any
>>>>>> transform.
>>>>>>
>>>>>> This does seem to be a gap in DirectRunner testing though. It also
>>>>>> makes it hard to test using PAssert, as I believe that puts everything
>>>>>> in a side input, forcing an encoding/decoding.
>>>>>>
>>>>>> On Wed, Jun 2, 2021 at 8:12 AM Brian Hulette 
>>>>>> wrote:
>>>>>>
>>>>>>> +dev 
>>>>>>>
>>>>>>> > I bet the DirectRunner is encoding and decoding in between, which
>>>>>>> fixes the object.
>>>>>>>
>>>>>>> Do we need better testing of schema-aware (and potentially other
>>>>>>> built-in) transforms in the face of fusion to root out issues like this?
>>>>>>>
>>>>>>> Brian
>>>>>>>
>>>>>>> On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang <
>>>>>>> matthew.ouy...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I have some other work-related things I need to do this week, so I
>>>>>>>> will likely report back on this over the weekend.  Thank you for the
>>>>>>>> explanation.  It makes perfect sense now.
>>>>>>>>
>>>>>>>> On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Some more context - the problem is that RenameFields outputs (in
>>>>>>>>> this case) Java Row objects that are inconsistent with the actual 
>>>>>>>>> schema.
>>>>>>>>> For example if you have the following schema:
>>>>>>>>>
>>>>>>>>> Row {
>>>>>>>>>field1: Row {
>>>>>>>>>   field2: string
>>>>>>>>> }
>>>&

Re: RenameFields behaves differently in DirectRunner

2021-06-02 Thread Kenneth Knowles
Mutability checking might catch that.

I meant to suggest not putting the check in the pipeline, but offering a
testing discipline that will catch such issues. One thing that's been on
the back burner for a long time is making CoderProperties into a
CoderTester like Guava's EqualityTester. Then it can run through all the
properties without a user setting up test suites. Downside is that the test
failure signal gets aggregated.

Kenn
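
A small sketch of the "run every property and aggregate the failures" idea, using a toy JSON coder. None of these names come from Beam's CoderProperties; `JsonCoder`, `run_coder_checks`, and the two checks are hypothetical and exist only to show the shape of such a tester, including the downside that failures arrive as one combined list.

```python
import json


class JsonCoder:
    """Toy coder used only for this illustration."""

    def encode(self, value):
        return json.dumps(value, sort_keys=True).encode("utf-8")

    def decode(self, data):
        return json.loads(data.decode("utf-8"))


def decode_encode_equal(coder, value):
    # Round-tripping a value should give back an equal value.
    return coder.decode(coder.encode(value)) == value


def encoding_deterministic(coder, value):
    # Encoding the same value twice should give identical bytes.
    return coder.encode(value) == coder.encode(value)


def run_coder_checks(coder, values, checks):
    """Run every check on every value and return all failures together."""
    failures = []
    for value in values:
        for check in checks:
            if not check(coder, value):
                failures.append((check.__name__, value))
    return failures


failures = run_coder_checks(
    JsonCoder(),
    [{"a": 1}, [1, 2], "x", (1, 2)],
    [decode_encode_equal, encoding_deterministic],
)
```

The tuple `(1, 2)` comes back from JSON as the list `[1, 2]`, so it is the only entry in `failures`.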

On Wed, Jun 2, 2021 at 12:09 PM Brian Hulette  wrote:

> Could the DirectRunner just do an equality check whenever it does an
> encode/decode? It sounds like it's already effectively performing
> a CoderProperties.coderDecodeEncodeEqual for every element, just omitting
> the equality check.
>
> On Wed, Jun 2, 2021 at 12:04 PM Reuven Lax  wrote:
>
>> There is no bug in the Coder itself, so that wouldn't catch it. We could
>> insert CoderProperties.coderDecodeEncodeEqual in a subsequent ParDo, but if
>> the Direct runner already does an encode/decode before that ParDo, then
>> that would have fixed the problem before we could see it.
>>
>> On Wed, Jun 2, 2021 at 11:53 AM Kenneth Knowles  wrote:
>>
>>> Would it be caught by CoderProperties?
>>>
>>> Kenn
>>>
>>> On Wed, Jun 2, 2021 at 8:16 AM Reuven Lax  wrote:
>>>
>>>> I don't think this bug is schema specific - we created a Java object
>>>> that is inconsistent with its encoded form, which could happen to any
>>>> transform.
>>>>
>>>> This does seem to be a gap in DirectRunner testing though. It also
>>>> makes it hard to test using PAssert, as I believe that puts everything in a
>>>> side input, forcing an encoding/decoding.
>>>>
>>>> On Wed, Jun 2, 2021 at 8:12 AM Brian Hulette 
>>>> wrote:
>>>>
>>>>> +dev 
>>>>>
>>>>> > I bet the DirectRunner is encoding and decoding in between, which
>>>>> fixes the object.
>>>>>
>>>>> Do we need better testing of schema-aware (and potentially other
>>>>> built-in) transforms in the face of fusion to root out issues like this?
>>>>>
>>>>> Brian
>>>>>
>>>>> On Wed, Jun 2, 2021 at 5:13 AM Matthew Ouyang <
>>>>> matthew.ouy...@gmail.com> wrote:
>>>>>
>>>>>> I have some other work-related things I need to do this week, so I
>>>>>> will likely report back on this over the weekend.  Thank you for the
>>>>>> explanation.  It makes perfect sense now.
>>>>>>
>>>>>> On Tue, Jun 1, 2021 at 11:18 PM Reuven Lax  wrote:
>>>>>>
>>>>>>> Some more context - the problem is that RenameFields outputs (in
>>>>>>> this case) Java Row objects that are inconsistent with the actual
>>>>>>> schema. For example if you have the following schema:
>>>>>>>
>>>>>>> Row {
>>>>>>>   field1: Row {
>>>>>>>     field2: string
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> And rename field1.field2 -> renamed, you'll get the following schema
>>>>>>>
>>>>>>> Row {
>>>>>>>   field1: Row {
>>>>>>>     renamed: string
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> However the Java object for the _nested_ row will return the old
>>>>>>> schema if getSchema() is called on it. This is because we only
>>>>>>> update the schema on the top-level row.
>>>>>>>
>>>>>>> I think this explains why your test works in the direct runner. If
>>>>>>> the row ever goes through an encode/decode path, it will come back
>>>>>>> correct. The original incorrect Java objects are no longer around,
>>>>>>> and new (consistent) objects are constructed from the raw data and
>>>>>>> the PCollection schema. Dataflow tends to fuse ParDos together, so
>>>>>>> the following ParDo will see the incorrect Row object. I bet the
>>>>>>> DirectRunner is encoding and decoding in between, which fixes the
>>>>>>> object.
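
A toy model of the behaviour described above, in plain Python rather than Beam's Java Row API. The `Row` class, `shallow_rename`, `encode`, and `decode` are hypothetical names for illustration. It assumes, as the explanation says, that a rename replaces only the top-level schema, and that an encode/decode round trip rebuilds every nested row from the raw values plus the full schema.

```python
class Row:
    """Toy row: a schema plus positional values.

    A schema is a list of (field_name, nested_schema_or_None) pairs.
    """

    def __init__(self, schema, values):
        self.schema = schema
        self.values = values


def field_names(row):
    return [name for name, _ in row.schema]


def shallow_rename(row, new_schema):
    # Only the top-level schema is swapped; nested Row values are reused
    # unchanged, so they still carry their old schema.
    return Row(new_schema, row.values)


def encode(row):
    """Drop all schema information and keep only the raw nested values."""
    return [encode(v) if isinstance(v, Row) else v for v in row.values]


def decode(schema, raw):
    """Rebuild rows from raw values using the full nested schema."""
    values = [
        decode(sub, v) if sub is not None else v
        for (_, sub), v in zip(schema, raw)
    ]
    return Row(schema, values)


old_nested = [("field2", None)]
old_schema = [("field1", old_nested)]
new_schema = [("field1", [("renamed", None)])]

original = Row(old_schema, [Row(old_nested, ["some value"])])
renamed = shallow_rename(original, new_schema)

# Without a round trip, the nested row still reports the old field name.
stale = field_names(renamed.values[0])

# After a round trip, the nested row is rebuilt from the new schema.
fixed = decode(new_schema, encode(renamed))
fresh = field_names(fixed.values[0])
```

In this model a fused step that reads `renamed` directly sees `field2`, while a step that receives the re-decoded row sees `renamed`.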
>>>>>>>
>>>>>>> You can vali

Re: RenameFields behaves differently in DirectRunner

2021-06-02 Thread Kenneth Knowles
ecause it's
>>>>>>   part of some generic tooling has 4 levels of nesting and also
>>>>>>   produces the correct output too.
>>>>>>   - BigQueryUtils.toTableRow(Row) returns the expected TableRow
>>>>>>   in DirectRunner.  In DataflowRunner however, only the top-level
>>>>>>   renames were reflected in the TableRow and all renames in the
>>>>>>   nested fields weren't.
>>>>>>   - BigQueryUtils.toTableRow(Row) recurses on the Row values and
>>>>>>   uses the Row.schema to get the field names.  This makes sense to
>>>>>>   me, but if a value is actually a Row then its schema appears to be
>>>>>>   inconsistent with the top-level schema
>>>>>>    - My Current Workaround - I forked RenameFields and replaced the
>>>>>>    attachValues in expand method to be a "deep" rename.  This is
>>>>>>    obviously inefficient and I will not be submitting a PR for that.
>>>>>>- JIRA ticket - https://issues.apache.org/jira/browse/BEAM-12442
>>>>>>
>>>>>>
>>>>>> On Tue, Jun 1, 2021 at 5:51 PM Reuven Lax  wrote:
>>>>>>
>>>>>>> This transform is the same across all runners. A few comments on the
>>>>>>> test:
>>>>>>>
>>>>>>>   - Using attachValues directly is error prone (per the comment on
>>>>>>> the method). I recommend using the withFieldValue builders instead.
>>>>>>>   - I recommend capturing the RenameFields PCollection into a local
>>>>>>> variable of type PCollection and printing out the schema (which you
>>>>>>> can get using the PCollection.getSchema method) to ensure that the
>>>>>>> output schema looks like you expect.
>>>>>>>   - RenameFields doesn't flatten. So renaming field0_1.field1_0 ->
>>>>>>> nestedStringField results in field0_1.nestedStringField; if you
>>>>>>> wanted to flatten, then the better transform would be
>>>>>>> Select.fieldNameAs("field0_1.field1_0", nestedStringField).
>>>>>>>
>>>>>>> This all being said, eyeballing the implementation of RenameFields
>>>>>>> makes me think that it is buggy in the case where you specify a
>>>>>>> top-level field multiple times like you do. I think it is simply
>>>>>>> adding the top-level field into the output schema multiple times,
>>>>>>> and the second time is with the field0_1 base name; I have no idea
>>>>>>> why your test doesn't catch this in the DirectRunner, as it's
>>>>>>> equally broken there. Could you file a JIRA about this issue and
>>>>>>> assign it to me?
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Tue, Jun 1, 2021 at 12:47 PM Kenneth Knowles 
>>>>>>> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jun 1, 2021 at 12:42 PM Brian Hulette 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Matthew,
>>>>>>>>>
>>>>>>>>> > The unit tests also seem to be disabled for this as well and so
>>>>>>>>> I don’t know if the PTransform behaves as expected.
>>>>>>>>>
>>>>>>>>> The exclusion for NeedsRunner tests is just a quirk in our testing
>>>>>>>>> framework. NeedsRunner indicates that a test suite can't be executed 
>>>>>>>>> with
>>>>>>>>> the SDK alone, it needs a runner. So that exclusion just makes sure we
>>>>>>>>> don't run the test when we're verifying the SDK by itself in the
>>>>>>>>> :sdks:java:core:test task. The test is still run in other tasks where 
>>>>>>>>> we
>>>>>>>>> have a runner, most notably in the Java PreCommit [1], where we run 
>>>>>>>>&

Re: RenameFields behaves differently in DirectRunner

2021-06-01 Thread Kenneth Knowles
On Tue, Jun 1, 2021 at 12:42 PM Brian Hulette  wrote:

> Hi Matthew,
>
> > The unit tests also seem to be disabled for this as well and so I don’t
> know if the PTransform behaves as expected.
>
> The exclusion for NeedsRunner tests is just a quirk in our testing
> framework. NeedsRunner indicates that a test suite can't be executed with
> the SDK alone, it needs a runner. So that exclusion just makes sure we
> don't run the test when we're verifying the SDK by itself in the
> :sdks:java:core:test task. The test is still run in other tasks where we
> have a runner, most notably in the Java PreCommit [1], where we run it as
> part of the :runners:direct-java:test task.
>
> That being said, we may only run these tests continuously with the
> DirectRunner, I'm not sure if we test them on all the runners like we do
> with ValidatesRunner tests.
>

That is correct. The tests are tests _of the transform_ so they run only on
the DirectRunner. They are not tests of the runner, which is only
responsible for correctly implementing Beam's primitives. The transform
should not behave differently on different runners, except for fundamental
differences in how they schedule work and checkpoint.

Kenn


> > The error message I’m receiving, : Error while reading data, error
> message: JSON parsing error in row starting at position 0: No such field:
> nestedField.field1_0, suggests the BigQuery is trying to use the original
> name for the nested field and not the substitute name.
>
> Is there a stacktrace associated with this error? It would be helpful to
> see where the error is coming from.
>
> Brian
>
>
> [1]
> https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/4101/testReport/org.apache.beam.sdk.schemas.transforms/RenameFieldsTest/
>
> On Mon, May 31, 2021 at 5:02 PM Matthew Ouyang 
> wrote:
>
>> I’m trying to use the RenameFields transform on nested fields prior to
>> inserting into BigQuery. Insertion into BigQuery is successful with
>> DirectRunner, but DataflowRunner has an issue with renamed nested fields.
>> The error message I’m receiving, "Error while reading data, error
>> message: JSON parsing error in row starting at position 0: No such field:
>> nestedField.field1_0", suggests that BigQuery is trying to use the
>> original name for the nested field and not the substitute name.
>>
>> The code for RenameFields seems simple enough, but does it behave
>> differently in different runners? Will a deep attachValues be necessary in
>> order to get the nested renames to work across all runners? Is there
>> something wrong in my code?
>>
>>
>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java#L186
>>
>> The unit tests also seem to be disabled for this as well and so I don’t
>> know if the PTransform behaves as expected.
>>
>>
>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/build.gradle#L67
>>
>>
>> https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
>>
>>> package ca.loblaw.cerebro.PipelineControl;
>>>
>>> import com.google.api.services.bigquery.model.TableReference;
>>> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
>>> import org.apache.beam.sdk.Pipeline;
>>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>> import org.apache.beam.sdk.schemas.Schema;
>>> import org.apache.beam.sdk.schemas.transforms.RenameFields;
>>> import org.apache.beam.sdk.transforms.Create;
>>> import org.apache.beam.sdk.values.Row;
>>>
>>> import java.io.File;
>>> import java.util.Arrays;
>>> import java.util.HashSet;
>>> import java.util.stream.Collectors;
>>>
>>> import static java.util.Arrays.asList;
>>>
>>> public class BQRenameFields {
>>>     public static void main(String[] args) {
>>>         PipelineOptionsFactory.register(DataflowPipelineOptions.class);
>>>         DataflowPipelineOptions options =
>>>             PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
>>>         options.setFilesToStage(
>>>             Arrays.stream(System.getProperty("java.class.path").split(File.pathSeparator))
>>>                 .map(entry -> (new File(entry)).toString())
>>>                 .collect(Collectors.toList()));
>>>
>>>         Pipeline pipeline = Pipeline.create(options);
>>>
>>>         Schema nestedSchema = Schema.builder()
>>>             .addField(Schema.Field.nullable("field1_0", Schema.FieldType.STRING)).build();
>>>         Schema.Field field = Schema.Field.nullable("field0_0", Schema.FieldType.STRING);
>>>         Schema.Field nested = Schema.Field.nullable("field0_1", Schema.FieldType.row(nestedSchema));
>>>         Schema.Field runner = Schema.Field.nullable("field0_2", Schema

Re: Windowing

2021-05-26 Thread Kenneth Knowles
When you set up a trigger at the beginning of the pipeline, all the later
GroupByKey/Combine operations should automatically get a trigger that
mostly "lets the data flow". If you can share more about your pipeline -
especially code - then we could probably help more.

Kenn

On Wed, May 26, 2021 at 9:11 AM Sozonoff Serge  wrote:

> Hi,
>
> Well not exactly but maybe I can do something with that.
>
> Is there no way to simply assign some sort of global trigger to the entire
> pipeline so I do not have to define it for each collection in question?
>
> Thanks,
> Serge
>
>
>
>
> On 26 May 2021 at 18:01:14, Kenneth Knowles (k...@apache.org) wrote:
>
> You can use Window.configure() to only set the values you want to change.
> Is that what you mean?
>
> Kenn
>
> On Wed, May 26, 2021 at 8:42 AM Sozonoff Serge  wrote:
>
>> Hi All,
>>
>> I find myself having to pepper Window transforms all over my pipeline, I
>> count about 9 in order to get my pipeline to run. Aside from the class type
>> they window, all the statements are identical.
>>
>> A window into new GlobalWindows() using an identical trigger.
>>
>> Is there a way to change the trigger on the global window so I could
>> avoid this extra code all over the place?
>>
>> I really don't care about the windowing, but it's imposed since the pipeline
>> is unbounded and there are operations which require the window.
>>
>> Thanks,
>> Serge
>>
>>
>>


Re: Windowing

2021-05-26 Thread Kenneth Knowles
You can use Window.configure() to only set the values you want to change.
Is that what you mean?

Kenn

On Wed, May 26, 2021 at 8:42 AM Sozonoff Serge  wrote:

> Hi All,
>
> I find myself having to pepper Window transforms all over my pipeline, I
> count about 9 in order to get my pipeline to run. Aside from the class type
> they window, all the statements are identical.
>
> A window into new GlobalWindows() using an identical trigger.
>
> Is there a way to change the trigger on the global window so I could
> avoid this extra code all over the place?
>
> I really don't care about the windowing, but it's imposed since the pipeline
> is unbounded and there are operations which require the window.
>
> Thanks,
> Serge
>
>
>


Re: Error with Beam/Flink Python pipeline with Kafka

2021-05-24 Thread Kenneth Knowles
Thanks for the details in the gist. I would guess the key to the error is
this:

2021/05/24 13:45:34 Initializing java harness: /opt/apache/beam/boot
--id=1-2 --provision_endpoint=localhost:33957
2021/05/24 13:45:44 Failed to retrieve staged files: failed to retrieve
/tmp/staged in 3 attempts: failed to retrieve chunk for
/tmp/staged/beam-sdks-java-io-expansion-service-2.28.0-oErG3M6t3yoDeSvfFRW5UCvvM33J-4yNp7xiK9glKtI.jar
caused by:
rpc error: code = Unknown desc = ;
...

There is something wrong with the artifact provisioning endpoint perhaps? I
can't help any further than this.

Kenn

On Mon, May 24, 2021 at 6:58 AM Nir Gazit  wrote:

> Hey,
> I'm having issues with running a simple pipeline on a remote Flink
> cluster. I use a separate Beam job server to which I submit the job, which
> then goes to the Flink cluster with docker-in-docker enabled. For some
> reason, I'm getting errors in Flink which I can't decipher (see gist).
>
> Can someone assist me?
>
> Thanks!
> Nir
>


Re: How to flush window when draining a Dataflow pipeline?

2021-05-21 Thread Kenneth Knowles
+dev +Reuven Lax

Advancing the watermark to infinity does have an effect on the
GlobalWindow. The GlobalWindow ends a little bit before infinity :-). That
is why this works to cause the output even for unbounded aggregations.
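To make "a little bit before infinity" concrete, here is a plain-Java sketch with no Beam dependency. The class `GlobalWindowModel` is made up for illustration, and the constants (maximum timestamp of Long.MAX_VALUE microseconds, global window ending one day earlier) are my assumption about the Java SDK; please check BoundedWindow and GlobalWindow before relying on the exact values.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

// Hypothetical model, not Beam code. The constant values are assumptions
// about the Java SDK and should be verified against the SDK source.
final class GlobalWindowModel {
    // Assumed largest timestamp: Long.MAX_VALUE microseconds, in milliseconds.
    static final long TIMESTAMP_MAX_MILLIS = TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE);

    // Assumed end of the global window: one day before the largest timestamp.
    static final long END_OF_GLOBAL_WINDOW_MILLIS =
        TIMESTAMP_MAX_MILLIS - Duration.ofDays(1).toMillis();

    // True once the watermark has reached or passed the end of the global window.
    static boolean globalWindowClosed(long watermarkMillis) {
        return watermarkMillis >= END_OF_GLOBAL_WINDOW_MILLIS;
    }
}
```

Under these assumptions, a watermark advanced to the maximum timestamp lies past the end of the global window, so an end-of-window firing is possible even though the window is "global".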

Kenn

On Fri, May 21, 2021 at 5:10 AM Jeff Klukas  wrote:

> Beam users,
>
> We're attempting to write a Java pipeline that uses Count.perKey() to
> collect event counts, and then flush those to an HTTP API every ten minutes
> based on processing time.
>
> We've tried expressing this using GlobalWindows with an
> AfterProcessingTime trigger, but we find that when we drain the pipeline
> we end up with entries in the droppedDueToLateness metric. This was
> initially surprising, but may be in line with documented behavior [0]:
>
> > When you issue the Drain command, Dataflow immediately closes any
> in-process windows and fires all triggers. The system does not wait for any
> outstanding time-based windows to finish. Dataflow causes open windows to
> close by advancing the system watermark to infinity
>
> Perhaps advancing watermark to infinity has no effect on GlobalWindows, so
> we attempted to get around this by using a fixed but arbitrarily-long
> window:
>
> FixedWindows.of(Duration.standardDays(36500))
>
> The first few tests with this configuration came back clean, but the third
> test again showed droppedDueToLateness after calling Drain. You can see
> this current configuration in [1].
>
> Is there a pattern for reliably flushing on Drain when doing processing
> time-based aggregates like this?
>
> [0]
> https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#effects_of_draining_a_job
> [1]
> https://github.com/mozilla/gcp-ingestion/pull/1689/files#diff-1d75ce2cbda625465d5971a83d842dd35e2eaded2c2dd2b6c7d0d7cdfd459115R58-R71
>
>


Re: Beam/Dataflow pipeline backfill via JDBC

2021-05-12 Thread Kenneth Knowles
Can you share some more details, such as code? We may identify something
that relies upon assumptions from batch execution style.

Also notably the Java DirectRunner does not have separate batch/streaming
mode. It always executes in a "streaming" sort of way. It is also simpler
in some ways so if you can reproduce it on the DirectRunner that might help.

Kenn

On Tue, May 11, 2021 at 3:41 PM Raman Gupta  wrote:

> I have a Dataflow pipeline that reads data from JDBC and Pub/Sub. My ideal
> pipeline backfills its state and output from historical data via the JDBC
> input, and then continues processing new elements arriving via pub/sub.
> Conceptually, this seems easy to do with a filter on each source
> before/after some specific cutoff instant.
>
> However, when I add pub/sub into the pipeline, it runs in streaming mode,
> and the pipeline does not produce the expected results -- all of the
> results that would be produced based on looping timers seem to be missing.
>
> I thought this might be related to the post-inputs Flatten, but I've taken
> pub/sub out of the equation, and run the same exact JDBC-based pipeline in
> batch vs streaming mode, and the JDBC-only pipeline in streaming mode
> produces the same partial results.
>
> What could be happening?
>
> Regards,
> Raman
>
>


Re: DirectRunner, Fusion, and Triggers

2021-05-12 Thread Kenneth Knowles
On Sat, May 8, 2021 at 12:00 AM Bashir Sadjad  wrote:

> Hi Beam-users,
>
> *TL;DR;* I wonder if DirectRunner does any fusion optimization
> and whether this has any impact on triggers/panes?
>

> *Details* (the context for everything below is *DirectRunner* and this is
> a *batch* job):
> I have a batch pipeline that roughly looks like this: S1->S2->S3
>
> S1: Create URLs (from DB)
> S2: Fetch those URLs (output of S1) and create Avro records
> S3: Write those records to Parquet files
>
> S2 and S3 can be fused to generate Parquet files while the records are
> fetched/created. However, it does not seem to be the case, because there is
> no [temp] file while the resources are being fetched and the writer log
> messages appear only after all fetches are done.
>
> If I add a trigger to the output PCollection of S2 (i.e., `records`
> below), then I get intermediate Parquet output:
> ```
> records.apply(Window. into(new GlobalWindows())
>.triggering(Repeatedly.forever(
>AfterProcessingTime.pastFirstElementInPane().plusDelayOf(5
>.discardingFiredPanes());
> ```
>
> However, if I add a dummy S2' after S2 (i.e., S1->S2->S2'->S3) which only
> prints some log messages for each record and passes the record to output,
> then it seems S2 and S2' are fused. Because the log messages are
> interleaved with fetches.
>
> *Q1*: Does DirectRunner do any fusion optimization (e.g., like
> DataflowRunner)? If not by default, is there any way to enable it?
>

The Java DirectRunner does not do any fusion optimization. There's no code
to enable :-). It should affect performance only, not semantics. The
DirectRunner is known to have poor performance, but mostly no one is
working on speeding it up because it is really just for small-scale testing.



> The other issue is with triggers and creating panes. I have an extended
> version of this pipeline where a simplified view of it is:
> S1->S2A->GBK->S2B->S3
>
> S1: Like before
> S2A: Add a key to the output of S1
> GBK: Groups output of S2A to remove duplicate keys
> S2B: Similar to S2 above, i.e., fetch deduped URLs and create Avro records
> S3: Same as before
>
> *Q2*: In this case, if I add a dummy S2B' after S2', the log messages are
> *not* interleaved with resource fetches, i.e., no fusion is happening.
> Why? What is different here?
>

I don't quite understand what the problem is here.



> *Q3*: Even if I add a similar trigger to the output of S2B, the Parquet
> file generation does not start until all of the fetches are done. Again,
> what is different here and why are intermediate panes not fired while the
> output of S2B is being generated?
>

I think it would help to see how you have configured the ParquetIO write
transform.

Kenn


>
> Thanks
>
> -B
> P.S. I need this pipeline to work both on a distributed runner and also on
> a local machine with many cores. That's why the performance of DirectRunner
> is important to me.
>


Re: Question on late data handling in Beam streaming mode

2021-04-23 Thread Kenneth Knowles
Reuven's answer will result in a group by key (but not window) where no
data is dropped and you get deltas for each key. Downstream consumers can
recombine the deltas to get per-key aggregation. So instead of putting the
time interval into the window, you put it into the key, and then you get
the same grouped aggregation.

There are (at least) two other ways to do this:

1. You can set allowed lateness to a high value.
2. You can use a ParDo and outputWithTimestamp [1] to set the timestamps to
arrival time. I illustrated this in some older talks [2].
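As a rough illustration of putting the time interval into the key rather than the window, here is a plain-Java sketch with no Beam dependency. The class `ArrivalBuckets`, the `bucketKey` helper and the `key@bucketStart` format are made up for this example and are not Beam APIs.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Beam: models "interval in the key".
final class ArrivalBuckets {
    // Compose the user key with the start of the arrival-time bucket.
    static String bucketKey(String key, long arrivalMillis, Duration interval) {
        long size = interval.toMillis();
        long bucketStart = Math.floorDiv(arrivalMillis, size) * size;
        return key + "@" + bucketStart;
    }

    // Count elements per (key, arrival bucket). Nothing is treated as late,
    // because the bucket depends on arrival time rather than event time.
    static Map<String, Long> countPerBucket(String[] keys, long[] arrivals, Duration interval) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            counts.merge(bucketKey(keys[i], arrivals[i], interval), 1L, Long::sum);
        }
        return counts;
    }
}
```

A downstream consumer that wants a per-key total can sum the counts of all buckets that share the same key prefix.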

Kenn

[1]
https://github.com/apache/beam/blob/dc636be57900c8ad9b6b9e50b08dad64be8aee40/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L184
[2]
https://docs.google.com/presentation/d/1smGXb-0GGX_Fid1z3WWzZJWtyBjBA3Mo3t4oeRjJoZI/present?slide=id.g142c2fd96f_0_134

On Fri, Apr 23, 2021 at 8:32 AM Reuven Lax  wrote:

> You can definitely group by processing time. The way to do this in Beam is
> as follows
>
> Window.into(new GlobalWindows())
>     .triggering(AfterWatermark.pastEndOfWindow()
>         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>             .plusDelayOf(Duration.standardSeconds(30))))
>     .discardingFiredPanes();
>
> The syntax is a bit unfortunately wordy, but the idea is that you are
> creating a single event-time window that encompasses all time, and
> "triggering" an aggregation every 30 seconds based on processing time.
>
> On Fri, Apr 23, 2021 at 8:14 AM Tao Li  wrote:
>
>> Thanks @Kenneth Knowles. I understand we need to
>> specify a window for groupby so that the app knows when processing is
>> “done” to output result.
>>
>>
>>
>> Is it possible to specify an event arrival/processing time based window
>> for groupby? The purpose is to avoid dropping late events. With an event
>> processing time based window, the app will periodically output the result
>> based on all events that arrived in that window, and a late arriving event
>> will fall into whatever window covers its arrival time and thus that late
>> data will not get lost.
>>
>>
>>
>> Does Beam support this kind of mechanism? Thanks.
>>
>>
>>
>> *From: *Kenneth Knowles 
>> *Reply-To: *"user@beam.apache.org" 
>> *Date: *Thursday, April 22, 2021 at 1:49 PM
>> *To: *user 
>> *Cc: *Kelly Smith , Lian Jiang <
>> li...@zillowgroup.com>
>> *Subject: *Re: Question on late data handling in Beam streaming mode
>>
>>
>>
>> Hello!
>>
>>
>>
>> In a streaming app, you have two choices: wait forever and never have any
>> output OR use some method to decide that aggregation is "done".
>>
>>
>>
>> In Beam, the way you decide that aggregation is "done" is the watermark.
>> When the watermark predicts no more data for an aggregation, then the
>> aggregation is done. For example GROUP BY <minute> is "done" when no more
>> data will arrive for that minute. At this point, your result is produced.
>> More data may arrive, and it is ignored. The watermark is determined by the
>> IO connector to be the best heuristic available. You can configure "allowed
>> lateness" for an aggregation to allow out of order data.
>>
>>
>>
>> Kenn
>>
>>
>>
>> On Thu, Apr 22, 2021 at 1:26 PM Tao Li  wrote:
>>
>> Hi Beam community,
>>
>>
>>
>> I am wondering if there is a risk of losing late data from a Beam stream
>> app due to watermarking?
>>
>>
>>
>> I just went through this design doc and noticed the “droppable”
>> definition there:
>> https://docs.google.com/document/d/12r7frmxNickxB5tbpuEh_n35_IJeVZn1peOrBrhhP6Y/edit#
>>
>>
>>
>> Can you please confirm if it’s possible for us to lose some data in a
>> stream app in practice? If that’s possible, what would be the best practice
>> to avoid data loss? Thanks!
>>
>>
>>
>>


Re: Question on late data handling in Beam streaming mode

2021-04-22 Thread Kenneth Knowles
Hello!

In a streaming app, you have two choices: wait forever and never have any
output OR use some method to decide that aggregation is "done".

In Beam, the way you decide that aggregation is "done" is the watermark.
When the watermark predicts no more data for an aggregation, then the
aggregation is done. For example GROUP BY <minute> is "done" when no more
data will arrive for that minute. At this point, your result is produced.
More data may arrive, and it is ignored. The watermark is determined by the
IO connector to be the best heuristic available. You can configure "allowed
lateness" for an aggregation to allow out of order data.
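The rule can be written down as a tiny plain-Java model with no Beam dependency. The class `LatenessModel` and its parameter names are made up for illustration; the comparison is my simplified reading of how a runner decides that data is droppable, not a copy of Beam's implementation.

```java
// Hypothetical model, not Beam code: decides whether an element is droppable.
final class LatenessModel {
    // An element is droppable once the watermark has passed the maximum
    // timestamp of its window plus the allowed lateness of the aggregation.
    static boolean isDroppable(
            long windowMaxTimestampMillis, long allowedLatenessMillis, long watermarkMillis) {
        return watermarkMillis > windowMaxTimestampMillis + allowedLatenessMillis;
    }
}
```

With zero allowed lateness, an element for a one-minute window is droppable as soon as the watermark moves past that minute; raising the allowed lateness pushes that point later at the cost of keeping window state around longer.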

Kenn

On Thu, Apr 22, 2021 at 1:26 PM Tao Li  wrote:

> Hi Beam community,
>
>
>
> I am wondering if there is a risk of losing late data from a Beam stream
> app due to watermarking?
>
>
>
> I just went through this design doc and noticed the “droppable” definition
> there:
> https://docs.google.com/document/d/12r7frmxNickxB5tbpuEh_n35_IJeVZn1peOrBrhhP6Y/edit#
>
>
>
> Can you please confirm if it’s possible for us to lose some data in a
> stream app in practice? If that’s possible, what would be the best practice
> to avoid data loss? Thanks!
>
>
>


Re: Checkpointing Dataflow Pipeline

2021-04-07 Thread Kenneth Knowles
[I think this has graduated to a +dev thread]

Yes, in Beam this is left primarily to the IOs, hence the bundle
finalization step; runners are of course allowed to have their own features.
Dataflow also does have in-place pipeline update that restores the
persisted checkpoints from one pipeline to another - same basic
mechanism/idea as Spark Structured Streaming but different overall
workflow. +Reuven Lax has put a lot of thought into
updating, checkpointing, resuming, etc. Runners differ a lot in these
areas. Is there something that should graduate from runner-specific to the
Beam model?
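To illustrate the idea behind committing a read position in bundle finalization, here is a plain-Java model with no Beam or Kinesis dependency. The class `FinalizedOffsets` and its methods are hypothetical names for this sketch; it only shows the ordering (read, durably commit output, then advance the stored position), not how any particular IO does it.

```java
// Hypothetical model, not Beam or Kinesis code: a source position becomes
// durable only after the bundle that read it has been finalized.
final class FinalizedOffsets {
    private long committed = -1; // last offset known to be durably processed
    private long pending = -1;   // last offset read in the current bundle

    // Record that an offset was read in the current bundle.
    void read(long offset) {
        pending = offset;
    }

    // Called after the runner has durably committed the bundle's output.
    void finalizeBundle() {
        committed = pending;
    }

    // Called when the bundle fails: reads that were not committed are forgotten.
    void abandonBundle() {
        pending = committed;
    }

    // Offset at which a restarted reader should resume.
    long resumeFrom() {
        return committed + 1;
    }
}
```

If the committed position lives in an external system (as with Kafka group offsets), a brand-new pipeline can resume from it; if it lives only in runner state, it is lost when the pipeline is replaced rather than updated.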

Kenn

On Wed, Apr 7, 2021 at 11:28 AM Vincent Marquez 
wrote:

> Looks like this is a common source of confusion, I had similar questions
> about checkpointing in the beam slack.
>
> In Spark Structured Streaming, checkpoints are saved to an *external* HDFS
> location and persist *beyond* each run, so in the event of a stream
> crashing, you can just point your next execution of the stream to the
> checkpoint location.  Kafka  (or Kinesis/Redis Stream etc) offsets are
> persisted in the checkpoint, so the stream would resume off of the last
> committed checkpoint location.
>
> It doesn't seem Beam has an external checkpoint that persists beyond a
> single stream execution, so in Beam with Kinesis I believe you'll have to
> manage your own offsets deliberately with an external source if you want to
> achieve 'exactly once' semantics in the event of shutting down a stream and
>  resuming it at a later point.
>
> In Kafka you don't need this since as long as we ensure our offsets are
> committed in finalization of a bundle, the offsets for a particular group
> id are stored on the server.
>
>
> On Tue, Apr 6, 2021 at 3:13 PM Kenneth Knowles  wrote:
>
>> This sounds similar to the "Kafka Commit" in
>> https://github.com/apache/beam/pull/12572 by +Boyuan Zhang
>>  and also to how PubsubIO ACKs messages in the
>> finalizer. I don't know much about KinesisIO or how Kinesis works. I was
>> just asking to clarify, in case other folks know more, like +Alexey
>> Romanenko  and +Ismaël Mejía
>>  have modified KinesisIO. If the feature does not
>> exist today, perhaps we can identify the best practices around this pattern.
>>
>> Kenn
>>
>> On Tue, Apr 6, 2021 at 1:59 PM Michael Luckey 
>> wrote:
>>
>>> Hi Kenn,
>>>
>>> yes, resuming reading at the proper timestamp is exactly the issue we
>>> are currently struggling with. E.g. with Kinesis Client Lib we could store
>>> the last read within some dynamo table. This mechanism is not used with
>>> beam, as we understand, the runner is responsible to track that checkpoint
>>> mark.
>>>
>>> Now, obviously on restarting the pipeline, e.g. on a non-compatible
>>> upgrade where a pipeline update is just not feasible, there must be
>>> some mechanism in place by which Dataflow will know where to continue. Is
>>> that simply the pipeline name? Or is there more involved? So how does
>>> checkpointing actually work here?
>>>
>>> Based on 'name', wouldn't that imply that something like (example taken
>>> from
>>> https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates
>>> )
>>>
>>>   export REGION="us-central1"
>>>
>>>   gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \
>>> --template-file-gcs-location "$TEMPLATE_PATH" \
>>> --parameters inputSubscription="$SUBSCRIPTION" \
>>> --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
>>> --region "$REGION"
>>>
>>> will not resume on last read on rerun, because the name obviously
>>> changes here?
>>>
>>> best,
>>>
>>> michel
>>>
>>>
>>>
>>> On Tue, Apr 6, 2021 at 10:38 PM Kenneth Knowles  wrote:
>>>
>>>> I would assume the main issue is resuming reading from the Kinesis
>>>> stream from the last read? In the case for Pubsub (just as another example
>>>> of the idea) this is part of the internal state of a pre-created
>>>> subscription.
>>>>
>>>> Kenn
>>>>
>>>> On Tue, Apr 6, 2021 at 1:26 PM Michael Luckey 
>>>> wrote:
>>>>
>>>>> Hi list,
>>>>>
>>>>> with our current project we are implementing our streaming pipeline
>>>>> based on Google Dataflow.
>>>>>
>>>>> Essentially we receive input via Kinesis, doing s

Re: Checkpointing Dataflow Pipeline

2021-04-06 Thread Kenneth Knowles
This sounds similar to the "Kafka Commit" in
https://github.com/apache/beam/pull/12572 by +Boyuan Zhang and also to how
PubsubIO ACKs messages in the finalizer. I don't know much about KinesisIO
or how Kinesis works. I was just asking to clarify, in case other folks know
more, like +Alexey Romanenko and +Ismaël Mejía, who have modified KinesisIO.
If the feature does not exist today, perhaps we can identify the best
practices around this pattern.

Kenn

On Tue, Apr 6, 2021 at 1:59 PM Michael Luckey  wrote:

> Hi Kenn,
>
> yes, resuming reading at the proper timestamp is exactly the issue we are
> currently struggling with. E.g. with Kinesis Client Lib we could store the
> last read within some dynamo table. This mechanism is not used with beam,
> as we understand, the runner is responsible to track that checkpoint mark.
>
> Now, obviously on restarting the pipeline, e.g. on a non-compatible upgrade
> where a pipeline update is just not feasible, there must be some
> mechanism in place by which Dataflow will know where to continue. Is that
> simply the pipeline name? Or is there more involved? So how does
> checkpointing actually work here?
>
> Based on 'name', wouldn't that imply that something like (example taken
> from
> https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates
> )
>
>   export REGION="us-central1"
>
>   gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \
> --template-file-gcs-location "$TEMPLATE_PATH" \
> --parameters inputSubscription="$SUBSCRIPTION" \
> --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
> --region "$REGION"
>
> will not resume on last read on rerun, because the name obviously changes
> here?
>
> best,
>
> michel
>
>
>
> On Tue, Apr 6, 2021 at 10:38 PM Kenneth Knowles  wrote:
>
>> I would assume the main issue is resuming reading from the Kinesis stream
>> from the last read? In the case for Pubsub (just as another example of the
>> idea) this is part of the internal state of a pre-created subscription.
>>
>> Kenn
>>
>> On Tue, Apr 6, 2021 at 1:26 PM Michael Luckey 
>> wrote:
>>
>>> Hi list,
>>>
>>> with our current project we are implementing our streaming pipeline
>>> based on Google Dataflow.
>>>
>>> Essentially we receive input via Kinesis, doing some filtering,
>>> enrichment and sessionizing and output to PubSub and/or google storage.
>>>
>>> After a short investigation it is not clear to us how checkpointing will
>>> work running on Dataflow in connection with KinesisIO. Is there any
>>> documentation/discussion to get a better understanding of how that will
>>> work? Especially if we are forced to restart our pipelines, how could we
>>> ensure not to lose any events?
>>>
>>> As far as I understand currently, it should work 'auto-magically' but it
>>> is not yet clear to us how it will actually behave. Before we try to start
>>> testing our expectations or even try to implement some watermark-tracking
>>> by ourselves we hoped to get some insights from other users here.
>>>
>>> Any help appreciated.
>>>
>>> Best,
>>>
>>> michel
>>>
>>


Re: Checkpointing Dataflow Pipeline

2021-04-06 Thread Kenneth Knowles
I would assume the main issue is resuming reading from the Kinesis stream
from the last read? In the case for Pubsub (just as another example of the
idea) this is part of the internal state of a pre-created subscription.

Kenn

On Tue, Apr 6, 2021 at 1:26 PM Michael Luckey  wrote:

> Hi list,
>
> with our current project we are implementing our streaming pipeline based
> on Google Dataflow.
>
> Essentially we receive input via Kinesis, doing some filtering, enrichment
> and sessionizing and output to PubSub and/or google storage.
>
> After a short investigation it is not clear to us how checkpointing will
> work running on Dataflow in connection with KinesisIO. Is there any
> documentation/discussion to get a better understanding of how that will
> work? Especially if we are forced to restart our pipelines, how could we
> ensure not to lose any events?
>
> As far as I understand currently, it should work 'auto-magically' but it
> is not yet clear to us how it will actually behave. Before we try to start
> testing our expectations or even try to implement some watermark-tracking
> by ourselves we hoped to get some insights from other users here.
>
> Any help appreciated.
>
> Best,
>
> michel
>


Re: Global window + stateful transformation

2021-03-31 Thread Kenneth Knowles
On Wed, Mar 31, 2021 at 10:20 AM Kenneth Knowles  wrote:

>
> On Wed, Mar 31, 2021 at 10:19 AM Hemali Sutaria <
> hsuta...@paloaltonetworks.com> wrote:
>
>> I have a global window with per-key-and-window stateful processing
>> dataflow job. Do I need groupbykey in my transform ? Thank you
>>
>
No you do not need a GroupByKey. When you use a stateful DoFn the Beam
runner will partition the data automatically by key and window.
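As a mental model only, the partitioning can be pictured as state stored under a (key, window) pair. This is a plain-Java sketch with no Beam dependency; the class `PerKeyWindowState` is made up for illustration and says nothing about how a runner actually stores state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model, not Beam code: state is scoped to a (key, window) pair,
// which is the partitioning a runner provides for a stateful DoFn.
final class PerKeyWindowState {
    private final Map<String, Integer> counts = new HashMap<>();

    // Process one element and return the running count for its key and window.
    int process(String key, String window) {
        return counts.merge(key + "|" + window, 1, Integer::sum);
    }
}
```

Each key sees only its own counter, which is why no explicit grouping step is needed before the stateful step.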

Kenn


>
>>
>> https://cloud.google.com/blog/products/gcp/writing-dataflow-pipelines-with-scalability-in-mind
>>
>> https://beam.apache.org/documentation/programming-guide/#transforms
>>
>>
>> https://beam.apache.org/blog/timely-processing/
>>
>>
>> Thanks,
>> Hemali Sutaria
>>
>>


Re: Global window + stateful transformation

2021-03-31 Thread Kenneth Knowles
Great question!

Moving this to user@beam.apache.org

Kenn

On Wed, Mar 31, 2021 at 10:19 AM Hemali Sutaria <
hsuta...@paloaltonetworks.com> wrote:

> Beam Developers,
>
> I have a global window with per-key-and-window stateful processing
> dataflow job. Do I need groupbykey in my transform ? Thank you
>
>
> https://cloud.google.com/blog/products/gcp/writing-dataflow-pipelines-with-scalability-in-mind
>
> https://beam.apache.org/documentation/programming-guide/#transforms
>
>
> https://beam.apache.org/blog/timely-processing/
>
>
> Thanks,
> Hemali Sutaria
>
>


Re: Triggering partway through a window

2021-03-29 Thread Kenneth Knowles
That's a neat example!

The trigger you have there will emit a ton of output. What is your
accumulation mode? I assume it must be accumulatingFiredPanes() otherwise
you would not actually have access to the prior 6 days of input.

The only trigger that is based on completeness of data is the
AfterWatermark.pastEndOfWindow() trigger, so you have to use that to
capture the 6 days of data:

prior6days = input.apply(Window.into(<6 day windows sliding one
day>).triggering(AfterWatermark.pastEndOfWindow()))

Now if you GBK this collection, each group will have a timestamp that is
the end of the 6 day period. You can use ParDo with outputWithTimestamp to
move the timestamp up to any timestamp in the following day, yielding a
PCollection of 6 day grouping of data with a timestamp in the last day of
the 7. If the 6 days of data is large you may hit size limits (either hard
limits or perf problems) and have to do something fancier.
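The window arithmetic behind this can be sketched in plain Java with no Beam dependency. The class `SixDayWindows` and its method names are made up for illustration, timestamps are epoch milliseconds, and windows are assumed to be aligned to day boundaries with no offset.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not Beam code: 6-day windows that slide by one day.
final class SixDayWindows {
    static final long DAY = Duration.ofDays(1).toMillis();
    static final long SIZE = 6 * DAY; // window length
    static final long PERIOD = DAY;   // a new window starts every day

    // Start times of every 6-day window that contains the timestamp.
    static List<Long> windowStarts(long timestampMillis) {
        List<Long> starts = new ArrayList<>();
        long lastStart = Math.floorDiv(timestampMillis, PERIOD) * PERIOD;
        for (long start = lastStart; start > timestampMillis - SIZE; start -= PERIOD) {
            starts.add(start);
        }
        return starts;
    }

    // Timestamp to give a grouped 6-day result so that it lands in the
    // following (7th) day: the first instant after the window ends.
    static long followingDayTimestamp(long windowStartMillis) {
        return windowStartMillis + SIZE;
    }
}
```

Every element falls into six overlapping windows, so the grouped data is roughly six times the input volume, which is where the size concern comes from.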

Flatten this with the input PCollection and window into FixedWindows() and
trigger however you like, again with accumulatingFiredPanes().
There is no guarantee that the 6 days of past data arrives prior to
elements in the last day. In fact since it will be delayed by an extra
shuffle you would expect it to often show up later. So this is a heuristic
approach equivalent to what it sounds like you are already doing that
should lower the cost.

If you want a guarantee that the 6 day buffer arrives prior to the other
elements you will need to do something else. You could write a WindowFn
that assigned all 7 days of data to a window that only spanned the first 6
days, then trigger at end of window plus allowing late data (no early
firings). Then every firing would be guaranteed by the watermark to have
the first 6 days of data plus whatever else has shown up. (I assume part of
your spec is that you do want data to be processed as it arrives, versus
waiting until the end of the 7 day window).

I am just writing this without coding, so I could certainly have missed
something or gotten it wrong.

Kenn

On Fri, Mar 26, 2021 at 1:47 PM Raman Gupta  wrote:

> I have a 7-day sliding calendar window, sliding by 1 day. The intent is to
> process only elements that fall into the last day of a window, but still
> have access to the elements from the preceding six days.
>
> I created a sliding calendar window function, and trigger it like this:
>
> AfterWatermark.pastEndOfWindow()
>   .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
>   .withLateFirings(AfterPane.elementCountAtLeast(1))
>
> Downstream of this pipeline I have a GBK and a DoFn that basically ignores
> elements until at least some of them are in the last day of the window.
>
> The above trigger works and the pipeline produces the expected output, but
> runs the GBK and downstream logic many more times than is necessary.
>
> Is there a way I can optimize the triggering here such that the early
> firings begin only when the watermark moves into the last day of the 7-day
> window?
>
> Thanks,
> Raman
>
>


Re: General guidance

2021-03-25 Thread Kenneth Knowles
This is a Beam issue indeed, though it is an issue with the FlinkRunner. So
I think I will BCC the Flink list.

You may be in one of the following situations:
 - These timers should not be viewed as distinct by the runner, but
deduped, per
https://issues.apache.org/jira/browse/BEAM-8212#comment-16946013
 - There is a different problem if you have an unbounded key space with
windows that never expire, since then there are unbounded numbers of truly
distinct (but irrelevant) timers. That is also the responsibility of the
runner to simply not set timers that can never fire.

Kenn

On Thu, Mar 25, 2021 at 11:57 AM Almeida, Julius 
wrote:

> Hi Team,
>
>
>
> My streaming pipeline is based on beam & running using flink runner with
> rocksdb as state backend.
>
>
>
> Over time I am seeing memory spikes, and after looking at a heap dump I
> am seeing records in ‘__StatefulParDoGcTimerId’ which seem never to be
> cleaned.
>
>
>
> Found this jira https://issues.apache.org/jira/browse/BEAM-8212
> describing the issue I believe I am facing.
>
>
>
> Any pointers would be helpful in identifying possible solution.
>
>
>
> Thanks,
>
> Julius
>


Re: Write to multiple IOs in linear fashion

2021-03-24 Thread Kenneth Knowles
The reason I was checking out the code is that sometimes a natural thing to
output would be a summary of what was written: one summary for each chunk of
writes, plus one for the final chunk written in @FinishBundle. This is, for
example, what SQL engines do (output # of rows written).

You could output both the summary and the full list of written elements to
different outputs, and users can choose. Outputs that are never consumed
should be very low or zero cost.

Kenn

On Wed, Mar 24, 2021 at 5:36 PM Robert Bradshaw  wrote:

> Yeah, the entire input is not always what is needed, and can generally be
> achieved via
>
> input -> wait(side input of write) -> do something with the input
>
> Of course one could also do
>
> entire_input_as_output_of_wait -> MapTo(KV.of(null, null)) ->
> CombineGlobally(TrivialCombineFn)
>
> to reduce this to a more minimal set with at least one element per Window.
>
> The file writing operations emit the actual files that were written, which
> can be handy. My suggestion of PCollection was just so that we can emit
> something usable, and decide later exactly what is most useful.
>
>
> On Wed, Mar 24, 2021 at 5:30 PM Reuven Lax  wrote:
>
>> I believe that the Wait transform turns this output into a side input, so
>> outputting the input PCollection might be problematic.
>>
>> On Wed, Mar 24, 2021 at 4:49 PM Kenneth Knowles  wrote:
>>
>>> Alex's idea sounds good and like what Vincent maybe implemented. I am
>>> just reading really quickly so sorry if I missed something...
>>>
>>> Checking out the code for the WriteFn I see a big problem:
>>>
>>> @Setup
>>> public void setup() {
>>>   writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
>>> }
>>>
>>> @ProcessElement
>>> public void processElement(ProcessContext c)
>>>     throws ExecutionException, InterruptedException {
>>>   writer.mutate(c.element());
>>> }
>>>
>>> @Teardown
>>> public void teardown() throws Exception {
>>>   writer.close();
>>>   writer = null;
>>> }
>>>
>>> It is only in writer.close() that all async writes are waited on. This
>>> needs to happen in @FinishBundle.
>>>
>>> Did you discover this when implementing your own Cassandra.Write?
>>>
>>> Until you have waited on the future, you should not output the element
>>> as "has been written". And you cannot output from the @Teardown method,
>>> which is just for cleaning up resources.
>>>
>>> Am I reading this wrong?
>>>
>>> Kenn
>>>
>>> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato  wrote:
>>>
>>>> How about a PCollection containing every element which was successfully
>>>> written?
>>>> Basically the same things which were passed into it.
>>>>
>>>> Then you could act on every element after it's been successfully written
>>>> to the sink.
>>>>
>>>> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw 
>>>> wrote:
>>>>
>>>>> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía 
>>>>> wrote:
>>>>>
>>>>>> +dev
>>>>>>
>>>>>> Since we all agree that we should return something different than
>>>>>> PDone the real question is what should we return.
>>>>>>
>>>>>
>>>>> My proposal is that one returns a PCollection that consists,
>>>>> internally, of something contentless like nulls. This is future compatible
>>>>> with returning something more meaningful based on the source or write
>>>>> process itself, but at least this would be followable.
>>>>>
>>>>>
>>>>>> As a reminder we had a pretty interesting discussion about this
>>>>>> already in the past but uniformization of our return values has not
>>>>>> happened.
>>>>>> This thread is worth reading for Vincent or anyone who wants to
>>>>>> contribute Write transforms that return.
>>>>>>
>>>>>> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>>>>>
>>>>>
>>>>> Yeah, we should go ahead and finally do something.
>>>>>
>>>>>
>>>>>>
>>>>>> > Ret

Re: Write to multiple IOs in linear fashion

2021-03-24 Thread Kenneth Knowles
Alex's idea sounds good and like what Vincent maybe implemented. I am just
reading really quickly so sorry if I missed something...

Checking out the code for the WriteFn I see a big problem:

@Setup
public void setup() {
  writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
}

@ProcessElement
public void processElement(ProcessContext c)
    throws ExecutionException, InterruptedException {
  writer.mutate(c.element());
}

@Teardown
public void teardown() throws Exception {
  writer.close();
  writer = null;
}

It is only in writer.close() that all async writes are waited on. This
needs to happen in @FinishBundle.

Did you discover this when implementing your own Cassandra.Write?

Until you have waited on the future, you should not output the element as
"has been written". And you cannot output from the @Teardown method, which
is just for cleaning up resources.
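
The lifecycle point can be sketched in plain Python. This is a stand-in for the DoFn lifecycle, not Beam or Cassandra code; AsyncWriteFn and its save argument are hypothetical. The point it illustrates is that elements are reported as written only in finish_bundle, after their futures have completed, and that teardown only releases resources.

```python
from concurrent.futures import ThreadPoolExecutor


class AsyncWriteFn:
    """Plain-Python stand-in for a DoFn lifecycle; not the Beam API."""

    def __init__(self, save):
        self._save = save  # hypothetical blocking write function
        self._executor = None
        self._pending = []

    def setup(self):
        self._executor = ThreadPoolExecutor(max_workers=4)

    def start_bundle(self):
        self._pending = []

    def process(self, element):
        # Start the write, but do not report the element as written yet.
        future = self._executor.submit(self._save, element)
        self._pending.append((element, future))

    def finish_bundle(self):
        # Wait for every async write of this bundle; result() re-raises failures.
        written = []
        for element, future in self._pending:
            future.result()
            written.append(element)
        self._pending = []
        return written

    def teardown(self):
        # Resource cleanup only; nothing is output from here.
        self._executor.shutdown(wait=True)
        self._executor = None


store = []
fn = AsyncWriteFn(store.append)
fn.setup()
fn.start_bundle()
for e in ["a", "b", "c"]:
    fn.process(e)
written = fn.finish_bundle()
fn.teardown()
print(written, len(store))
```

Returning the written elements (or just their count) from finish_bundle is also the natural place to produce a per-bundle summary.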

Am I reading this wrong?

Kenn

On Wed, Mar 24, 2021 at 4:35 PM Alex Amato  wrote:

> How about a PCollection containing every element which was successfully
> written?
> Basically the same things which were passed into it.
>
> Then you could act on every element after it's been successfully written to
> the sink.
>
> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw 
> wrote:
>
>> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía  wrote:
>>
>>> +dev
>>>
>>> Since we all agree that we should return something different than
>>> PDone the real question is what should we return.
>>>
>>
>> My proposal is that one returns a PCollection that consists,
>> internally, of something contentless like nulls. This is future compatible
>> with returning something more meaningful based on the source or write
>> process itself, but at least this would be followable.
>>
>>
>>> As a reminder we had a pretty interesting discussion about this
>>> already in the past but uniformization of our return values has not
>>> happened.
>>> This thread is worth reading for Vincent or anyone who wants to
>>> contribute Write transforms that return.
>>>
>>> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>>
>>
>> Yeah, we should go ahead and finally do something.
>>
>>
>>>
>>> > Returning PDone is an anti-pattern that should be avoided, but
>>> changing it now would be backwards incompatible.
>>>
>>> Periodic reminder: most IOs are still Experimental, so I suppose it is
>>> up to the maintainers to judge whether the upgrade to return something
>>> different from PDone is worthwhile. In that case we can deprecate and
>>> remove the previous signature in a short time (2 releases was the average
>>> for previous cases).
>>>
>>>
>>> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>>>  wrote:
>>> >
>>> > I thought that was said about returning a PCollection of write results
>>> as it’s done in other IOs (as I mentioned as examples) that have
>>> _additional_ write methods, like “withWriteResults()” etc, that return
>>> PTransform<…, PCollection>.
>>> > In this case, we keep backwards compatibility and just add new
>>> functionality. Though, we need to follow the same pattern for user API and
>>> maybe even naming for this feature across different IOs (like we have for
>>> “readAll()” methods).
>>> >
>>> >  I agree that we have to avoid returning PDone for such cases.
>>> >
>>> > On 24 Mar 2021, at 20:05, Robert Bradshaw  wrote:
>>> >
>>> > Returning PDone is an anti-pattern that should be avoided, but
>>> changing it now would be backwards incompatible. PRs to add non-PDone
>>> returning variants (probably as another option to the builders) that
>>> compose well with Wait, etc. would be welcome.
>>> >
>>> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
>>> aromanenko@gmail.com> wrote:
>>> >>
>>> >> In this way, I think “Wait” PTransform should work for you but, as it
>>> was mentioned before, it doesn’t work with PDone, only with PCollection as
>>> a signal.
>>> >>
>>> >> Since you already adjusted your own writer for that, it would be
>>> great to contribute it back to Beam in the way as it was done for other IOs
>>> (for example, JdbcIO [1] or BigtableIO [2])
>>> >>
>>> >> In general, I think we need to have it for all IOs, at least to use
>>> with “Wait” because this pattern is quite often required.
>>> >>
>>> >> [1]
>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
>>> >> [2]
>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
>>> >>
>>> >> On 24 Mar 2021, at 18:01, Vincent Marquez 
>>> wrote:
>>> >>
>>> >> No, it only needs to ensure that one record seen on Pubsub has
>>> successfully written to a database.  So "record by record" is fine, or even
>>> "bundle".
>>> >>
>>> >> ~Vincent
>>> >>
>>> >>
>>> >> On Wed, 

Do you use TimestampCombiner.EARLIEST with SlidingWindows or other overlapping windows?

2021-03-11 Thread Kenneth Knowles
Hi users,

** Do you use TimestampCombiner.EARLIEST with SlidingWindows or other
overlapping windows? If no, then you can stop reading now. **

We are considering a change to simplify how timestamps are computed for
aggregations in this case. Warning: this is a bit complicated and a curious
corner of Beam, but quite important if you do use this configuration.

Currently, when you use EARLIEST with overlapping windows like
SlidingWindows, the output timestamp is shifted later, to the beginning of
the next window. This is a hack to allow downstream watermarks to progress.

To illustrate: consider the windows A-B and C-D which overlap, and an
element with timestamp x that is earlier than C:

A---x----B
      C--------D

Today, the aggregation for window A-B will have the timestamp C, not x.

We are considering changing this to simply use the minimum: x. This is more
expected behavior. It will have the possible negative effect of holding the
watermark so that downstream aggregations are delayed.
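
The two behaviors can be compared with a tiny plain-Python model. It follows the description above rather than Beam's source, and it assumes that elements at or after C keep their own timestamp under today's behavior; the function names are made up for illustration.

```python
def earliest_output_timestamp_today(element_timestamps, next_window_start):
    """Current behavior as described: the minimum is shifted up to the next window's start."""
    return max(min(element_timestamps), next_window_start)


def earliest_output_timestamp_proposed(element_timestamps):
    """Proposed behavior: simply the minimum element timestamp."""
    return min(element_timestamps)


# Window A-B = [0, 10), window C-D = [5, 15), element x = 2 (earlier than C).
A, B, C, D, x = 0, 10, 5, 15, 2
print(earliest_output_timestamp_today([x], C))  # 5, i.e. C
print(earliest_output_timestamp_proposed([x]))  # 2, i.e. x
```

With the proposed rule the output carries timestamp x, so the downstream watermark can be held back as far as x instead of C.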

Kenn


Re: Is there an array explode function/transform?

2021-01-13 Thread Kenneth Knowles
Just the fields specified, IMO. When in doubt, copy SQL. (and I mean SQL
generally, not just Beam SQL)

Kenn

On Wed, Jan 13, 2021 at 11:17 AM Reuven Lax  wrote:

> Definitely could be a top-level transform. Should it automatically unnest
> all arrays, or just the fields specified?
>
> We do have to define the semantics for nested arrays as well.
>
> On Wed, Jan 13, 2021 at 10:57 AM Robert Bradshaw 
> wrote:
>
>> Ah, thanks for the clarification. UNNEST does sound like what you want
>> here, and would likely make sense as a top-level relational transform as
>> well as being supported by SQL.
>>
>> On Wed, Jan 13, 2021 at 10:53 AM Tao Li  wrote:
>>
>>> @Kyle Weaver sure thing! So the input/output definition for
>>> Flatten.Iterables is:
>>>
>>> Input: PCollection
>>>
>>> Output: PCollection
>>>
>>> The input/output for an explode transform would look like this:
>>>
>>> Input: PCollection. The row schema has a field which is an array of T.
>>>
>>> Output: PCollection. The array type field from the input schema is
>>> replaced with a new field of type T. The elements from the array type field
>>> are flattened into multiple rows in the new table (other fields of the input
>>> table are just duplicated).
>>>
>>>
>>>
>>> Hope this clarification helps!
>>>
>>>
>>>
>>> *From: *Kyle Weaver 
>>> *Reply-To: *"user@beam.apache.org" 
>>> *Date: *Tuesday, January 12, 2021 at 4:58 PM
>>> *To: *"user@beam.apache.org" 
>>> *Cc: *Reuven Lax 
>>> *Subject: *Re: Is there an array explode function/transform?
>>>
>>>
>>>
>>> @Reuven Lax  yes I am aware of that transform, but
>>> that’s different from the explode operation I was referring to:
>>> https://spark.apache.org/docs/latest/api/sql/index.html#explode
>>> 
>>>
>>>
>>>
>>> How is it different? It'd help if you could provide the signature (input
>>> and output PCollection types) of the transform you have in mind.
>>>
>>>
>>>
>>> On Tue, Jan 12, 2021 at 4:49 PM Tao Li  wrote:
>>>
>>> @Reuven Lax  yes I am aware of that transform, but
>>> that’s different from the explode operation I was referring to:
>>> https://spark.apache.org/docs/latest/api/sql/index.html#explode
>>> 
>>>
>>>
>>>
>>> *From: *Reuven Lax 
>>> *Reply-To: *"user@beam.apache.org" 
>>> *Date: *Tuesday, January 12, 2021 at 2:04 PM
>>> *To: *user 
>>> *Subject: *Re: Is there an array explode function/transform?
>>>
>>>
>>>
>>> Have you tried Flatten.iterables
>>>
>>>
>>>
>>> On Tue, Jan 12, 2021, 2:02 PM Tao Li  wrote:
>>>
>>> Hi community,
>>>
>>>
>>>
>>> Is there a beam function to explode an array (similarly to spark sql’s
>>> explode())? I did some research but did not find anything.
>>>
>>>
>>>
>>> BTW I think we can potentially use FlatMap to implement the explode
>>> functionality, but a Beam provided function would be very handy.
>>>
>>>
>>>
>>> Thanks a lot!
>>>
>>>


Re: Is there an array explode function/transform?

2021-01-12 Thread Kenneth Knowles
Explode is called UNNEST in Beam SQL (and I believe this is the more
standard name).

FlatMap(arr -> arr) is a simple and efficient implementation for straight
Beam.
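
For the row-level variant discussed in this thread (one output row per array element, other fields copied), the function passed to FlatMap could look like the plain-Python sketch below. It works on dicts rather than Beam Rows, and the name explode is only illustrative.

```python
def explode(rows, field):
    """Yield one output row per element of row[field]; other fields are copied."""
    for row in rows:
        for item in row[field]:
            out = dict(row)
            out[field] = item
            yield out


rows = [{"id": 1, "tags": ["a", "b"]}, {"id": 2, "tags": []}]
exploded = list(explode(rows, "tags"))
print(exploded)  # [{'id': 1, 'tags': 'a'}, {'id': 1, 'tags': 'b'}]
```

Note that a row with an empty array produces no output rows in this sketch; whether that is the desired semantics is one of the things a built-in transform would have to define.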

Kenn

On Tue, Jan 12, 2021 at 4:58 PM Kyle Weaver  wrote:

> @Reuven Lax  yes I am aware of that transform, but
>> that’s different from the explode operation I was referring to:
>> https://spark.apache.org/docs/latest/api/sql/index.html#explode
>>
>
> How is it different? It'd help if you could provide the signature (input
> and output PCollection types) of the transform you have in mind.
>
> On Tue, Jan 12, 2021 at 4:49 PM Tao Li  wrote:
>
>> @Reuven Lax  yes I am aware of that transform, but
>> that’s different from the explode operation I was referring to:
>> https://spark.apache.org/docs/latest/api/sql/index.html#explode
>>
>>
>>
>> *From: *Reuven Lax 
>> *Reply-To: *"user@beam.apache.org" 
>> *Date: *Tuesday, January 12, 2021 at 2:04 PM
>> *To: *user 
>> *Subject: *Re: Is there an array explode function/transform?
>>
>>
>>
>> Have you tried Flatten.iterables
>>
>>
>>
>> On Tue, Jan 12, 2021, 2:02 PM Tao Li  wrote:
>>
>> Hi community,
>>
>>
>>
>> Is there a beam function to explode an array (similarly to spark sql’s
>> explode())? I did some research but did not find anything.
>>
>>
>>
>> BTW I think we can potentially use FlatMap to implement the explode
>> functionality, but a Beam provided function would be very handy.
>>
>>
>>
>> Thanks a lot!
>>
>>


Re: Side input in streaming

2021-01-12 Thread Kenneth Knowles
That is an unfortunate bug. I found the JIRA.

It is actually just a transform built out of other primitives. You could
use it as the basis for your own version that works, until the fix has been
released.
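
As a starting point for such a version, the failing expression quoted in this thread, math.ceil((end - start) / interval), can be computed on plain numbers instead of Duration objects. This is a sketch in plain Python, not the Beam source; impulse_timestamps and its parameters in seconds are assumptions.

```python
import math


def impulse_timestamps(start_s, end_s, interval_s):
    """Timestamps (in seconds) of periodic impulses in [start_s, end_s)."""
    if interval_s <= 0:
        raise ValueError("interval_s must be positive")
    # Divide plain numbers, not Duration-like objects that lack a '/' operator.
    total_outputs = math.ceil((end_s - start_s) / interval_s)
    return [start_s + i * interval_s for i in range(total_outputs)]


print(impulse_timestamps(0, 10, 4))  # [0, 4, 8]
```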

Kenn

On Fri, Jan 8, 2021 at 12:08 AM Manninger, Matyas <
matyas.mannin...@veolia.com> wrote:

> Dear Kenn,
>
> Thanks again, that pattern was my initial plan but there seems to be a bug
> in the Python API in periodicsequence.py on line 42: "total_outputs =
> math.ceil((end - start) / interval)". Here end, start and interval are all
> Durations and the / operator is not defined for the Duration class. I
> already wrote an email about this to this list but I didn't get a
> satisfactory answer on how to work around this issue, so now I am trying to
> avoid using PeriodicImpulse. If you have any other suggestions I would
> highly appreciate it.
>
> On Thu, 7 Jan 2021 at 18:29, Kenneth Knowles  wrote:
>
>> Actually, if you want to re-read the BQ table then you need
>> something more, following the pattern here:
>> https://beam.apache.org/documentation/patterns/side-inputs/. There are
>> two variations on the page there, and these do not use triggers but instead
>> the read from BigQuery at the beginning of the 24 hours is used by all of
>> the main input elements for the whole 24 hour window of the main input. The
>> general pattern is PeriodicImpulse --> ParDo(convert impulse to read spec)
>> --> ReadAll
>>
>> I realize you did not specify what language you are using. The ReadAll
>> transform only exists for BigQuery in Python right now, and it is not yet
>> easy to use it from Java (plus you may not want to).
>>
>> Kenn
>>
>> On Thu, Jan 7, 2021 at 12:36 AM Manninger, Matyas <
>> matyas.mannin...@veolia.com> wrote:
>>
>>> Thanks Kenn for the clear explanation. Very helpful. I am trying to read
>>> a small BQ table as side input and refresh it every 24 hours or so but I
>>> still want the main stream to be processed during that time. Is there a
>>> better way to do this than to have a 24 hour window with 1 minute triggers on
>>> the side input? Maybe just restarting the job every 24 hours and reading the
>>> side input on setup would be the best option.
>>>
>>> On Tue, 5 Jan 2021 at 17:53, Kenneth Knowles  wrote:
>>>
>>>> You have it basically right. However, there are a couple minor
>>>> clarifications:
>>>>
>>>> 1. A particular window on the side input is not "ready" until there has
>>>> been some element output to it (or it has expired, which will make it the
>>>> default value). Main input elements will wait for the side input to be
>>>> ready. If you configure triggering on the side input, then the first
>>>> triggering will make it "ready". Of course, this means that the value you
>>>> will read will be an incomplete view of the data. If you have a 24 hour window
>>>> with triggering set up then the value that is read will be whatever the
>>>> most recent trigger is, but with some caching delay.
>>>> 2. None of the "time" that you are talking about is real time. It is
>>>> all event time so it is controlled by the side input and main input
>>>> watermarks. Of course in streaming these are usually close to real time so
>>>> yes on average what you describe is probably right.
>>>>
>>>> It sounds like you want a side input with a trigger on it, if you want
>>>> to read it before you have all the data. This is highly nondeterministic so
>>>> you want to be sure that you do not require exact answers on the side input.
>>>>
>>>> Kenn
>>>>
>>>> On Tue, Jan 5, 2021 at 6:56 AM Manninger, Matyas <
>>>> matyas.mannin...@veolia.com> wrote:
>>>>
>>>>> Dear Beam users,
>>>>>
>>>>> Can someone clarify for me how side input works in streaming? If I use a
>>>>> stream as a side input to my main stream, each element will be paired with
>>>>> a side input from the corresponding time window. Does this mean that the
>>>>> element will not be processed until the appropriate window on the side
>>>>> input stream is closed? So if my side input is windowed into 24 hour
>>>>> windows will my elements from the main stream be processed only every 24
>>>>> hours? If not, then if the window is triggered for the side input at 12:00
>>>>> and the input actually only arrives at 12:05 then will all elements from the
>>>>> main stream processed between 12:00 and 12:05 be matched with an empty
>>>>> side input?
>>>>>
>>>>> Any clarification is appreciated.
>>>>>
>>>>> Best regards,
>>>>> Matyas
>>>>>
>>>>


Re: Side input in streaming

2021-01-05 Thread Kenneth Knowles
You have it basically right. However, there are a couple minor
clarifications:

1. A particular window on the side input is not "ready" until there has
been some element output to it (or it has expired, which will make it the
default value). Main input elements will wait for the side input to be
ready. If you configure triggering on the side input, then the first
triggering will make it "ready". Of course, this means that the value you
will read will be an incomplete view of the data. If you have a 24 hour window
with triggering set up then the value that is read will be whatever the
most recent trigger is, but with some caching delay.
2. None of the "time" that you are talking about is real time. It is all
event time so it is controlled by the side input and main input watermarks.
Of course in streaming these are usually close to real time so yes on
average what you describe is probably right.

It sounds like you want a side input with a trigger on it, if you want to
read it before you have all the data. This is highly nondeterministic so
you want to be sure that you do not require exact answers on the side input.
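
The readiness rule in point 1 can be modeled in a few lines of plain Python. This is an illustration only, not the Beam API: SideInputWindow, fire, expire and read are hypothetical names, and the caching delay mentioned above is ignored.

```python
class SideInputWindow:
    """Plain-Python model of one side-input window; not the Beam API."""

    def __init__(self, default=None):
        self._default = default
        self._firings = []
        self._expired = False

    def fire(self, value):
        # Each trigger firing publishes a new, possibly incomplete, value.
        self._firings.append(value)

    def expire(self):
        self._expired = True

    @property
    def ready(self):
        # Ready once something has been output to the window, or it has expired.
        return bool(self._firings) or self._expired

    def read(self):
        if not self.ready:
            raise RuntimeError("main input must wait: side input window not ready")
        return self._firings[-1] if self._firings else self._default


window = SideInputWindow(default={})
before_first_firing = window.ready
window.fire({"a": 1})
window.fire({"a": 1, "b": 2})
print(before_first_firing, window.read())
```

A main-input element that reads between the two firings would see only {"a": 1}, which is the nondeterminism the last paragraph warns about.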

Kenn

On Tue, Jan 5, 2021 at 6:56 AM Manninger, Matyas <
matyas.mannin...@veolia.com> wrote:

> Dear Beam users,
>
> Can someone clarify for me how side input works in streaming? If I use a
> stream as a side input to my main stream, each element will be paired with
> a side input from the corresponding time window. Does this mean that the
> element will not be processed until the appropriate window on the side
> input stream is closed? So if my side input is windowed into 24 hour
> windows will my elements from the main stream be processed only every 24
> hours? If not, then if the window is triggered for the side input at 12:00
> and the input actually only arrives at 12:05 then will all elements from the
> main stream processed between 12:00 and 12:05 be matched with an empty
> side input?
>
> Any clarification is appreciated.
>
> Best regards,
> Matyas
>


Re: Combine with multiple outputs case Sample and the rest

2021-01-05 Thread Kenneth Knowles
Perhaps something based on stateful DoFn so there is a simple decision
point at which each element is either sampled or not so it can be output to
one PCollection or the other. Without doing a little research, I don't
recall if this is doable in the way you need.
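
One way this might look, sketched in plain Python rather than as a stateful DoFn: reservoir sampling (Algorithm R), a technique swapped in here for illustration, keeps the current sample in state and sends every element that is rejected or evicted to the "rest" output. The name sample_and_rest is hypothetical, and the sketch processes a single key sequentially, so it says nothing about parallelism.

```python
import random


def sample_and_rest(elements, n, rng):
    """Split elements into a uniform sample of size n and everything else."""
    reservoir = []  # would live in per-key state in a stateful DoFn
    rest = []       # stands in for the second output
    for i, element in enumerate(elements):
        if len(reservoir) < n:
            reservoir.append(element)
            continue
        j = rng.randrange(i + 1)
        if j < n:
            rest.append(reservoir[j])  # evicted element goes to the rest
            reservoir[j] = element
        else:
            rest.append(element)       # rejected element goes to the rest
    return reservoir, rest


sample, rest = sample_and_rest(range(10), 3, random.Random(0))
print(sample, rest)
```

The sample itself can only be emitted once the input is complete (for example from an end-of-window timer), while the rest can be emitted as elements are rejected or evicted.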

Kenn

On Wed, Dec 23, 2020 at 3:12 PM Ismaël Mejía  wrote:

> Thanks for the answer Robert. Producing a combiner with two lists as
> outputs was one idea I was considering too but I was afraid of
> OutOfMemory issues. I had not thought much about the consequences on
> combining state, thanks for pointing that. For the particular sampling
> use case it might be not an issue, or am I missing something?
>
> I am still curious if for Sampling there could be another approach to
> achieve the same goal of producing the same result (uniform sample +
> the rest) but without the issues of combining.
>
> On Mon, Dec 21, 2020 at 7:23 PM Robert Bradshaw 
> wrote:
> >
> > There are two ways to emit multiple outputs: either to multiple distinct
> PCollections (e.g. withOutputTags) or multiple (including 0) outputs to a
> single PCollection (the difference between Map and FlatMap). In full
> generality, one can always have a CombineFn that outputs lists (say  result>*) followed by a DoFn that emits to multiple places based on this
> result.
> >
> > One other con of emitting multiple values from a CombineFn is that they
> are used in other contexts as well, e.g. combining state, and trying to
> make sense of a multi-outputting CombineFn in that context is trickier.
> >
> > Note that for Sample in particular, it works as a CombineFn because we
> throw most of the data away. If we kept most of the data, it likely
> wouldn't fit into one machine to do the final sampling. The idea of using a
> side input to filter after the fact should work well (unless there's
> duplicate elements, in which case you'd have to uniquify them somehow to
> filter out only the "right" copies).
> >
> > - Robert
> >
> >
> >
> > On Fri, Dec 18, 2020 at 8:20 AM Ismaël Mejía  wrote:
> >>
> >> I had a question today from one of our users about Beam’s Sample
> >> transform (a Combine with an internal top-like function to produce a
> >> uniform sample of size n of a PCollection). They wanted to obtain also
> >> the rest of the PCollection as an output (the non sampled elements).
> >>
> >> My suggestion was to use the sample (since it was little) as a side
> >> input and then reprocess the collection to filter its elements,
> >> however I wonder if this is the ‘best’ solution.
> >>
> >> I was thinking also if Combine is essentially GbK + ParDo why we don’t
> >> have a Combine function with multiple outputs (maybe an evolution of
> >> CombineWithContext). I know this sounds weird and I have probably not
> >> thought much about issues or the performance of the translation but I
> >> wanted to see what others thought, does this make sense, do you see
> >> some pros/cons or other ideas.
> >>
> >> Thanks,
> >> Ismaël
>


Re: Help measuring upcoming performance increase in flink runner on production systems

2020-12-21 Thread Kenneth Knowles
I really think we should make a plan to make this the default. If you test
with the DirectRunner it will do mutation checking and catch pipelines that
depend on the runner cloning every element. (also the DirectRunner doesn't
clone). Since the cloning is similar in cost to the mutation detection,
could we actually add some mutation detection to FlinkRunner pipelines and
also directly warn if a pipeline is depending on it?
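
To illustrate what such a check does, here is a plain-Python sketch that snapshots the input, calls the user function, and fails if the input changed. Comparing against a deep copy is an assumption made for brevity and is not how any runner implements it; call_with_mutation_check and MutationDetected are hypothetical names.

```python
import copy


class MutationDetected(Exception):
    pass


def call_with_mutation_check(fn, element):
    """Call fn(element) and fail if fn changed its input in place."""
    snapshot = copy.deepcopy(element)
    result = fn(element)
    if element != snapshot:
        name = getattr(fn, "__name__", repr(fn))
        raise MutationDetected(f"{name} mutated its input element")
    return result


def good(record):
    return {**record, "seen": True}  # builds a new dict


def bad(record):
    record["seen"] = True  # in-place change to the input
    return record


print(call_with_mutation_check(good, {"id": 1}))
try:
    call_with_mutation_check(bad, {"id": 1})
except MutationDetected as err:
    print("detected:", err)
```

The snapshot costs about as much as the clone it would replace, which is the trade-off the question above is about.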

Kenn

On Mon, Dec 21, 2020 at 5:04 AM Teodor Spæren 
wrote:

> Hey! My option is not default as of now, since it can break pipelines
> which rely on the faulty flink implementation. I'm creating my own
> benchmarks locally and will run against those, but the idea of adding it
> to the official benchmark runs sounds interesting, thanks for bringing
> it up!
>
> Teodor
>
> On Tue, Dec 15, 2020 at 06:51:38PM -0800, Ahmet Altay wrote:
> >Hi Teodor,
> >
> >Thank you for working on this. If I remember correctly, there were some
> >opportunities to improve in the previous paper (e.g. not focusing on
> >deprecated runners, long running benchmarks, varying data sizes). And I am
> >excited that you are keeping the community as part of your research
> >process and we will be happy to help you where we can.
> >
> >Related to your question. Was the new option used by default? If that
> >is the case you will probably see its impact on the metrics dashboard [1].
> >And if it is not on by default, you can add your variant as a new
> >benchmark and compare the difference across many runs in a controlled
> >benchmarking environment. Would that help?
> >
> >Ahmet
> >
> >[1] http://metrics.beam.apache.org/d/1/getting-started?orgId=1
> >
> >
> >On Tue, Dec 15, 2020 at 5:48 AM Teodor Spæren 
> >wrote:
> >
> >> Hey!
> >>
> >> Yeah, that paper was what prompted my master thesis! I definitely will
> >> post here, once I get more data :)
> >>
> >> Teodor
> >>
> >> On Mon, Dec 14, 2020 at 06:56:30AM -0600, Rion Williams wrote:
> >> >Hi Teodor,
> >> >
> >> >Although I’m sure you’ve come across it, this might have some valuable
> >> resources or methodologies to consider as you explore this a bit more:
> >> >
> >> >https://arxiv.org/pdf/1907.08302.pdf
> >> >
> >> >I’m looking forward to reading about your finding, especially using a
> >> more recent iteration of Beam!
> >> >
> >> >Rion
> >> >
> >> >> On Dec 14, 2020, at 6:37 AM, Teodor Spæren <
> teodor_spae...@riseup.net>
> >> wrote:
> >> >>
> >> >> Just bumping this so people see it now that 2.26.0 is out :)
> >> >>
> >> >>> On Wed, Nov 25, 2020 at 11:09:52AM +0100, Teodor Spæren wrote:
> >> >>> Hey!
> >> >>>
> >> >>> My name is Teodor Spæren and I'm writing a master thesis
> >> >>> investigating the performance overhead of using Beam instead of using
> >> >>> the underlying systems directly. My focus has been on Flink and I've
> >> >>> made a discovery about some unnecessary copying between operators in
> >> >>> the Flink runner[1][2]. I wrote a fix for this and it got accepted and
> >> >>> merged, and will be in the upcoming 2.26.0 release[3].
> >> >>>
> >> >>> I'm writing this email to ask if anyone on these mailing lists would
> >> be willing to send me some result of applying this option when the new
> >> version of beam releases. Anything will be very much appreciated,
> stories,
> >> screenshots of performance monitoring before and after, hard numbers,
> >> anything! If you include the cluster size and the workload that would be
> >> awesome too! My master thesis is set to be complete the coming summer,
> so
> >> there is no real hurry :)
> >> >>>
> >> >>> The thesis will be freely accessible[4] and I hope that these
> findings
> >> will be of help to the beam community. If anyone wishes to submit
> stories,
> >> but remain anonymous that is also ok :)
> >> >>>
> >> >>> The best way to contact me would be to send an email my way here, or
> >> on teod...@mail.uio.no.
> >> >>>
> >> >>> Any help is appreciated, thanks for your attention!
> >> >>>
> >> >>> Best regards,
> >> >>> Teodor Spæren
> >> >>>
> >> >>>
> >> >>> [1]:
> >>
> https://lists.apache.org/thread.html/r24129dba98782e1cf4d18ec738ab9714dceb05ac23f13adfac5baad1%40%3Cdev.beam.apache.org%3E
> >> >>> [2]: https://issues.apache.org/jira/browse/BEAM-11146
> >> >>> [3]: https://github.com/apache/beam/pull/13240
> >> >>> [4]: https://www.duo.uio.no/
> >>
>


Re: [ANNOUNCE] Beam 2.25.0 Released

2020-10-26 Thread Kenneth Knowles
Hooray! Thanks Robin!

On Mon, Oct 26, 2020 at 11:51 AM Rui Wang  wrote:

> Thank you Robin!
>
>
> -Rui
>
> On Mon, Oct 26, 2020 at 11:44 AM Pablo Estrada  wrote:
>
>> Thanks Robin!
>>
>> On Mon, Oct 26, 2020 at 11:06 AM Robin Qiu  wrote:
>>
>>> The Apache Beam team is pleased to announce the release of
>>> version 2.25.0.
>>>
>>> Apache Beam is an open source unified programming model to define and
>>> execute data processing pipelines, including ETL, batch and stream
>>> (continuous) processing. See: https://beam.apache.org
>>>
>>> You can download the release here:
>>> https://beam.apache.org/get-started/downloads/
>>>
>>> This release includes bug fixes, features, and improvements detailed on
>>> the Beam blog: https://beam.apache.org/blog/beam-2.25.0/
>>>
>>> Thanks to everyone who contributed to this release, and we hope you enjoy
>>> using Beam 2.25.0.
>>>
>>


Re: Which Solr versions should be supported by Beam

2020-10-23 Thread Kenneth Knowles
This might be a good question for u...@solr.apache.org and/or
d...@solr.apache.org, too.

Kenn

On Fri, Oct 23, 2020 at 6:24 AM Piotr Szuberski 
wrote:

> Beam has quite old Solr dependency (5.x.y) which has been deprecated for a
> long time.
>
> Solr dependency has recently been updated to 8.6.y, but there is a
> question which versions should be supported?
>
> Are there users using versions older than 7.x.y?
>


Re: Count based triggers and latency

2020-10-12 Thread Kenneth Knowles
Another thing to keep in mind - apologies if it was already clear:
triggering governs aggregation (GBK / Combine). It does not have any effect
on stateful DoFn.

On Mon, Oct 12, 2020 at 9:24 AM Luke Cwik  wrote:

> The default trigger will only fire when the global window closes which
> does happen with sources whose watermark goes > GlobalWindow.MAX_TIMESTAMP
> or during pipeline drain with partial results in streaming. Bounded sources
> commonly have their watermark advance to the end of time when they complete
> and some unbounded sources can stop producing output if they detect the end.
>
> Parallelization for stateful DoFns is per key and window. Parallelization
> for GBK is per key and window pane. Note that elementCountAtLeast means
> that the runner can buffer as many as it wants and can decide to offer a
> low latency pipeline by triggering often or better throughput through the
> use of buffering.
>
>
>
> On Mon, Oct 12, 2020 at 8:22 AM KV 59  wrote:
>
>> Hi All,
>>
>> I'm building a pipeline to process events as they come and do not really
>> care about the event time and watermark. I'm more interested in not
>> discarding the events and reducing the latency. The downstream pipeline has
>> a stateful DoFn. I understand that the default window strategy is Global
>> Windows. I did not completely understand the default trigger. As per
>>
>> https://beam.apache.org/releases/javadoc/2.23.0/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.html
>> it says Repeatedly.forever(AfterWatermark.pastEndOfWindow()). In the case of
>> the global window, how does this work (there is no end of window)?
>>
>> My source is Google PubSub and pipeline is running on Dataflow runner I
>> have defined my window transform as below
>>
>> input.apply(TRANSFORM_NAME, Window.into(new GlobalWindows())
>>     .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>     .withAllowedLateness(Duration.standardDays(1)).discardingFiredPanes())
>>
>>
>> A couple of questions
>>
>>    1. Is triggering after each element inefficient in terms of
>>    persistence (serialization) after each element, and in terms of
>>    parallelism? Triggering after each element looks like serial execution.
>>    2. How does Dataflow parallelize in such cases of triggers?
>>
>>
>> Thanks and appreciate the responses.
>>
>> Kishore
>>
>


Re: Support of per-key state after windowing

2020-08-23 Thread Kenneth Knowles
Yes :-)

On Sun, Aug 23, 2020 at 2:16 PM Reuven Lax  wrote:

> Kenn - shouldn't the Reify happen before the rewindow?
>
> On Sun, Aug 23, 2020 at 11:08 AM Kenneth Knowles  wrote:
>
>>
>>
>> On Sun, Aug 23, 2020 at 1:12 PM Dongwon Kim 
>> wrote:
>>
>>> Hi Reuven,
>>>
>>> You and Kenneth are right; I thought GlobalWindows in unbounded streams
>>> need triggers.
>>>
>>> p.apply(WithKeys.of(...).withKeyType(...))   // (A)
>>>   .apply(Window.into(FixedWindows.of(...)))  // (B)
>>>   .apply(Combine.perKey(new MyCombinFn()))   // (C)
>>>   .apply(Window.into(new GlobalWindows()))   // (E)
>>>   .apply(ParDo.of(new MyDoFn()))             // (D)
>>>
>>>
>>> So just adding (E) blurs windows and makes the state defined in MyDoFn
>>> (D) a per-key state.
>>> Hope I understand you and Kenneth correctly this time.
>>>
>>
>> That is correct. However, I think you may want:
>>
>> p.apply(WithKeys.of(...).withKeyType(...))   // (A)
>>   .apply(Window.into(FixedWindows.of(...)))  // (B)
>>   .apply(Combine.perKey(new MyCombinFn()))   // (C)
>>   .apply(Window.into(new GlobalWindows()))   // (E)
>>   .apply(Reify.windowsInValue())             // (G)
>>   .apply(ParDo.of(new MyDoFn()))             // (D)
>>
>> Reify javadoc:
>> <https://beam.apache.org/releases/javadoc/2.23.0/index.html?org/apache/beam/sdk/transforms/Reify.html>
>>
>>
>> This will make the window information from (B) & (C) available to MyDoFn
>> in (D)
>>
>> Kenn
>>
>>
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> On Mon, Aug 24, 2020 at 1:51 AM Reuven Lax  wrote:
>>>
>>>> You could simply window into GlobalWindows and add a stateful DoFn
>>>> afterwards. No need for the triggering and GroupByKey.
>>>>
>>>> On Sun, Aug 23, 2020 at 9:45 AM Dongwon Kim 
>>>> wrote:
>>>>
>>>>> Hi Kenneth,
>>>>>
>>>>> According to your suggestion, I modified my pipeline as follows:
>>>>>
>>>>> p.apply(WithKeys.of(...).withKeyType(...))    // (A)
>>>>>   .apply(Window.into(FixedWindows.of(...)))   // (B)
>>>>>   .apply(Combine.perKey(new MyCombinFn()))    // (C)
>>>>>   .apply(
>>>>>     Window
>>>>>       .into(new GlobalWindows())              // (E1)
>>>>>       .triggering(
>>>>>         Repeatedly.forever(AfterPane.elementCountAtLeast(1)))   // (E2)
>>>>>       .accumulatingFiredPanes()               // (E3)
>>>>>   )
>>>>>   .apply(GroupByKey.create())                 // (F)
>>>>>   .apply(ParDo.of(new MyDoFn()))              // (D)
>>>>>
>>>>>
>>>>> I had to include (E1), (E2), (E3), and (F) so that MyDoFn (D) can
>>>>> iterate over a list of output records from (C) sharing the same key.
>>>>> This way I can achieve the same effect without having a per-key state
>>>>> at (D).
>>>>>
>>>>> Do I understand your intention correctly?
>>>>> If not, please advise me with some hints on it.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Dongwon
>>>>>
>>>>>
>>>>> On Sun, Aug 23, 2020 at 5:10 AM Kenneth Knowles 
>>>>> wrote:
>>>>>
>>>>>> Hi Dongwon,
>>>>>>
>>>>>> On Sat, Aug 22, 2020 at 2:46 PM Dongwon Kim 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I'm using Beam 2.23.0 with FlinkRunner and a part of my unbounded
>>>>>>> pipeline looks like below:
>>>>>>>
>>>>>>> p.apply(WithKeys.of(...).withKeyType(...))   // (A)
>>>>>>>   .apply(Window.into(FixedWindows.of(...)))  // (B)
>>>>>>>   .apply(Combine.perKey(new MyCombinFn()))   // (C)
>>>>>>>   .apply(ParDo.of(new MyDoFn()))             // (D)
>>>>>>>
>>>>>>>
>>>>>>> What I want to do is
>>>>>>> (1) to group data by key (A) and window (B),
>>>>>>> (2) to do some aggregation (C)
>>>>>>> (3) to perform the final computation on each group (D)
>>>>>>>
>>>>>>> I've noticed that a ValueState for a particular key is NULL whenever
>>>>>>> a new window for the key is arriving, which gives me a feeling that Beam
>>>>>>> seems to support only per-key+window state, not per-key state, after
>>>>>>> windowing.
>>>>>>>
>>>>>>> I usually work with Flink DataStream API and Flink supports both
>>>>>>> per-key state and per-key+window state [1].
>>>>>>>
>>>>>>> Does Beam support per-key states, not per-key+window states, after
>>>>>>> windowing (D)? If I miss something, please correct me.
>>>>>>>
>>>>>>
>>>>>> You understand correctly - Beam does not include per-key state that
>>>>>> crosses window boundaries. If I understand your goal correctly, you can
>>>>>> achieve the same effect by copying the window metadata into the element 
>>>>>> and
>>>>>> then re-windowing into the global window before (D).
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Dongwon
>>>>>>>
>>>>>>>


Re: Support of per-key state after windowing

2020-08-23 Thread Kenneth Knowles
On Sun, Aug 23, 2020 at 1:12 PM Dongwon Kim  wrote:

> Hi Reuven,
>
> You and Kenneth are right; I thought GlobalWindows in unbounded streams
> need triggers.
>
> p.apply(WithKeys.of(...).withKeyType(...))   // (A)
>   .apply(Window.into(FixedWindows.of(...)))  // (B)
>   .apply(Combine.perKey(new MyCombinFn()))   // (C)
>   .apply(Window.into(new GlobalWindows()))   // (E)
>   .apply(ParDo.of(new MyDoFn()))             // (D)
>
>
> So just adding (E) blurs windows and makes the state defined in MyDoFn (D)
> a per-key state.
> Hope I understand you and Kenneth correctly this time.
>

That is correct. However, I think you may want:

p.apply(WithKeys.of(...).withKeyType(...))   // (A)
  .apply(Window.into(FixedWindows.of(...)))  // (B)
  .apply(Combine.perKey(new MyCombinFn()))   // (C)
  .apply(Window.into(new GlobalWindows()))   // (E)
  .apply(Reify.windowsInValue())             // (G)
  .apply(ParDo.of(new MyDoFn()))             // (D)

Reify javadoc:
<https://beam.apache.org/releases/javadoc/2.23.0/index.html?org/apache/beam/sdk/transforms/Reify.html>


This will make the window information from (B) & (C) available to MyDoFn in
(D)

Kenn


>
> Best,
>
> Dongwon
>
> On Mon, Aug 24, 2020 at 1:51 AM Reuven Lax  wrote:
>
>> You could simply window into GlobalWindows and add a stateful DoFn
>> afterwards. No need for the triggering and GroupByKey.
>>
>> On Sun, Aug 23, 2020 at 9:45 AM Dongwon Kim 
>> wrote:
>>
>>> Hi Kenneth,
>>>
>>> According to your suggestion, I modified my pipeline as follows:
>>>
>>> p.apply(WithKeys.of(...).withKeyType(...))    // (A)
>>>   .apply(Window.into(FixedWindows.of(...)))   // (B)
>>>   .apply(Combine.perKey(new MyCombinFn()))    // (C)
>>>   .apply(
>>>     Window
>>>       .into(new GlobalWindows())              // (E1)
>>>       .triggering(
>>>         Repeatedly.forever(AfterPane.elementCountAtLeast(1)))   // (E2)
>>>       .accumulatingFiredPanes()               // (E3)
>>>   )
>>>   .apply(GroupByKey.create())                 // (F)
>>>   .apply(ParDo.of(new MyDoFn()))              // (D)
>>>
>>>
>>> I had to include (E1), (E2), (E3), and (F) so that MyDoFn (D) can
>>> iterate over a list of output records from (C) sharing the same key.
>>> This way I can achieve the same effect without having a per-key state at
>>> (D).
>>>
>>> Do I understand your intention correctly?
>>> If not, please advise me with some hints on it.
>>>
>>> Thanks,
>>>
>>> Dongwon
>>>
>>>
>>> On Sun, Aug 23, 2020 at 5:10 AM Kenneth Knowles  wrote:
>>>
>>>> Hi Dongwon,
>>>>
>>>> On Sat, Aug 22, 2020 at 2:46 PM Dongwon Kim 
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm using Beam 2.23.0 with FlinkRunner and a part of my unbounded
>>>>> pipeline looks like below:
>>>>>
>>>>> p.apply(WithKeys.of(...).withKeyType(...))   // (A)
>>>>>   .apply(Window.into(FixedWindows.of(...)))  // (B)
>>>>>   .apply(Combine.perKey(new MyCombinFn()))   // (C)
>>>>>   .apply(ParDo.of(new MyDoFn()))             // (D)
>>>>>
>>>>>
>>>>> What I want to do is
>>>>> (1) to group data by key (A) and window (B),
>>>>> (2) to do some aggregation (C)
>>>>> (3) to perform the final computation on each group (D)
>>>>>
>>>>> I've noticed that a ValueState for a particular key is NULL whenever a
>>>>> new window for the key is arriving, which gives me a feeling that Beam
>>>>> seems to support only per-key+window state, not per-key state, after
>>>>> windowing.
>>>>>
>>>>> I usually work with Flink DataStream API and Flink supports both
>>>>> per-key state and per-key+window state [1].
>>>>>
>>>>> Does Beam support per-key states, not per-key+window states, after
>>>>> windowing (D)? If I miss something, please correct me.
>>>>>
>>>>
>>>> You understand correctly - Beam does not include per-key state that
>>>> crosses window boundaries. If I understand your goal correctly, you can
>>>> achieve the same effect by copying the window metadata into the element and
>>>> then re-windowing into the global window before (D).
>>>>
>>>> Kenn
>>>>
>>>>
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
>>>>>
>>>>> Best,
>>>>>
>>>>> Dongwon
>>>>>
>>>>>


Re: Support of per-key state after windowing

2020-08-22 Thread Kenneth Knowles
Hi Dongwon,

On Sat, Aug 22, 2020 at 2:46 PM Dongwon Kim  wrote:

> Hi all,
>
> I'm using Beam 2.23.0 with FlinkRunner and a part of my unbounded pipeline
> looks like below:
>
> p.apply(WithKeys.of(...).withKeyType(...))   // (A)
>   .apply(Window.into(FixedWindows.of(...)))  // (B)
>   .apply(Combine.perKey(new MyCombinFn()))   // (C)
>   .apply(ParDo.of(new MyDoFn()))             // (D)
>
>
> What I want to do is
> (1) to group data by key (A) and window (B),
> (2) to do some aggregation (C)
> (3) to perform the final computation on each group (D)
>
> I've noticed that a ValueState for a particular key is NULL whenever a new
> window for the key is arriving, which gives me a feeling that Beam seems to
> support only per-key+window state, not per-key state, after windowing.
>
> I usually work with Flink DataStream API and Flink supports both per-key
> state and per-key+window state [1].
>
> Does Beam support per-key states, not per-key+window states, after
> windowing (D)? If I miss something, please correct me.
>

You understand correctly - Beam does not include per-key state that crosses
window boundaries. If I understand your goal correctly, you can achieve the
same effect by copying the window metadata into the element and then
re-windowing into the global window before (D).
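
To make the scoping concrete, here is a toy model in plain Java. It is not Beam API, and the class and method names are made up for illustration. It only assumes that a state cell is looked up by the pair (key, window), so once everything sits in a single global window the key is the only thing that varies:

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model, not Beam API: a state cell is addressed by (key, window). */
final class StateScopeSketch {
  private final Map<String, Integer> cells = new HashMap<>();

  /** Increments and returns the counter stored in the cell for (key, window). */
  int process(String key, String window) {
    return cells.merge(key + "@" + window, 1, Integer::sum);
  }

  public static void main(String[] args) {
    // Fixed windows: the same key lands in a fresh cell for every window.
    StateScopeSketch fixed = new StateScopeSketch();
    System.out.println(fixed.process("k", "window-1"));
    System.out.println(fixed.process("k", "window-2"));

    // Global window: the same key always lands in the same cell.
    StateScopeSketch global = new StateScopeSketch();
    System.out.println(global.process("k", "global"));
    System.out.println(global.process("k", "global"));
  }
}
```

In a real pipeline the original window would have to travel inside the element (the "copying the window metadata" step), because after re-windowing the DoFn only sees the global window.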

Kenn


>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
>
> Best,
>
> Dongwon
>
>


Re: Exceptions: Attempt to deliver a timer to a DoFn, but timers are not supported in Dataflow.

2020-07-29 Thread Kenneth Knowles
Hi Mohil,

It helps also to tell us what version of Beam you are using and some more
details. This looks related to
https://issues.apache.org/jira/browse/BEAM-6855 which claims to be resolved
in 2.17.0

Kenn

On Mon, Jul 27, 2020 at 11:47 PM Mohil Khare  wrote:

> Hello all,
>
> I think I found the reason for the issue.  Since the exception was thrown
> by StreamingSideInputDoFnRunner.java, I realize that I recently added side
> input to one of my ParDo that does stateful transformations.
> It looks like there is some issue when you add side input (My side input
> was coming via Global window to ParDo in a Fixed Window) to stateful DoFn.
>
> As a work around, instead of adding side input to stateful ParDo, I
> introduced another ParDo  that enriches streaming data with side input
> before flowing into stateful DoFn. That seems to have fixed the problem.
>
>
> Thanks and regards
> Mohil
>
>
>
> On Mon, Jul 27, 2020 at 10:50 AM Mohil Khare  wrote:
>
>> Hello All,
>>
>> Any idea how to debug this and find out which stage, which DoFn or which
>> side input is causing the problem?
>> Do I need to override OnTimer with every DoFn to avoid this problem?
>> I thought that some uncaught exceptions were causing this and added
>> various checks and exception handling in all DoFn and still seeing this
>> issue.
>> It has been driving me nuts. And now forget DRAIN, it happens during
>> normal functioning as well. Any help would be appreciated.
>>
>> java.lang.UnsupportedOperationException: Attempt to deliver a timer to a
>> DoFn, but timers are not supported in Dataflow.
>>
>>     at org.apache.beam.runners.dataflow.worker.StreamingSideInputDoFnRunner.onTimer(StreamingSideInputDoFnRunner.java:86)
>>     at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processUserTimer(SimpleParDoFn.java:360)
>>     at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$600(SimpleParDoFn.java:73)
>>     at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$1.processTimer(SimpleParDoFn.java:444)
>>     at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:473)
>>     at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:353)
>>     at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
>>     at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
>>     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1350)
>>     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:152)

Re: DoFn Timer fire multiple times

2020-07-15 Thread Kenneth Knowles
Hello!

What runner are you using? Does this reproduce on multiple runners? (it is
very quick to try out your pipeline on DirectRunner and local versions of
open source runners like Flink, Spark, etc)

If you can produce a complete working reproduction it will be easier for
someone to debug. I do not see anything wrong with your code. I assumed you
got the `window` variable out of the ProcessContext (you can also make it
a parameter to @ProcessElement).

Kenn

On Wed, Jul 15, 2020 at 4:38 PM Zhiheng Huang  wrote:

> Hi,
>
> I am trying to set a timer at window expiration time for my use case and
> expect it to fire just once per key per window.
> But instead I observe that the onTimer() method gets called multiple times
> almost all the time.
>
> Here's the relevant code snippet:
>
> @TimerId(WIN_EXP)
> private final TimerSpec winexp = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>
> @StateId(COUNTS)
> private final StateSpec>> counts =
> StateSpecs.value();
>
> @ProcessElement
> public void process(
> ProcessContext context,
> @StateId(COUNTS) ValueState>
> countsState,
> @TimerId(WIN_EXP) Timer winExpTimer) {
>
>   ...
>   Map counts = countsState.read();
>   if (counts == null) {
> counts = new HashMap<>();
> // Only place where I set the timer
>
> winExpTimer.set(window.maxTimestamp().plus(Duration.standardMinutes(1)));
>   }
>   ... // no return here and I do not observe exception in the pipeline
>   countsState.write(counts);
>   ...
> }
>
> I tried adding logs in OnTimer:
>
> String key = keyState.read();
> if (key != null && key.equals("xxx")) {
>   logger.error(String.format("fired for %s.",
> context.window().maxTimestamp().toDateTime()));
> }
>
> Output:
>
> E 2020-07-15T23:08:38.938Z fired for 2020-07-15T13:04:59.999Z.
> E 2020-07-15T23:08:04.004Z fired for 2020-07-15T13:04:59.999Z.
> E 2020-07-15T23:08:03.221Z fired for 2020-07-15T13:04:59.999Z.
> E 2020-07-15T23:07:49.132Z fired for 2020-07-15T13:04:59.999Z.
> E 2020-07-15T23:07:47.010Z fired for 2020-07-15T13:04:59.999Z.
> E 2020-07-15T23:07:40.679Z fired for 2020-07-15T13:04:59.999Z.
> E 2020-07-15T23:07:33.925Z fired for 2020-07-15T13:04:59.999Z.
>
> Seems like this is not due to some contention, the first log and the last
> is ~1minute apart. BTW, my allowed
> lateness is also set to 1 minute.
>
> Anyone can let me know if I am missing something here? I am using beam
> 2.22 and dataflow runner.
>
> Thanks!
>
>


Re: How to import v2.20.0 to IntelliJ IDEA 2020.1.1?

2020-05-20 Thread Kenneth Knowles
This changed for me recently. I re-run my IntelliJ import regularly just to
make sure setting up does not depend on painful manual configuration. My
most recent re-run also ended up having no dependencies. I did not track
this down or file a bug because I did not have time to confirm it was not
just my problem.

Kenn

On Wed, May 20, 2020 at 7:22 AM Omar Ismail  wrote:

> Hey Jacek,
>
> What I usually do is `git clone REPO` in a new folder, and then in the
> Intellij main menu I press "Open", navigate to that folder, then press
> "Open". Gradle then automatically starts doing the build.
>
> Hope this helps!
>
> Best,
> Omar
>
> On Wed, May 20, 2020 at 10:06 AM Jacek Laskowski  wrote:
>
>> Hi,
>>
>> I've been trying to import the Beam sources to IntelliJ IDEA for some
>> time now and must admit I'm very surprised how painful it is.
>>
>> I tried the steps described in
>> https://cwiki.apache.org/confluence/display/BEAM/Set+up+IntelliJ+from+scratch,
>> but the wizards give me different options (perhaps I'm on a newer IDEA or
>> could it be attributed to macOS?)
>>
>> Import is successful, but the modules don't seem to have dependencies
>> specified and SparkRunner has all org.apache.beam.sdk. imports unresolved
>> (Cannot resolve the symbol 'XXX').
>>
>> I'm sure I'm missing something obvious and simple, but seems it's too
>> obvious and too simple for me today :( Thanks for any hints.
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://about.me/JacekLaskowski
>> "The Internals Of" Online Books 
>> Follow me on https://twitter.com/jaceklaskowski
>>
>> 
>>
>
>
> --
>
> Omar Ismail |  Technical Solutions Engineer |  omarism...@google.com |
>


Re: Pattern for sharing a resource across all worker threads in Beam Java SDK

2020-05-01 Thread Kenneth Knowles
On Fri, May 1, 2020 at 2:01 PM Luke Cwik  wrote:

> All worker threads are going to see the same instance.
> In Dataflow there is only one JVM containing user code per VM instance.
>
> One of the reasons around the portability effort is having the worker
> nodes/JVMs be consistent across runners so users have an easier time to
> choose the runner that works best for them and not get snagged on runner
> environment differences.
>

I agree with Luke's main point - our runners have pretty wildly different
infrastructural details that make it hard to just switch between them, and
portability improves that. But I would also caution against taking this too
far, especially for this inquiry. The solution to your problem here really
shouldn't require much about the environment. Please don't make assumptions
about threads per JVM, or JVMs per VM.

Portability's predictable environments will undoubtedly make it easy to
depend on these and other things that are subject to change/breakage
without notice. Differences aren't just across runners, but across time and
codepaths.
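
For reference, a minimal plain-Java sketch of the "create once, share the same instance" idea discussed below. It is an illustration only: MemoizingSupplier and AllowedCities are hypothetical names (not Beam or Guava classes), and the city values are placeholders. It assumes only that whatever threads do share a class loader should share one instance, and makes no assumption about how many threads or JVMs a runner uses:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical helper: computes the value on first get() and caches it afterwards. */
final class MemoizingSupplier<T> implements Supplier<T> {
  private final Supplier<T> delegate;
  private volatile boolean initialized;
  private T value; // published safely by the volatile write to 'initialized'

  MemoizingSupplier(Supplier<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public T get() {
    if (!initialized) {            // fast path: no locking once the value exists
      synchronized (this) {
        if (!initialized) {        // re-check under the lock
          value = delegate.get();
          initialized = true;
        }
      }
    }
    return value;
  }
}

/** Hypothetical shared resource: an unmodifiable set loaded at most once per class loader. */
final class AllowedCities {
  static final AtomicInteger LOADS = new AtomicInteger(); // counts how often the loader ran

  private static final Supplier<Set<String>> CITIES =
      new MemoizingSupplier<>(() -> {
        LOADS.incrementAndGet();
        Set<String> cities = new HashSet<>();
        cities.add("Oslo");   // placeholder data
        cities.add("Prague"); // placeholder data
        return Collections.unmodifiableSet(cities);
      });

  static Set<String> get() {
    return CITIES.get();
  }

  public static void main(String[] args) {
    System.out.println(get() == get()); // same instance on every call
    System.out.println(LOADS.get());    // loader invocation count
  }
}
```

Returning an unmodifiable set also covers the point about protecting return values: callers on any thread can read the shared instance but cannot mutate it.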

Kenn



> On Fri, May 1, 2020 at 7:01 AM Jeff Klukas  wrote:
>
>> Cameron's point about protecting return values is quite reasonable. I
>> agree it would be good to make sure we're returning an unmodifiable set
>> in my getOrCreateSingletonAllowedCities case.
>>
>> Luke's suggestion of Supplier.memoize sounds like a more elegant way of
>> doing essentially the same thing we're currently implementing, so
>> definitely worth looking into.
>>
>> The PipelineOptions-based solution is also intriguing. When we pass
>> values into transforms from PipelineOptions, are all worker threads going
>> to see the same instance? Or does PipelineOptions end up getting serialized
>> and reconstructed by each thread? I see that your example uses JsonIgnore
>> to prevent a computed value being serialized, but it's unclear whether the
>> default value will be generated once per thread or once per worker.
>>
>> I suppose the Beam model doesn't make any statement about how runners
>> handle multiple threads per worker, so there's no guarantee about whether
>> multiple threads are executing in the same JVM or not. Am I correct in
>> assuming at least in the Dataflow case that there is one JVM per worker
>> node?
>>
>> On Thu, Apr 30, 2020 at 7:05 PM Luke Cwik  wrote:
>>
>>> I looked at your example and the custom logic for the singleton is
>>> basically:
>>> static transient T value;
>>> public static synchronized T getOrCreate(...) {
>>>   if (value == null) {
>>>  ... instantiate value ...
>>>   }
>>>   return value;
>>> }
>>>
>>> Which is only a few lines. You could use one of Java's injection
>>> frameworks like Spring or Guice or ... as they commonly provide you a
>>> singleton pattern but the complexity overhead a lot of the time isn't worth
>>> it.
>>>
>>> If all your logic can be provided by PipelineOptions you could combine a
>>> DefaultValueFactory and a PipelineOption that returns T. First time an
>>> unset value is accessed the PipelineOption will be created and the
>>> synchronization is provided for you by PipelineOptions. It would look like:
>>>
>>> public interface MyPipelineOptions extends PipelineOptions {
>>>   @Default.InstanceFactory(TFactory.class)
>>>   @JsonIgnore  // prevents the value from being saved and makes it
>>>                // "local" to the process
>>>   T getT();
>>>   void setT(T t);
>>>
>>>   public static class TFactory implements DefaultValueFactory<T> {
>>>     public T create(PipelineOptions options) {
>>>       ... instantiate value ...
>>>     }
>>>   }
>>> }
>>> and then in your DoFn you access your PipelineOption as normal. This is
>>> also convenient because you can set the value during testing of your DoFn.
>>>
>>> Another alternative would be to look for something like Guava's
>>> memoize[1] functions and use them as they are very lightweight.
>>>
>>> 1:
>>> https://guava.dev/releases/19.0/api/docs/com/google/common/base/Suppliers.html#memoize(com.google.common.base.Supplier)
>>>
>>> On Thu, Apr 30, 2020 at 12:45 PM Jeff Klukas 
>>> wrote:
>>>
>>>> Beam Java users,
>>>>
>>>> I've run into a few cases where I want to present a single thread-safe
>>>> data structure to all threads on a worker, and I end up writing a good bit
>>>> of custom code each time involving a synchronized method that handles
>>>> creating the resource exactly once, and then each thread has its own
>>>> reference to the singleton. I don't have extensive experience with thread
>>>> safety in Java, so it seems likely I'm going to get this wrong.
>>>>
>>>> Are there any best practices for state that is shared across threads?
>>>> Any prior art I can read up on?
>>>>
>>>> The most concrete case I have in mind is loading a GeoIP database for
>>>> doing city lookups from IP addresses. We're using MaxMind's API which
>>>> allows mapping a portion of memory to a file sitting on disk. We have a
>>>> synchronized method that checks if the reader has been initialized 

Re: Notifying the closure of a Window Period

2020-05-01 Thread Kenneth Knowles
Ah yes...


On Fri, May 1, 2020 at 8:30 AM Truebody, Kyle  wrote:

> Yes data will be keyed by shard.
>
>
>
> This is the trigger config we used:
>
>
>
> WindowFileNamePolicy policy = new
> WindowFileNamePolicy(prefix,options.getDataSource());
>
>
>
> TextIO.Write textWriter = TextIO.write()
>
> .to(policy)
>
> .withTempDirectory(tempPrefix)
>
> .withWindowedWrites()
>
> .withNumShards(options.getShardCount());
>
>
>
>   batchCollection = batchCollection.apply("Fixed
> Strategy",Window.into(
>
>
> FixedWindows.of(Utilities.resolveDuration(options.getWindowDuration(
>
> .triggering(AfterWatermark.pastEndOfWindow())
>

This trigger finishes after its first firing, which closes the window and
drops any data that arrives afterwards. You probably want
.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())) or
.triggering(AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(1))).
These have the same behavior.

In more recent SDKs this pipeline should be rejected to protect you from
data loss caused by the trigger "closing" and dropping data. What version
of SDK are you using?
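
As a rough illustration of the difference, here is a toy model in plain Java. It is not Beam API and deliberately ignores watermarks and allowed lateness; it only assumes that a trigger which fires once is "finished" afterwards and that input arriving for a finished trigger is dropped:

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model, not Beam API: a trigger that finishes after one firing vs. one that repeats. */
final class TriggerSketch {
  /**
   * Each entry of arrivals is the size of one batch of elements for the same window.
   * Batch 0 is the on-time data; later batches are late data.
   * Returns the size of every emitted pane, in discarding mode.
   */
  static List<Integer> panes(int[] arrivals, boolean repeat) {
    List<Integer> emitted = new ArrayList<>();
    boolean finished = false;
    for (int count : arrivals) {
      if (finished) {
        continue; // a finished trigger drops everything that arrives later
      }
      emitted.add(count);
      if (!repeat) {
        finished = true; // fire once, then finish
      }
    }
    return emitted;
  }

  public static void main(String[] args) {
    int[] arrivals = {3, 2, 1};
    System.out.println(panes(arrivals, false)); // only the on-time pane survives
    System.out.println(panes(arrivals, true));  // late batches become late panes
  }
}
```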

Kenn



>
> .withAllowedLateness(Utilities.resolveDuration(options.getWindowLateness()))
>
> .discardingFiredPanes()).apply(textWriter);
>
>
>
>
>
>
>
> *From:* Kenneth Knowles 
> *Sent:* Friday, May 1, 2020 4:19 PM
> *To:* user 
> *Subject:* Re: Notifying the closure of a Window Period
>
>
>
>
>
>
> Is the data keyed by shardNumber? To have a unique final pane for a
> filename prefix, you will need to include the key in the prefix.
>
>
>
> Can you also provide the triggering configuration you are working with?
>
>
>
> Kenn
>
>
>
> On Fri, May 1, 2020 at 6:47 AM Truebody, Kyle  wrote:
>
> Hi Kenn,
>
>
>
> Thanks for the response…
>
> Not sure if I understand this correctly: ‘affected by the fact that windows
> processed independently for each key’
>
> I put a high-level example below; I hope it clarifies what I am trying to
> ask.
>
> Is there a more precise way to be informed that the final pane of a
> window session has been written completely?
>
> Due to the nature of the coordination set up for downstream consumers, the
> .trigger file delivery needs to happen on completion of the absolute last
> pane.
>
>
>
> ```
>
> public class WindowFileNamePolicy extends FileBasedSink.FilenamePolicy  {
>
>
>
> private final ResourceId prefix;
>
>
>
> private final String dataSource;
>
>
>
> /**
>
>  *  file names - file source name
>
>  * - timestamp (processing timestamp / event timestamp)
> Based on the current time window
>
>  * - optional : - shard number
>
>  *  - window start ts
>
>  * @param prefix
>
>  */
>
> public WindowFileNamePolicy(ResourceId prefix,String dataSource){
>
> this.prefix = prefix;
>
> this.dataSource  = dataSource;
>
> }
>
>
>
>
>
> public String filenamePrefixForWindow(IntervalWindow window) {
>
> String filePrefix = prefix.isDirectory() ? "" :
> prefix.getFilename();
>
>
>
> DateTimeFormatter formatter =
> DateTimeFormat.forPattern(Utilities.lngDateFormat);
>
> DateTime windowStart =
> formatter.parseDateTime(window.start().toString());
>
>
>
> DateTimeFormatter resultformat =
> DateTimeFormat.forPattern(Utilities.shtDateFormat);
>
>
>
> return String.format(
>
> "%s/%s/%s-%s", resultformat.print(windowStart),
> dataSource, dataSource, resultformat.print(windowStart));
>
> }
>
>
>
> @Override
>
> public ResourceId windowedFilename(int shardNumber, int numShards,
> BoundedWindow window, PaneInfo paneInfo, FileBasedSink.OutputFileHints
> outputFileHints) {
>
> IntervalWindow intervalWindow = (IntervalWindow) window;
>
> String filename =
>
> String.format(
>
> "%s-%s-%s",
>
> filenamePrefixForWindow(intervalWindow),
>
> shardNumber,
>
> numShards);
>
>
>
> if(paneInfo.isLast())
>
> createTriggerFile(/*trigger file name*/ ".trigger");  *//writes
> to 

Re: Non-trivial joins examples

2020-05-01 Thread Kenneth Knowles
+dev @beam and some people who I talk about joins with

Interesting! It is a lot to take in and fully grok the code, so calling in
reinforcements...

Generally, I think there's agreement that for a lot of real use cases, you
have to roll your own join using the lower level Beam primitives. So I
think it would be great to get some of these other approaches to joins into
Beam, perhaps as an extension of the Java SDK or even in the core (since
schema joins are in the core). In particular:

 - "join in fixed window with repeater" sounds similar (but not identical)
to work by Mikhail
 - "join in global window with cache" sounds similar (but not identical) to
work and discussions w/ Reza and Tyson

I want to be clear that I am *not* saying there's any duplication. I'm
guessing these all fit into a collection of different ways to accomplish
joins, and if everything comes to fruition we will have the great
opportunity to document how a user should choose between them.
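
For readers new to the limitations Marcin lists below, the window arithmetic behind two of them can be sketched in plain Java. This is not Beam API; the class name is made up, timestamps are epoch milliseconds, and the sliding-window count assumes the size is a multiple of the period:

```java
/** Toy arithmetic, not Beam API: how fixed and sliding windows carve up event time. */
final class WindowMathSketch {
  /** Start of the fixed window of the given size that contains the timestamp. */
  static long fixedWindowStart(long timestampMillis, long sizeMillis) {
    return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
  }

  /** Number of sliding windows each element belongs to (size assumed divisible by period). */
  static long slidingWindowsPerElement(long sizeMillis, long periodMillis) {
    return sizeMillis / periodMillis;
  }

  public static void main(String[] args) {
    long minute = 60_000L;
    // Two events 2 seconds apart, on either side of a window boundary.
    System.out.println(fixedWindowStart(59_000L, minute));
    System.out.println(fixedWindowStart(61_000L, minute));
    // One-hour windows sliding every minute.
    System.out.println(slidingWindowsPerElement(60 * minute, minute));
  }
}
```

The two events land in different fixed windows although they are only 2 seconds apart, so a per-window join never sees them together; and with one-hour windows sliding every minute, every element is assigned to 60 windows.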

Kenn

On Fri, May 1, 2020 at 7:56 AM Marcin Kuthan 
wrote:

> Hi,
>
> it's my first post here but I'm a group reader for a while, so thank you
> for sharing the knowledge!
>
> I've been using Beam/Scio on Dataflow for about a year, mostly for stream
> processing from unbounded sources like PubSub. During my daily work I found
> that built-in windowing is very generic and provides rich watermark/late
> event semantics, but there are a few very annoying limitations, e.g.:
> - both sides of the join must be defined within compatible windows
> - for fixed windows, elements close to window boundaries (but in different
> windows) won't be joined
> - for sliding windows there is a huge overhead if the duration is much
> longer than the offset
>
> I would like to ask you to review a few "join/windowing patterns" with
> custom stateful ParDos, not so generic as Beam built-ins but perhaps better
> crafted for more specific needs. I published code with tests, feel free to
> comment as GitHub issues or on the mailing list. The event time processing
> with watermarks is so demanding that I'm almost sure that I overlooked many
> important corner cases.
> https://github.com/mkuthan/beam-examples
>
> If you think that the examples are somehow useful I'll be glad to write
> blog post with more details :)
>
> Marcin
>


Re: Notifying the closure of a Window Period

2020-05-01 Thread Kenneth Knowles
Is the data keyed by shardNumber? To have a unique final pane for a
filename prefix, you will need to include the key in the prefix.
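
A minimal sketch of what that could look like, in plain Java rather than against the FilenamePolicy interface. Everything here is an assumption made for illustration: the class and method names, the "yyyyMMddHHmm" pattern (the original Utilities.shtDateFormat is not shown), and the idea of one ".trigger" marker per shard:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical naming helper: every name is unique per (window, shard). */
final class ShardedNamesSketch {
  // Assumed timestamp pattern; substitute whatever the real policy uses.
  private static final DateTimeFormatter FORMAT =
      DateTimeFormatter.ofPattern("yyyyMMddHHmm").withZone(ZoneOffset.UTC);

  /** Data file name that includes the shard number and shard count. */
  static String dataFile(String dataSource, Instant windowStart, int shard, int numShards) {
    String ts = FORMAT.format(windowStart);
    return String.format("%s/%s/%s-%s-%d-%d", ts, dataSource, dataSource, ts, shard, numShards);
  }

  /** Marker file for the final pane of one shard of one window. */
  static String triggerFile(String dataSource, Instant windowStart, int shard, int numShards) {
    return dataFile(dataSource, windowStart, shard, numShards) + ".trigger";
  }

  public static void main(String[] args) {
    Instant start = Instant.parse("2020-05-01T08:30:00Z");
    System.out.println(dataFile("orders", start, 2, 4));
    System.out.println(triggerFile("orders", start, 2, 4));
  }
}
```

With one marker per shard, a downstream consumer can treat a window as complete once it has seen numShards markers for it, instead of relying on a single marker that several keys race to write.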

Can you also provide the triggering configuration you are working with?

Kenn

On Fri, May 1, 2020 at 6:47 AM Truebody, Kyle  wrote:

> Hi Kenn,
>
>
>
> Thanks for the response…
>
> Not sure if I understand this correctly: ‘affected by the fact that windows
> processed independently for each key’
>
> I put a high level example below, hope it clarifies what I am trying to
> ask.
>
> Is there a more precise way to be informed that the final pane of a
> window has been written completely?
>
> Due to the nature of the coordination setup for downstream consumers, the
> .trigger file must be delivered only on completion of the absolute last
> pane.
>
>
>
> ```
> public class WindowFileNamePolicy extends FileBasedSink.FilenamePolicy {
>
>   private final ResourceId prefix;
>   private final String dataSource;
>
>   /**
>    * file names - file source name
>    *            - timestamp (processing timestamp / event timestamp),
>    *              based on the current time window
>    *            - optional: - shard number
>    *                        - window start ts
>    *
>    * @param prefix
>    */
>   public WindowFileNamePolicy(ResourceId prefix, String dataSource) {
>     this.prefix = prefix;
>     this.dataSource = dataSource;
>   }
>
>   public String filenamePrefixForWindow(IntervalWindow window) {
>     String filePrefix = prefix.isDirectory() ? "" : prefix.getFilename();
>
>     DateTimeFormatter formatter =
>         DateTimeFormat.forPattern(Utilities.lngDateFormat);
>     DateTime windowStart =
>         formatter.parseDateTime(window.start().toString());
>
>     DateTimeFormatter resultformat =
>         DateTimeFormat.forPattern(Utilities.shtDateFormat);
>
>     return String.format(
>         "%s/%s/%s-%s",
>         resultformat.print(windowStart),
>         dataSource,
>         dataSource,
>         resultformat.print(windowStart));
>   }
>
>   @Override
>   public ResourceId windowedFilename(
>       int shardNumber,
>       int numShards,
>       BoundedWindow window,
>       PaneInfo paneInfo,
>       FileBasedSink.OutputFileHints outputFileHints) {
>     IntervalWindow intervalWindow = (IntervalWindow) window;
>     String filename =
>         String.format(
>             "%s-%s-%s",
>             filenamePrefixForWindow(intervalWindow),
>             shardNumber,
>             numShards);
>
>     if (paneInfo.isLast()) {
>       // Writes to the same directory as the current window. This fires
>       // multiple times, depending on the number of panes for which isLast()
>       // is true, or on the number of write operators (not sure exactly).
>       createTriggerFile(/* trigger file name */ ".trigger");
>     }
>
>     return prefix.getCurrentDirectory().resolve(filename,
>         ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
>   }
> }
> ```
>
>
>
> Thanks,
>
> Kyle
>
>
>
> *From:* Kenneth Knowles 
> *Sent:* Friday, May 1, 2020 2:25 PM
> *To:* user 
> *Subject:* Re: Notifying the closure of a Window Period
>
>
>
>
>
> I am guessing you will be affected by the fact that windows processed
> independently for each key. Is that what you are referring to when you
> mention multiple isLast() windows?
>
>
>
> Kenn
>
>
>
> On Fri, May 1, 2020 at 3:36 AM Truebody, Kyle  wrote:
>
> Hi all,
>
>
>
> We are working on a streaming pipeline that needs to be compatible with our
> legacy platform while we make the move over to Beam Streaming.
>
> Our legacy platform uses a coordination framework (Oozie). Each step in
> the coordination pipeline is activated by the creation of a trigger file.
>
>
>
> I am looking for a Beam construct or flag that will notify the Context/
> driver of the closure of a time window. We need to be able to create a
> trigger flag only when all the files have been emitted from the set window
> period.
>
>
>
> We have tried creating the trigger flag using PaneInfo.isLast()
> through a custom WindowFileNamePolicy. We noticed that a window has multiple
> panes for which isLast() is true.
>
>
>
> Thanks,
>
> Kyle
>
>
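
The per-key behaviour discussed in this thread can be illustrated with a minimal sketch: rather than writing the trigger file from every call where isLast() is true, count the final panes per window and act only once every shard has reported. This is plain Java with no Beam dependency. WindowCompletionTracker, onFinalPane and the string window id are hypothetical names, and the in-memory map assumes all reports reach a single process, which a distributed runner does not guarantee.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: tracks which shards of each window reported a final pane. */
class WindowCompletionTracker {
  private final int numShards;
  // window id -> shard numbers whose final pane has already been seen
  private final Map<String, Set<Integer>> finishedShards = new HashMap<>();

  WindowCompletionTracker(int numShards) {
    this.numShards = numShards;
  }

  /**
   * Records that the final pane for (windowId, shardNumber) was written.
   * Returns true only for the report that completes the window, so a
   * repeated report for the same shard never returns true.
   */
  synchronized boolean onFinalPane(String windowId, int shardNumber) {
    Set<Integer> seen =
        finishedShards.computeIfAbsent(windowId, k -> new HashSet<>());
    boolean newlyAdded = seen.add(shardNumber);
    return newlyAdded && seen.size() == numShards;
  }
}
```

A caller would create the .trigger file only when onFinalPane returns true. In a real pipeline the same bookkeeping would probably have to live in a step that sees all shards of a window, which is an assumption beyond what the thread confirms.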


Re: Notifying the closure of a Window Period

2020-05-01 Thread Kenneth Knowles
I am guessing you will be affected by the fact that windows processed
independently for each key. Is that what you are referring to when you
mention multiple isLast() windows?

Kenn

On Fri, May 1, 2020 at 3:36 AM Truebody, Kyle  wrote:

> Hi all,
>
>
>
> We are working on a streaming pipeline that needs to be compatible with our
> legacy platform while we make the move over to Beam Streaming.
>
> Our legacy platform uses a coordination framework (Oozie). Each step in
> the coordination pipeline is activated by the creation of a trigger file.
>
>
>
> I am looking for a Beam construct or flag that will notify the Context/
> driver of the closure of a time window. We need to be able to create a
> trigger flag only when all the files have been emitted from the set window
> period.
>
>
>
> We have tried creating the trigger flag using PaneInfo.isLast()
> through a custom WindowFileNamePolicy. We noticed that a window has multiple
> panes for which isLast() is true.
>
>
>
> Thanks,
>
> Kyle
>


Re: Running NexMark Tests

2020-04-21 Thread Kenneth Knowles
We should always want to shut down sources on final watermark. All incoming
data should be dropped anyhow.

Kenn

On Tue, Apr 21, 2020 at 1:34 PM Luke Cwik  wrote:

> +dev
>
> When would we not want --shutdownSourcesOnFinalWatermark=true ?
>
> On Tue, Apr 21, 2020 at 1:22 PM Ismaël Mejía  wrote:
>
>> You need to instruct the Flink runner to shut down the source,
>> otherwise it will stay waiting.
>> You can do this by adding the extra
>> argument `--shutdownSourcesOnFinalWatermark=true`.
>> And if that works and you want to open a PR to update our
>> documentation that would be greatly appreciated.
>>
>> Regards,
>> Ismaël
>>
>>
>> On Tue, Apr 21, 2020 at 10:04 PM Sruthi Sree Kumar
>>  wrote:
>> >
>> > Hello,
>> >
>> > I am trying to run nexmark queries using flink runner streaming.
>> Followed the documentation and used the command
>> > ./gradlew :sdks:java:testing:nexmark:run \
>> >
>> > -Pnexmark.runner=":runners:flink:1.10" \
>> > -Pnexmark.args="
>> > --runner=FlinkRunner
>> > --suite=SMOKE
>> > --streamTimeout=60
>> > --streaming=true
>> > --manageResources=false
>> > --monitorJobs=true
>> > --flinkMaster=[local]"
>> >
>> >
>> > But after the events are read from the source, there is no further
>> progress and the job is always stuck at 99%. Is there any configuration
>> that I am missing?
>> >
>> > Regards,
>> > Sruthi
>>
>


Re: Recommended Reading for Apache Beam

2020-04-21 Thread Kenneth Knowles
I believe Streaming Systems is the most Beam-oriented book available.

Kenn

On Mon, Apr 20, 2020 at 3:07 PM Rion Williams  wrote:

> Hi all,
>
> I posed this question over on the Apache Slack Community however didn't
> get much of a response so I thought I'd reach out here. I've been looking
> for some additional resources, specifically books, surrounding Apache Beam,
> the Beam Model, etc. and was wondering if anyone had any recommendations?
>
> I've read a few books related to streaming in general with Streaming
> Systems (http://streamingsystems.net/) being the most Beam-oriented and
> since I've found myself using it more and more, I'd love to see if any
> other folks out there had some other suggestions on the book-front, if one
> exists.
>
> Thanks all,
>
> Rion
>


Re: Distributed Tracing in Apache Beam

2020-04-21 Thread Kenneth Knowles
+dev 

I don't have a ton of time to dig in to this, but I wanted to say that this
is very cool and just drop a couple pointers (which you may already know
about) like Explaining Outputs in Modern Data Analytics [1] which was
covered by The Morning Paper [2]. This just happens to be something I read
a few years ago - following citations of the paper or pingbacks on the blog
post yields a lot more work, some of which may be helpful. There seems to
be a slight difference of emphasis between tracing in an arbitrary
distributed system versus explaining big data results. I would expect
general tracing (which Jaeger is?) to be more complex and expensive to run,
but that's just an intuition.

Kenn

[1] http://www.vldb.org/pvldb/vol9/p1137-chothia.pdf
[2]
https://blog.acolyer.org/2017/02/01/explaining-outputs-in-modern-data-analytics/
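
The header-based approach discussed in the quoted thread below (read the tracing id from the record headers, carry it alongside the value, write it back to the headers on output) can be sketched in plain Java. This is only an illustration: TraceHeaders, Traceable and the "trace-id" header name are hypothetical, a Map<String, byte[]> stands in for Kafka record headers, and no KafkaIO or Jaeger API is used.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

class TraceHeaders {
  // Hypothetical header name; the thread does not say which key is used.
  static final String TRACE_ID_HEADER = "trace-id";

  /** A value paired with the trace id that arrived with it (may be null). */
  static final class Traceable<T> {
    final T value;
    final String traceId;

    Traceable(T value, String traceId) {
      this.value = value;
      this.traceId = traceId;
    }
  }

  /** Wraps a value, pulling the trace id out of the incoming headers. */
  static <T> Traceable<T> wrap(T value, Map<String, byte[]> headers) {
    byte[] raw = headers.get(TRACE_ID_HEADER);
    String traceId = raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    return new Traceable<>(value, traceId);
  }

  /** Builds outgoing headers that carry the trace id forward. */
  static Map<String, byte[]> toHeaders(Traceable<?> traceable) {
    Map<String, byte[]> headers = new LinkedHashMap<>();
    if (traceable.traceId != null) {
      headers.put(TRACE_ID_HEADER, traceable.traceId.getBytes(StandardCharsets.UTF_8));
    }
    return headers;
  }
}
```

As noted in the thread, the wrapper may be unnecessary if the headers themselves travel with the record through the pipeline; the sketch only shows the round trip.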

On Fri, Apr 17, 2020 at 10:56 AM Rion Williams 
wrote:

> Hi Alexey,
>
> I think you’re right about the wrapper, it’s likely unnecessary as I think
> I’d have enough information in the headers to rehydrate my “tracer” that
> communicates the traces/spans to Jaeger as needed. I’d love to not have to
> touch those or muddy the waters with a wrapper class, additional conversion
> steps, custom coder, etc.
>
> Speaking of conversions, I agree entirely with the unified interface for
> reading/writing to Kafka. I’ll openly admit I spent far too long fighting
> with it before discovering that `withoutMetadata()` existed. So if those
> were unified and writeRecords could accept a Kafka one, that’d be great.
>
>
> > On Apr 17, 2020, at 12:47 PM, Alexey Romanenko 
> wrote:
> >
> > Hi Rion,
> >
> > In general, yes, it sounds reasonable to me. I just do not see why you
> need to have extra Traceable wrapper? Do you need to keep some temporary
> information there that you don’t want to store in Kafka record headers?
> >
> > PS: Now I started to think that we probably have to change an interface
> of KafkaIO.writeRecords() from ProducerRecord to the same KafkaRecord as we
> use for read. In this case we won’t expose Kafka API and use only own
> wrapper.
> > Also, user won’t need to convert types between Read and Write (like in
> this topic case).
> >
> >> On 17 Apr 2020, at 19:28, Rion Williams  wrote:
> >>
> >> Hi Alexey,
> >>
> >> So this is currently the approach that I'm taking. Basically creating a
> wrapper Traceable class that will contain all of my record information
> as well as the data necessary to update the traces for that record. It
> requires an extra step and will likely mean persisting something along side
> each record as it comes in, but I'm not sure if there's another way around
> it.
> >>
> >> My current approach goes something like this:
> >> - Read records via KafkaIO (with metadata)
> >> - Apply a transform to convert all KafkaRecord into Traceable
> instances (which just contain a Tracer object as well as the original
> KV record itself)
> >> - Pass this Traceable through all of the appropriate transforms,
> creating new spans for the trace as necessary via the tracer element on the
> Traceable object.
> >> - Prior to output to a Kafka topic, transform the Traceable
> object in to a ProducerRecord that contains the key, value, and
> headers (from the tracer) prior to writing back to Kafka
> >>
> >> I think this will work, but it'll likely take quite a bit of
> experimentation to verify. Does this sound reasonable?
> >>
> >> Thanks,
> >>
> >> Rion
> >>
> >>> On 2020/04/17 17:14:58, Alexey Romanenko 
> wrote:
> >>> Not sure if it will help, but KafkaIO allows to keep all meta
> information while reading (using KafkaRecord) and writing (using
> ProducerRecord).
> >>> So, you can keep your tracing id in the record headers as you did with
> Kafka Streams.
> >>>
> >>>> On 17 Apr 2020, at 18:58, Rion Williams
> wrote:
> >>>>
> >>>> Hi Alex,
> >>>>
> >>>> As mentioned before, I'm in the process of migrating a pipeline of
> several Kafka Streams applications over to Apache Beam and I'm hoping to
> leverage the tracing infrastructure that I had established using Jaeger
> whenever I can, but specifically to trace an element as it flows through a
> pipeline or potentially multiple pipelines.
> >>>>
> >>>> An example might go something like this:
> >>>>
> >>>> - An event is produced from some service and sent to a Kafka Topic
> (with a tracing id in the headers)
> >>>> - The event enters my pipeline (Beam reads from that topic) and
> begins applying a series of transforms that evaluate the element itself
> (e.g. does it have any users associated with it, IP addresses, other
> interesting information).
> >>>> - When interesting information is encountered on the element (or
> errors), I'd like to be able to associate them with the trace (e.g. a user
> was found, this is some information about the user, this is the unique
> identifier associated with them, or there was an error because the user had
> a malformed e-mail address)
> >>>> - The traces themselves would be cumulative, so if

Re: Global Window

2020-04-15 Thread Kenneth Knowles
In batch, with all bounded data, processing time timers are typically not
processed. This is because the window is first fully processed and expired.

Can you explain a bit more about why you want a processing time timer in
your use case?

Kenn

On Wed, Apr 15, 2020 at 9:41 PM Aniruddh Sharma 
wrote:

> Hi
>
> I am doing some stateful and timely processing.
>
> My use case: I have a batch, so it's a Global window. I do not want to
> use Window (as I have some functional joins) downstream. I want to use
> Global window only and do batched RPC. I am setting a stale timer (based on
> processing time), but that timer is never executed.
>
> Is it possible that global window expires before processing timer is set ?
>
> If yes,
>  how can I change expiry time of a Global window ?
>
> If no,
> could you please advise what could help.
>


Re: Using Self signed root ca for https connection in eleasticsearchIO

2020-04-07 Thread Kenneth Knowles
Hi Mohil,

Thanks for the detailed report. I think most people are at reduced capacity
right now. Filing a Jira would be helpful for tracking this.

Since I am writing, I will add a quick guess, but we should move to Jira.
It seems this has more to do with Dataflow than ElasticSearch. The default
for staging files is to scan the classpath. To do more, or to fix any
problem with the autodetection, you will need to specify --filesToStage on
the command line or setFilesToStage in Java code. Am I correct that this
symptom is confirmed?

Kenn

On Mon, Apr 6, 2020 at 5:04 PM Mohil Khare  wrote:

> Any update on this? Shall I open a jira for this support ?
>
> Thanks and regards
> Mohil
>
> On Sun, Mar 22, 2020 at 9:36 PM Mohil Khare  wrote:
>
>> Hi,
>> This is Mohil from Prosimo, a small bay area based stealth mode startup.
>> We use Beam (on version 2.19) with google dataflow in our analytics
>> pipeline with Kafka and PubSub as source while GCS, BigQuery and
>> ElasticSearch as our sink.
>>
>> We want to use our private self signed root ca for tls connections
>> between our internal services viz kafka, ElasticSearch, beam etc. We are
>> able to setup secure tls connection between beam and kafka using self
>> signed root certificate in keystore.jks and truststore.jks and transferring
>> it to worker VMs running kafkaIO using KafkaIO's read via
>> withConsumerFactorFn().
>>
>> We want to do similar things with elasticseachIO where we want to update
>> its worker VM's truststore with our self signed root certificate so that
>> when elasticsearchIO connects using HTTPS, it can connect successfully
>> without ssl handshake failure. Currently we couldn't find any way to do so
>> with ElasticsearchIO. We tried various possible workarounds like:
>>
>> 1. Trying JvmInitializer to initialise Jvm with truststore using
>> System.setproperty for javax.net.ssl.trustStore,
>> 2. Transferring our jar to GCP's appengine where we start jar using
>> Djavax.net.ssl.trustStore and then triggering beam job from there.
>> 3. Setting elasticsearchIO flag withTrustSelfSigned to true (I don't
>> think it will work because looking at the source code, it looks like it has
>> dependency with keystorePath)
>>
>> But nothing worked. When we logged in to worker VMs, it looked like our
>> trustStore never made it to worker VM. All elasticsearchIO connections
>> failed with the following exception:
>>
>> sun.security.validator.ValidatorException: PKIX path building failed:
>> sun.security.provider.certpath.SunCertPathBuilderException: unable to find
>> valid certification path to requested target
>> sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
>>
>>
>> Right now to unblock ourselves, we have added a proxy with a Letsencrypt root
>> CA between Beam and the Elasticsearch cluster, and Beam's ElasticsearchIO
>> connects successfully to the proxy using the Letsencrypt root certificate. We
>> don't want to use the Letsencrypt root certificate for internal services as it
>> expires every three months. Is there a way, just like KafkaIO, to use a
>> self-signed root certificate with ElasticsearchIO? Or is there a way to
>> update the Java cacerts on worker VMs where the Beam job is running?
>>
>> Looking forward to some suggestions soon.
>>
>> Thanks and Regards
>> Mohil Khare
>>
>


Re: A new reworked Elasticsearch 7+ IO module

2020-03-06 Thread Kenneth Knowles
Since the user provides backendVersion, here are some possible levels of
things to add in expand() based on that (these are extra niceties beyond
the agreed number of releases to remove)

 - WARN for backendVersion < n
 - reject for backendVersion < n with opt-in pipeline option to keep it
working one more version (gets their attention and indicates urgency)
 - reject completely

Kenn
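
The three levels listed above could be sketched roughly as follows. This is plain Java rather than ElasticsearchIO code: BackendVersionPolicy, Stage, Action and the optedIn flag are hypothetical names, and how the opt-in would actually be surfaced as a pipeline option is left open.

```java
class BackendVersionPolicy {
  /** How far the removal of an old backend version has progressed. */
  enum Stage { WARN_ONLY, REJECT_UNLESS_OPTED_IN, REJECT }

  /** What the transform would do at pipeline construction time. */
  enum Action { ACCEPT, WARN, REJECT }

  /**
   * Decides how to treat a user-supplied backend version.
   *
   * @param backendVersion major version given by the user
   * @param minSupported oldest major version that is still fully supported
   * @param stage current stage of the deprecation
   * @param optedIn hypothetical opt-in that keeps the old version working
   */
  static Action evaluate(int backendVersion, int minSupported, Stage stage, boolean optedIn) {
    if (backendVersion >= minSupported) {
      return Action.ACCEPT;
    }
    switch (stage) {
      case WARN_ONLY:
        return Action.WARN;
      case REJECT_UNLESS_OPTED_IN:
        // Still works for one more version, but only if explicitly requested.
        return optedIn ? Action.WARN : Action.REJECT;
      default:
        return Action.REJECT;
    }
  }
}
```

Moving from one Stage to the next on successive releases would give users the escalating signal described in the list.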

On Fri, Mar 6, 2020 at 2:26 AM Etienne Chauchot 
wrote:

> Hi all,
>
> it's been 3 weeks since the survey on ES versions the users use.
>
> The survey received very few responses: only 9 responses for now (multiple
> versions possible of course). The responses are the following:
>
> ES2: 0 clients, ES5: 1, ES6: 5, ES7: 8
>
> It tends to go toward a drop of ES2 support but for now it is still not
> very representative.
>
> I'm cross-posting to @users to let you know that I'm closing the survey
> within 1 or 2 weeks. So please respond if you're using ESIO.
>
> Best
>
> Etienne
> On 13/02/2020 12:37, Etienne Chauchot wrote:
>
> Hi Cham, thanks for your comments !
>
> I just sent an email to user ML with a survey link to count ES uses per
> version:
>
>
> https://lists.apache.org/thread.html/rc8185afb8af86a2a032909c13f569e18bd89e75a5839894d5b5d4082%40%3Cuser.beam.apache.org%3E
>
> Best
>
> Etienne
> On 10/02/2020 19:46, Chamikara Jayalath wrote:
>
>
>
> On Thu, Feb 6, 2020 at 8:13 AM Etienne Chauchot 
> wrote:
>
>> Hi,
>>
>> please see my comments inline
>> On 06/02/2020 16:24, Alexey Romanenko wrote:
>>
>> Please, see my comments inline.
>>
>> On 6 Feb 2020, at 10:50, Etienne Chauchot  wrote:
>>
>> 1. regarding version support: ES v2 is no longer maintained by Elastic
>> since 2018/02 so we plan to remove it from the IO. In the past we already
>> retired versions (like spark 1.6 for instance).

>>> My only concern here is that there might be users who use the existing
>>> module who might not be able to easily upgrade the Beam version if we
>>> remove it. But given that V2 is 5 versions behind the latest release this
>>> might be OK.
>>>
>>
>> It seems we have a consensus on this.
>> I think there should be another general discussion on the long term
>> support of our preferred tool IO modules.
>>
>> => yes, consensus, let's drop ESV2
>>
>> We had (and still have) a similar problem with KafkaIO to support
>> different versions of Kafka, especially very old version 0.9. We raised
>> this question on user@ and it appears that there are users who for some
>> reasons still use old Kafka versions. So, before dropping support for any
>> ES versions, I’d suggest asking on user@ to see if any people will be
>> affected by this.
>>
>> Yes we can do a survey among users but the question is, should we support
>> an ES version that is no more supported by Elastic themselves ?
>>
>
> +1 for asking in the user list. I guess this is more about whether users
> need this specific version that we hope to drop support for. Whether we
> need to support unsupported versions is a more generic question that should
> prob. be addressed in the dev list. (and I personally don't think we should
> unless there's a large enough user base for a given version).
>
> 2. regarding the user: the aim is to unlock some new features (listed by
> Ludovic) and give the user more flexibility on his request. For that, it
> requires using the high level java ES client in place of the low level REST
> client (that was used because it is the only one compatible with all ES
> versions). We plan to replace the API (json document in and out) by more
> complete standard ES objects that contain the request logic (insert/update,
> doc routing etc...) and the data. There are already IOs like SpannerIO that
> use similar objects in input PCollection rather than pure POJOs.


>>> Won't this be a breaking change for all users ? IMO using POJOs in
>>> PCollections is safer since we have to worry about changes to the
>>> underlying client library API. Exception would be when underlying client
>>> library offers a backwards compatibility guarantee that we can rely on for
>>> the foreseeable future (for example, BQ TableRow).
>>>
>>
>> Agreed but actually, there will be POJOs in order to abstract
>> Elasticsearch's version support. The following third point explains this.
>>
>> => indeed it will be a breaking change, hence this email to get a
>> consensus on that. Also I think our wrappers of ES request objects will
>> offer a backward compatible as the underlying objects
>>
>> I just want to remind that according to what we agreed some time ago on
>> dev@ (at least, for IOs), all breaking user API changes have to be added
>> along with deprecation of old API that could be removed after 3 consecutive
>> Beam releases. In this case, users will have a time to move to new API
>> smoothly.
>>
>> We are more discussing the target architecture of the new module here but
>> the process of deprecation is important to recall, I agree. When I say DTOs
>> 

Re: GCS numShards doubt

2020-03-02 Thread Kenneth Knowles
For bounded data, each bundle becomes a file:
https://github.com/apache/beam/blob/da9e17288e8473925674a4691d9e86252e67d7d7/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L356

Kenn

On Mon, Mar 2, 2020 at 6:18 PM Kyle Weaver  wrote:

> As Luke and Robert indicated, unsetting num shards _may_ cause the runner
> to optimize it automatically.
>
> For example, the Flink [1] and Dataflow [2] runners override num shards.
>
> However, in the Spark runner, I don't see any such override. So I have two
> questions:
> 1. Does the Spark runner override num shards somehow?
> 2. How is num shards determined if it's set to 0 and not overridden by the
> runner?
>
> [1]
> https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java#L240-L243
> [2]
> https://github.com/apache/beam/blob/a149b6b040e9573e53cd41b6bd69b7e7603ac2a2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L1853-L1866
>
> On Fri, Feb 14, 2020 at 10:09 AM Robert Bradshaw 
> wrote:
>
>> To let Dataflow choose the optimal number shards and maximize
>> performance, it's often significantly better to simply leave it
>> unspecified. A higher numShards only helps if you have at least that
>> many workers.
>>
>> On Thu, Feb 13, 2020 at 10:24 PM vivek chaurasiya 
>> wrote:
>> >
>> > hi folks, I have this in code
>> >
>> > globalIndexJson.apply("GCSOutput",
>> TextIO.write().to(fullGCSPath).withSuffix(".txt").withNumShards(500));
>> >
>> > the same code is executed for 50GB, 3TB, 5TB of data. I want to know if
>> changing numShards for larger datasize will write to GCS faster?
>>
>


Re: Implementing custom session with max event/element count

2020-02-12 Thread Kenneth Knowles
I notice that you use the name "IntervalWindow" but you are calling methods
that IntervalWindow does not have. Do you have a custom implementation of
this class? Do you have a custom coder for your version of IntervalWindow?

Kenn

On Wed, Feb 12, 2020 at 7:30 PM Jainik Vora  wrote:

> Hi Everyone,
>
> I am trying to implement sessions on events with three criteria:
> 1. Gap duration - e.g. 10 mins
> 2. Max duration - e.g. 1 hour
> 3. Max events - e.g. 500 events
>
> I’m able to implement 1 and 2 by implementing a custom BoundedWindow that
> keeps track of window size and max duration. But I’m having difficulty
> implementing the 3rd criterion, which is that a session should have a
> maximum number of events.
>
> I’m trying to implement this by tracking the number of events in a window,
> but while testing I noticed that mergeWindows is called every 3 seconds and
> that after mergeWindows is executed, the windows in that merge are lost,
> along with the metadata for the number of events seen in those windows.
>
> Any examples or pointers on how to implement a session with a max
> element/event count would be helpful. Below is the code with which I
> implemented a custom WindowFn:
>
> public class UserSessions extends WindowFn, IntervalWindow> 
> {
>   private final Duration gapDuration;
>   private Duration maxSize;
>   private static final Duration DEFAULT_SIZE_DURATION = 
> Duration.standardHours(12L);
>   public UserSessions(Duration gapDuration, Duration sizeDuration) {
> this.gapDuration = gapDuration;
> this.maxSize = sizeDuration;
>   }
>   public static UserSessions withGapDuration(Duration gapDuration) {
> return new UserSessions(gapDuration, DEFAULT_SIZE_DURATION);
>   }
>   public UserSessions withMaxSize(Duration maxSize) {
> this.maxSize = maxSize;
> return this;
>   }
>   @Override
>   public Collection assignWindows(AssignContext 
> assignContext) throws Exception {
> return Arrays.asList(new IntervalWindow(assignContext.timestamp(), 
> gapDuration));
>   }
>   private Duration windowSize(IntervalWindow window) {
> return window == null
> ? new Duration(0)
> : new Duration(window.start(), window.end());
>   }
>   @Override
>   public void mergeWindows(MergeContext mergeContext) throws Exception {
> List sortedWindows = new ArrayList<>();
> for (IntervalWindow window : mergeContext.windows()) {
>   sortedWindows.add(window);
> }
> Collections.sort(sortedWindows);
> List merges = new ArrayList<>();
> MergeCandidate current = new MergeCandidate();
> for (IntervalWindow window : sortedWindows) {
>   MergeCandidate next = new MergeCandidate(window);
>   if (current.intersects(window)) {
> current.add(window);
> Duration currentWindow = windowSize(current.union);
> if (currentWindow.isShorterThan(maxSize) || 
> currentWindow.isEqual(maxSize) || current.size() < 10)
>   continue;
> // Current window exceeds bounds, so flush and move to next
> LOG.info("** EXCEEDS 10 Events CRITERIA."); // this never 
> hits.
> next = new MergeCandidate();
>   }
>   merges.add(current);
>   current = next;
> }
> merges.add(current);
> for (MergeCandidate merge : merges) {
>   merge.apply(mergeContext);
> }
>   }
>   @Override
>   public WindowMappingFn getDefaultWindowMappingFn() {
> throw new UnsupportedOperationException("Sessions is not allowed in side 
> inputs");
>   }
>   @Override
>   public boolean isCompatible(WindowFn other) {
> return false;
>   }
>   @Override
>   public Coder windowCoder() {
> return IntervalWindow.getCoder();
>   }
>   private static class MergeCandidate {
> @Nullable
> private IntervalWindow union;
> private final List parts;
> public MergeCandidate() {
>   union = null;
>   parts = new ArrayList<>();
> }
> public MergeCandidate(IntervalWindow window) {
>   union = window;
>   parts = new ArrayList<>(Arrays.asList(window));
> }
> public boolean intersects(IntervalWindow window) {
>   return union == null || union.intersects(window);
> }
> public void add(IntervalWindow window) {
>   union = union == null ? window : union.span(window);
>   union.incrementWindowEventCountBy(window.getWindowEventCount() + 1);
>   parts.add(window);
> }
> public void apply(WindowFn.MergeContext c) throws 
> Exception {
>   if (this.parts.size() > 1) {
> c.merge(parts, union);
>   }
> }
> public int size() {
>   return this.parts.size();
> }
> @Override
> public String toString() {
>   return "MergeCandidate[union=" + union + ", parts=" + parts + "]";
> }
>   }
> }
>
> Thanks & Regards,
>
> *Jainik Vora*
>
>
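
The merge rule being asked about (merge overlapping proto-sessions only while both a maximum duration and a maximum event count hold) can be sketched in plain Java, independent of Beam. CappedSessionMerge and Session are hypothetical names and timestamps are plain millisecond longs. The event count is carried inside the merged value itself; in Beam, a window type carrying such a field would presumably also need a coder that encodes it, which seems to be what the reply above is probing.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class CappedSessionMerge {
  /** Half-open interval [start, end) in millis plus the number of events it covers. */
  static final class Session {
    final long start;
    final long end;
    final int eventCount;

    Session(long start, long end, int eventCount) {
      this.start = start;
      this.end = end;
      this.eventCount = eventCount;
    }
  }

  /** Greedily merges overlapping sessions, respecting both caps. */
  static List<Session> merge(List<Session> windows, long maxDurationMillis, int maxEvents) {
    List<Session> sorted = new ArrayList<>(windows);
    sorted.sort(Comparator.comparingLong((Session s) -> s.start));
    List<Session> result = new ArrayList<>();
    Session current = null;
    for (Session next : sorted) {
      if (current != null && canMerge(current, next, maxDurationMillis, maxEvents)) {
        // The merged session keeps the summed count, so it survives the merge.
        current = new Session(
            current.start,
            Math.max(current.end, next.end),
            current.eventCount + next.eventCount);
      } else {
        if (current != null) {
          result.add(current);
        }
        current = next;
      }
    }
    if (current != null) {
      result.add(current);
    }
    return result;
  }

  private static boolean canMerge(Session a, Session b, long maxDurationMillis, int maxEvents) {
    boolean overlaps = b.start < a.end; // b starts at or after a because input is sorted
    long mergedDuration = Math.max(a.end, b.end) - a.start;
    int mergedCount = a.eventCount + b.eventCount;
    return overlaps && mergedDuration <= maxDurationMillis && mergedCount <= maxEvents;
  }
}
```

Note that once a cap is hit, the next session starts from a window that may still overlap the previous one; whether that is acceptable depends on the use case.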


Re: Stability of Timer.withOutputTimestamp

2020-02-05 Thread Kenneth Knowles
It is definitely too new to be stable in the sense of not even tiny changes
to the API / runtime compatibility.

However, in my opinion it is so fundamental (and overdue) it will certainly
exist in some form.

Feel free to use it if you are OK with the possibility of minor
compile-time adjustments and you do not require Dataflow pipeline update
compatibility.

Kenn

On Wed, Feb 5, 2020 at 10:31 AM Luke Cwik  wrote:

> +Reuven Lax 
>
> On Wed, Feb 5, 2020 at 7:33 AM Steve Niemitz  wrote:
>
>> Also, as a follow up, I'm curious about this commit:
>>
>> https://github.com/apache/beam/commit/80862f2de6f224c3a1e7885d197d1ca952ec07e3
>>
>> My use case is that I want to set a timer to fire after the max timestamp
>> of a window, but hold the watermark to the max timestamp until it fires,
>> essentially delaying the window closing by some amount of event time.
>> Previous to that revert commit it seems like that would have been possible,
>> but now it would fail (since the target is after the window's maxTimestamp).
>>
>> What was the reason this was reverted, and are there plans to un-revert
>> it?
>>
>> On Wed, Feb 5, 2020 at 10:01 AM Steve Niemitz 
>> wrote:
>>
>>> I noticed that Timer.withOutputTimestamp has landed in 2.19, but I
>>> didn't see any mention of it in the release notes.
>>>
>>> Is this feature considered stable (specifically on dataflow)?
>>>
>>


Re: BigQuery TIMESTAMP and TimestampedValue()

2020-01-22 Thread Kenneth Knowles
Ah, that's too bad. I wonder why they chose to put " UTC" on the end
instead of just a "Z". Other than that, the format is RFC3339 and the
iso8601 module does have the extension to use a space instead of a T to
separate the date and time. I tested and if you strip the " UTC" then
parsing succeeds.

Since BigQuery TIMESTAMPS do not carry time zone information, it is safe to
ignore the time zone portion. The problem of course is if they change/fix
this it could break your code.

Kenn

On Mon, Jan 20, 2020 at 2:45 PM Sandy Walsh  wrote:

> Newb here for what will certainly be the first of many
> silly questions ...
>
> I'm working on a dataflow pipeline using python SDK (local runners
> currently).
>
> It's a bounded source from BigQuery. One column is a TIMESTAMP. I'm trying
> to assign the timestamp using beam.window.TimestampedValue() but the
> timestamp I'm getting back from BQ seems to be a string and not in RFC3339
> format.
>
> The format is '2019-12-13 09:38:19.380224 UTC' ... which I could
> explicitly convert but I'd rather do that in the query.
>
> Any suggestions on how to get the timestamp back in a format I can parse
> with iso8601.parse_date() or, ideally, just pass into TimestampedValue()
> without having to parse a string?
>
> Thanks
>
>
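
The thread concerns the Python SDK and the iso8601 module, but the stripping idea from the reply can be shown in a short Java sketch using java.time. BigQueryTimestamps and parse are hypothetical names. The sketch assumes the value always ends in " UTC" or has no zone suffix at all, so, as the reply warns, a change in the exported format would break it.

```java
import java.time.Instant;

class BigQueryTimestamps {
  private static final String UTC_SUFFIX = " UTC";

  /** Parses strings such as "2019-12-13 09:38:19.380224 UTC" into an Instant. */
  static Instant parse(String bqTimestamp) {
    String s = bqTimestamp.trim();
    if (s.endsWith(UTC_SUFFIX)) {
      s = s.substring(0, s.length() - UTC_SUFFIX.length());
    }
    // Swap the date/time separator for 'T' and mark the value as UTC with 'Z',
    // which yields a string that Instant.parse accepts.
    return Instant.parse(s.replace(' ', 'T') + "Z");
  }
}
```

Doing the conversion in the query instead, as the original question prefers, would avoid string handling in the pipeline altogether.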


Re: Escalating Checkpoint durations in Flink with SQS Source

2020-01-17 Thread Kenneth Knowles
On Fri, Jan 17, 2020 at 11:44 AM Stephen Patel 
wrote:

> * What is the space of keys? Is it constantly growing?
>
> The keys are effectively database row ids so they're always growing,
> or being updated if the rows are modified.  Each row is small.
>
> * Are you explicitly clearing out state from stale keys?
>
> No, once a key is entered, I keep the value for it until it's updated
> with a new value.  This is so that if the value changes, or we get an
> out of order version of that value, we can issue the appropriate
> changes/retractions.
>

In this case your state will continue to grow without bound. I don't know
about Flink's details. If checkpoints are incremental then maybe this is no
problem? But maybe it is.

Kenn
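
One way to picture "explicitly clearing out state from stale keys" is a keyed store that remembers when each key was last updated and drops entries older than a time-to-live. The sketch below is plain Java, not Beam state or Flink configuration; ExpiringKeyedState and its methods are hypothetical names, and the caller supplies the current time. Expiring a key also means a later out-of-order update for it can no longer be compared with the old value, so this trades retraction correctness for bounded state.

```java
import java.util.HashMap;
import java.util.Map;

class ExpiringKeyedState<K, V> {
  private static final class Entry<T> {
    final T value;
    final long lastUpdatedMillis;

    Entry(T value, long lastUpdatedMillis) {
      this.value = value;
      this.lastUpdatedMillis = lastUpdatedMillis;
    }
  }

  private final Map<K, Entry<V>> entries = new HashMap<>();
  private final long ttlMillis;

  ExpiringKeyedState(long ttlMillis) {
    this.ttlMillis = ttlMillis;
  }

  /** Stores or overwrites the value for a key and refreshes its timestamp. */
  void put(K key, V value, long nowMillis) {
    entries.put(key, new Entry<>(value, nowMillis));
  }

  /** Returns the stored value, or null if the key is absent or was expired. */
  V get(K key) {
    Entry<V> entry = entries.get(key);
    return entry == null ? null : entry.value;
  }

  /** Removes entries not updated within the TTL; returns how many were removed. */
  int expire(long nowMillis) {
    int before = entries.size();
    entries.values().removeIf(e -> nowMillis - e.lastUpdatedMillis >= ttlMillis);
    return before - entries.size();
  }

  int size() {
    return entries.size();
  }
}
```

Calling expire periodically keeps the store proportional to the number of recently active keys rather than to every key ever seen.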


> I can share a small variant of the code that doesn't involve any
> statefulness (apart from the implicit and temporary statefulness of
> the cogroup/deduplication operator):
> https://gist.github.com/spatel11/ced0d175ca64962e0ec734a857d1ef33
>
> This code can create two types of pipelines.  One fills two sqs queues
> at different rates, the other reads from those queues, cogroups them,
> and generates some metrics.  The read pipeline will operate fine for.
>
> I've also included the flink-conf that I was using.
>
> Stephen
>
> On Fri, Jan 17, 2020 at 11:47 AM Kenneth Knowles  wrote:
> >
> > Starting with just checking some basic things: What is the space of
> > keys? Is it constantly growing? Are you explicitly clearing out state
> > from stale keys? In the global window, you don't get any state GC for
> > free.
> >
> > Can you share repro code?
> >
> > Kenn
> >
> > On Fri, Jan 17, 2020 at 8:53 AM Stephen Patel 
> wrote:
> >>
> >> I've got a beam pipeline using the FlinkRunner that reads from two
> >> different SQS sources (using the SqsIO).  It does some stateful
> >> processing on each stream, and then cogroups the results together to
> >> generate a result and write it to Sns (using the SnsIO).  The volume
> >> of input data isn't particularly large (about 50 messages per minute
> >> on one queue, and about 1 message per minute on the other queue).
> >> It's using the Global window, discarding fired panes, with a
> >> processing time trigger delayed by 1 minute.  Checkpointing is enabled
> >> at a 1 minute interval, with a minimum delay between checkpoints of 30
> >> seconds.  My state backend is RocksDB, using the FLASH_SSD_OPTIMIZED
> >> predefined options.
> >>
> >> This pipeline runs fine for a few hours with an average checkpoint
> >> duration of 1s (with occasional spikes higher), but eventually the
> >> time it takes to checkpoint begins to grow until it's in the minutes
> >> on average, and finally it won't even complete within a 10 minute
> >> period.  I'm using a parallelism of 2, and it seems to keep up with the
> >> number of incoming messages just fine (until the checkpoint duration
> >> grows too large and it is unable to delete the messages any longer).
> >> To try to isolate the problem, I wrote an alternate sqs reader that
> >> uses the Watch transform to periodically read from SQS.  This variant
> >> doesn't show the same behavior, and has been running for a week
> >> without issue (an average checkpoint time of 1-2s).
> >>
> >> Some other experiments I tried:
> >>
> >> * I observed that the operators that took a long time to checkpoint
> >> were the deduplicating operators after the actual unbounded source
> >> operator.  I disabled requiresDeduping and added a Reshuffle instead,
> >> however that exhibited the same growth in checkpoint durations after a
> >> period of time.
> >> * I tried with the AT_LEAST_ONCE checkpointing mode instead of exactly
> >> once, however that also exhibited the same behavior.
> >>
> >>
> >> Does anyone have any thoughts about what might cause this behavior
> >> with the Unbounded Source (as opposed to the splittable do variant)?
> >>
> >> I'm running on EMR emr-5.26.0, using Flink 1.8.0, and Beam 2.14.0.
> >>
> >> Stephen
>


Re: Escalating Checkpoint durations in Flink with SQS Source

2020-01-17 Thread Kenneth Knowles
Starting with just checking some basic things: What is the space of keys?
Is it constantly growing? Are you explicitly clearing out state from stale
keys? In the global window, you don't get any state GC for free.

Can you share repro code?

Kenn

On Fri, Jan 17, 2020 at 8:53 AM Stephen Patel 
wrote:

> I've got a beam pipeline using the FlinkRunner that reads from two
> different SQS sources (using the SqsIO).  It does some stateful
> processing on each stream, and then cogroups the results together to
> generate a result and write it to Sns (using the SnsIO).  The volume
> of input data isn't particularly large (about 50 messages per minute
> on one queue, and about 1 message per minute on the other queue).
> It's using the Global window, discarding fired panes, with a
> processing time trigger delayed by 1 minute.  Checkpointing is enabled
> at a 1 minute interval, with a minimum delay between checkpoints of 30
> seconds.  My state backend is RocksDB, using the FLASH_SSD_OPTIMIZED
> predefined options.
>
> This pipeline runs fine for a few hours with an average checkpoint
> duration of 1s (with occasional spikes higher), but eventually the
> time it takes to checkpoint begins to grow until it's in the minutes
> on average, and finally it won't even complete within a 10 minute
> period.  I'm using a parallelism of 2, and it seems to keep up with the
> number of incoming messages just fine (until the checkpoint duration
> grows too large and it is unable to delete the messages any longer).
> To try to isolate the problem, I wrote an alternate sqs reader that
> uses the Watch transform to periodically read from SQS.  This variant
> doesn't show the same behavior, and has been running for a week
> without issue (an average checkpoint time of 1-2s).
>
> Some other experiments I tried:
>
> * I observed that the operators that took a long time to checkpoint
> were the deduplicating operators after the actual unbounded source
> operator.  I disabled requiresDeduping and added a Reshuffle instead,
> however that exhibited the same growth in checkpoint durations after a
> period of time.
> * I tried with the AT_LEAST_ONCE checkpointing mode instead of exactly
> once, however that also exhibited the same behavior.
>
>
> Does anyone have any thoughts about what might cause this behavior
> with the Unbounded Source (as opposed to the splittable do variant)?
>
> I'm running on EMR emr-5.26.0, using Flink 1.8.0, and Beam 2.14.0.
>
> Stephen
>


Re: Question about triggering

2020-01-13 Thread Kenneth Knowles
Here is the Count.combineFn:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java#L133

Note that the type T is completely abstract and the function never touches
the input element.
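
As an illustration of why a counting combiner is indifferent to the element type, here is a plain-Python sketch. The method names loosely mirror the CombineFn lifecycle (create, add, merge, extract); `CountFn` and `combine_globally` are hypothetical stand-ins written for this note, not Beam classes.

```python
class CountFn:
    """Counts elements without ever inspecting them."""

    def create_accumulator(self):
        return 0

    def add_input(self, accumulator, element):
        # The element is deliberately unused: only its presence matters.
        return accumulator + 1

    def merge_accumulators(self, accumulators):
        return sum(accumulators)

    def extract_output(self, accumulator):
        return accumulator


def combine_globally(fn, bundles):
    """Combine each bundle separately, then merge the partial results."""
    partials = []
    for bundle in bundles:
        acc = fn.create_accumulator()
        for element in bundle:
            acc = fn.add_input(acc, element)
        partials.append(acc)
    return fn.extract_output(fn.merge_accumulators(partials))
```

Since the function never reads the element, a wrong count with a custom type points away from the combiner itself and toward something upstream of it, which is what the logging CombineFn would help narrow down.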

Are you able to share the code of the CombineFn that you implemented with
logging?

Kenn

On Fri, Jan 10, 2020 at 4:00 AM Andrés Garagiola 
wrote:

> Hi Kenneth
>
> Thanks for your reply. I realized that the issue happens if I perform the
> Count over the PCollection, but it works well if I map the
> collection to a primitive type, for example, PCollection.
>
> I don't think that it is a trigger issue, but something with my type.
> Something I tried is implementing my own CombineFn and logging the rows.
> Only the first element is processed by the function when I use MyClass.
>
> I haven't tried with the direct runner yet; I will try that. The size of
> the collections is about 600 records.
>
> Thanks
> Regards
>
> On Thu, Jan 9, 2020 at 11:56 PM Kenneth Knowles  wrote:
>
>> Does it have the same behavior in the direct runner? What are the sizes
>> of intermediate PCollections?
>>
>> Kenn
>>
>> On Wed, Jan 8, 2020 at 1:05 PM Andrés Garagiola <
>> andresgaragi...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I'm doing some tests with beam and apache flink. I'm running the code
>>> below:
>>>
>>>   public static void main(String[] args) throws IOException {
>>>     WorkflowStepOptions options =
>>>         PipelineOptionsFactory.fromArgs(args).withValidation()
>>>             .as(WorkflowStepOptions.class);
>>>     logger.info(
>>>         "Options Kafka server {} input topic {} output topic {} "
>>>             + "window size {} group id {} step name {}",
>>>         options.getKafkaBrokers(), options.getTopics(),
>>>         options.getOutputTopic(), options.getWindowSize(),
>>>         options.getGroupId(), workflowStepName);
>>>     Pipeline p = Pipeline.create(options);
>>>
>>>     CoderRegistry cr = p.getCoderRegistry();
>>>     cr.registerCoderForClass(MyClass.class, new MyClassCoder());
>>>
>>>     KafkaIO.Read<Integer, MyClass> kafkaIOReader =
>>>         KafkaIO.<Integer, MyClass>read()
>>>             .withBootstrapServers(options.getKafkaBrokers())
>>>             .withTopics(Arrays.asList(options.getTopics().split(",")))
>>>             .withKeyDeserializer(IntegerDeserializer.class)
>>>             .withValueDeserializer(MyClassEventDeserializer.class)
>>>             //.withTimestampPolicyFactory(new MyClassTimestampPolicyFactory())
>>>             .withTimestampFn((KV<Integer, MyClass> event) ->
>>>                 event.getValue().getDate() == null
>>>                     ? Instant.now()
>>>                     : Instant.parse(event.getValue().getDate(),
>>>                         DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ssZ")))
>>>             .withConsumerConfigUpdates(
>>>                 ImmutableMap.of(
>>>                     "group.id", options.getGroupId(),
>>>                     "auto.offset.reset", "earliest"));
>>>
>>>     KafkaIO.Write<String, String> kafkaOutput =
>>>         KafkaIO.<String, String>write()
>>>             .withBootstrapServers(options.getKafkaBrokers())
>>>             .withTopic(options.getOutputTopic())
>>>             .withKeySerializer(StringSerializer.class)
>>>             .withValueSerializer(StringSerializer.class);
>>>
>>>     Window<KV<Integer, MyClass>> window =
>>>         Window.<KV<Integer, MyClass>>into(
>>>                 FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))
>>>             .accumulatingFiredPanes()
>>>             .withAllowedLateness(Duration.standardDays(365L))
>>>             .triggering(
>>>                 AfterWatermark.pastEndOfWindow()
>>>                     .withEarlyFirings(
>>>                         AfterProcessingTime.pastFirstElementInPane()
>>>                             .plusDelayOf(Duration.standardSeconds(1L)))
>>>                     .withLateFirings(AfterPane.elementCountAtLeast(1)));
>>>
>>>     PCollection<Long> toFormat =
>>>         p.apply(kafkaIOReader.withoutMetadata())
>>>             .apply("Window", window)
>>>             .apply(
>>>                 Combine.globally(Count.<KV<Integer, MyClass>>combineFn())
>>>                     .withoutDefaults());
>>>
>>>     toFormat

Re: Question about triggering

2020-01-09 Thread Kenneth Knowles
Does it have the same behavior in the direct runner? What are the sizes of
intermediate PCollections?

Kenn

On Wed, Jan 8, 2020 at 1:05 PM Andrés Garagiola 
wrote:

> Hi all,
>
> I'm doing some tests with beam and apache flink. I'm running the code
> below:
>
>   public static void main(String[] args) throws IOException {
>     WorkflowStepOptions options =
>         PipelineOptionsFactory.fromArgs(args).withValidation()
>             .as(WorkflowStepOptions.class);
>     logger.info(
>         "Options Kafka server {} input topic {} output topic {} "
>             + "window size {} group id {} step name {}",
>         options.getKafkaBrokers(), options.getTopics(),
>         options.getOutputTopic(), options.getWindowSize(),
>         options.getGroupId(), workflowStepName);
>     Pipeline p = Pipeline.create(options);
>
>     CoderRegistry cr = p.getCoderRegistry();
>     cr.registerCoderForClass(MyClass.class, new MyClassCoder());
>
>     KafkaIO.Read<Integer, MyClass> kafkaIOReader =
>         KafkaIO.<Integer, MyClass>read()
>             .withBootstrapServers(options.getKafkaBrokers())
>             .withTopics(Arrays.asList(options.getTopics().split(",")))
>             .withKeyDeserializer(IntegerDeserializer.class)
>             .withValueDeserializer(MyClassEventDeserializer.class)
>             //.withTimestampPolicyFactory(new MyClassTimestampPolicyFactory())
>             .withTimestampFn((KV<Integer, MyClass> event) ->
>                 event.getValue().getDate() == null
>                     ? Instant.now()
>                     : Instant.parse(event.getValue().getDate(),
>                         DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ssZ")))
>             .withConsumerConfigUpdates(
>                 ImmutableMap.of(
>                     "group.id", options.getGroupId(),
>                     "auto.offset.reset", "earliest"));
>
>     KafkaIO.Write<String, String> kafkaOutput =
>         KafkaIO.<String, String>write()
>             .withBootstrapServers(options.getKafkaBrokers())
>             .withTopic(options.getOutputTopic())
>             .withKeySerializer(StringSerializer.class)
>             .withValueSerializer(StringSerializer.class);
>
>     Window<KV<Integer, MyClass>> window =
>         Window.<KV<Integer, MyClass>>into(
>                 FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))
>             .accumulatingFiredPanes()
>             .withAllowedLateness(Duration.standardDays(365L))
>             .triggering(
>                 AfterWatermark.pastEndOfWindow()
>                     .withEarlyFirings(
>                         AfterProcessingTime.pastFirstElementInPane()
>                             .plusDelayOf(Duration.standardSeconds(1L)))
>                     .withLateFirings(AfterPane.elementCountAtLeast(1)));
>
>     PCollection<Long> toFormat =
>         p.apply(kafkaIOReader.withoutMetadata())
>             .apply("Window", window)
>             .apply(
>                 Combine.globally(Count.<KV<Integer, MyClass>>combineFn())
>                     .withoutDefaults());
>
>     toFormat
>         .apply(
>             "FormatResults",
>             MapElements.into(
>                     TypeDescriptors.kvs(
>                         TypeDescriptors.strings(), TypeDescriptors.strings()))
>                 .via((Long count) -> KV.of("count", count.toString())))
>         .apply(kafkaOutput);
>
>     p.run();
>   }
>
> The idea is very simple, read some events from a Kafka topic, group them
> into a window, count them and put the result in another Kafka topic.
>
> I'm a little confused by the result: the code above produces only one
> entry, counting "1" element, while I have a lot of events (around 500) in
> the source topic.
>
> Do you have any suggestions for figuring out the solution? Is there
> something I'm doing wrong here?
>
> Regards
>


Re: Scio 0.8.0 released

2020-01-09 Thread Kenneth Knowles
So fast! Excellent.

On Wed, Jan 8, 2020 at 11:28 AM Robert Bradshaw  wrote:

> Nice!
>
> On Wed, Jan 8, 2020 at 10:03 AM Neville Li  wrote:
>
>> Hi all,
>>
>> We just released Scio 0.8.0. This is based on the most recent Beam 2.17.0
>> release and includes a lot of new features & bug fixes over the past 10
>> months.
>>
>> Cheers,
>> Neville
>>
>> https://github.com/spotify/scio/releases/tag/v0.8.0
>>
>> *"Amato Animo Animato Animagus"*
>>
>> Breaking changes & deprecations
>>
>>    - See v0.8.0 Migration Guide for detailed instructions
>>    - Remove @experimental from transform #2537
>>    - Deprecate scio-elasticsearch2 and scio-cassandra2 #2414 #2421
>>    - Deprecate hashFilter #2442
>>    - Deprecate legacy components in scio-extras #2533
>>
>> Features
>>
>>    - Bump Beam to 2.17.0 #2577
>>    - Add sharded Sparkey support. #2336
>>    - Rework side input cache #2363
>>    - Cleanup Side Inputs API, introduce Singleton Set SideInputs #2424
>>    - Add schema support for GenericRecord #2514
>>
>> Bug fixes & improvements
>>
>>    - Add file:linenum only to outer transform #2405
>>    - Fix join transform names #2444
>>    - Remove Coder context bound for partitionByKey #2451
>>    - Rename that method argument in join functions to rhs #2466
>>    - Replace custom ClosureCleaner with chill's #2423
>>    - Use chill's Externalizer to serialize predicates in
>>    SCollectionMatchers #2410
>>    - Add errmsg when beamOpts == null in JobTest, fix #2430 #2545
>>    - Add bigQuerySelect() method with default flattenResults value #2500
>>    - Better consistency around BigQuery API #2412
>>    - Fail early on malformed BigQuery spec #2345
>>    - Rewrite typedBigQueryStorage #2434
>>    - Add DML query support to bigquery client #2418
>>    - Treat Avro array as java List in BigQuery read, fix #2068 #2415
>>    - Fix NPE in scio-bigtable's ChannelPoolCreator when credentials
>>    aren't set #2317
>>    - Fix bigtable scollection ops return type #2486
>>    - Refactor PubsubIO for more typesafety #2457
>>    - Avoid Mutation coder fallback for Spanner #2478
>>    - Fix Parquet sink suffix #2367
>>    - Improve iterable equality #2483
>>    - Improve back compat with Scio 0.7 #2401
>>    - Improve coder gen by checking companion implicits #2522
>>    - Make recursive coders serializable #2404
>>    - Remove kryo coder override in intermediate steps #2422
>>    - Fix fallback warning when implicit is in scope #2511
>>    - Improve the schema compatibility error message #2366
>>    - Remove schema fallback #2489
>>    - Add Schemas support for more types #2364
>>    - Assert FileStorage.isDone in MaterializeTap #2518
>>    - Add support for cleaning up TF models on shutdown #2549
>>    - Rework TensorFlow predict ops #2343
>>    - Remove unused/deprecated TensorFlow graph DoFn #2339
>>    - Mark some APIs in scio-extras as experimental #2517 #2572
>>
>


Re: Fanout and OOMs on Dataflow while streaming to BQ

2019-11-22 Thread Kenneth Knowles
+Lukasz Cwik  +Chamikara Jayalath 

It sounds like your high-fanout transform that listens for new files on
Pubsub would be a good fit for Splittable DoFn (SDF). It seems like a
fairly common use case that could be a useful general contribution to Beam.

Kenn

On Fri, Nov 22, 2019 at 7:50 AM Jeff Klukas  wrote:

> We also had throughput issues in writing to BQ in a streaming pipeline and
> we mitigated by provisioning a large quantity of SSD storage to improve I/O
> throughput to disk for checkpoints.
>
> I also second Erik's suggestion to look into Streaming Engine. We are currently
> looking into migrating our streaming use cases to use streaming engine
> after we had success with improved BQ write throughput on batch workloads
> by using Shuffle service (the batch mode analogue to the Streaming Engine).
>
>
>
> On Fri, Nov 22, 2019 at 9:01 AM Erik Lincoln 
> wrote:
>
>> Hi Frantisek,
>>
>>
>>
>> Some advice from making a similar pipeline and struggling with throughput
>> and latency:
>>
>>    1. Break up your pipeline into multiple pipelines. Dataflow only
>>    auto-scales based on input throughput. If you’re microbatching events in
>>    files, the job will only autoscale to meet the volume of files, not the
>>    volume of events added to the pipeline from the files.
>>       1. Better flow is:
>>          i. Pipeline 1: Receive GCS notifications, read files, and then
>>          output file contents as Pubsub messages either per event or in
>>          microbatches from the file
>>          ii. Pipeline 2: Receive events from Pubsub, do your transforms,
>>          then write to BQ
>>    2. Use the Streaming Engine service (if you can run in a region
>>    supporting it):
>>    https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#streaming-engine
>>    3. BQ streaming can be slower than a load job if you have a very high
>>    volume (millions of events a minute). If your event volume is high, you
>>    might need to consider further microbatching loads into BQ from GCS.
>>
>>
>>
>> Hope this helps,
>>
>> Erik
>>
>>
>>
>> *From:* Frantisek Csajka 
>> *Sent:* Friday, November 22, 2019 5:35 AM
>> *To:* user@beam.apache.org
>> *Subject:* Fanout and OOMs on Dataflow while streaming to BQ
>>
>>
>>
>> Hi Beamers,
>>
>> We are facing OutOfMemory errors with a streaming pipeline on Dataflow.
>> We are unable to get rid of them, not even with bigger worker instances.
>> Any advice will be appreciated.
>>
>> The scenario is the following.
>> - Files are being written to a bucket in GCS. A notification is set up on
>> the bucket, so for each file there is a Pub/Sub message
>> - Notifications are consumed by our pipeline
>> - Files from notifications are read by the pipeline
>> - Each file contains several events (there is a quite big fanout)
>> - Events are written to BigQuery (streaming)
>>
>> Everything works fine if there are only a few notifications, but if the
>> files are incoming at high rate or if there is a large backlog in the
>> notification subscription, events get stuck in BQ write and later OOM is
>> thrown.
>>
>> Having a larger worker does not work because if the backlog is large,
>> larger instance(s) will throw OOM as well, just later...
>>
>> As far as we can see, there is a checkpointing/reshuffle in BQ write and
>> that's where the elements get stuck. It looks like the pubsub is consuming
>> too many elements and due to fanout it causes OOM when grouping in
>> reshuffle.
>> Is there any way to backpressure the pubsub read? Is it possible to have
>> smaller window in Reshuffle? How does the Reshuffle actually work?
>> Any advice?
>>
>> Thanks in advance,
>> Frantisek
>> CONFIDENTIALITY NOTICE: This e-mail and any files transmitted with it are
>> intended solely for the use of the individual or entity to whom they are
>> addressed and may contain confidential and privileged information protected
>> by law. If you received this e-mail in error, any review, use,
>> dissemination, distribution, or copying of the e-mail is strictly
>> prohibited. Please notify the sender immediately by return e-mail and
>> delete all copies from your system.
>>
>


Re: slides?

2019-11-15 Thread Kenneth Knowles
We have a section for this:
https://beam.apache.org/community/presentation-materials/.

Right now "Presentation Materials" has the appearance of carefully curated
stuff from a core team. That was probably true three years ago, but now it
is simply out of date. A lot of the material is so old that it is actually
incorrect. It would be good to invite more people to maintain this.

Perhaps:

 - "Presentation Materials" -> "Presentations"
 - Replace the Google Drive link with a readable HTML page with a (possibly
large) archive of events and slides, with licenses so new folks can
copy/paste/modify to kick start their talk

This has the added benefit that there is a clear date and author on each
piece, so you know how old the material is and can put it in context. And
link to video of people presenting with the deck, too. That can be better
than long written speaker notes.

Kenn

On Thu, Nov 14, 2019 at 10:00 PM Austin Bennett 
wrote:

> Hi Dev and User,
>
> Wondering if people would find a benefit from collecting slides from
> Meetups/Talks?
>
> Seems that this could be appropriate on the website, for instance.  Not
> sure whether this has been asked previously, so bringing it to the group.
>
> Cheers,
> Austin
>


Re: Multiple triggers contained w/in side input?

2019-11-06 Thread Kenneth Knowles
Good questions,

On Wed, Nov 6, 2019 at 12:59 AM rahul patwari 
wrote:

> Hi Kenn,
>
> Does the side input have elements from the previous trigger even when used
> with .discardingFiredPanes() like
> https://beam.apache.org/documentation/patterns/side-inputs/#slowly-updating-global-window-side-inputs
>
>

Yes, the elements from the previous trigger firing will be there. The
elements will be different depending on the accumulation mode. Suppose:

 - you are doing a Sum and the inputs are 1, 2, 3, 4
 - you trigger after 1, 2 and then trigger again after 3, 4

There will always be two elements in the output, and two elements go into
the side input. The elements will be:

 - discardingFiredPanes: 3, 7
 - accumulatingFiredPanes: 3, 10
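
The arithmetic above can be reproduced with a few lines of plain Python. This is only a simulation of the two accumulation modes applied to a running sum; `fired_sums` is a hypothetical helper, not a Beam API.

```python
def fired_sums(firings, accumulating):
    """Return the sum emitted at each trigger firing.

    `firings` is a list of batches; each batch holds the inputs that arrived
    since the previous firing.
    """
    outputs = []
    pane = 0
    for batch in firings:
        pane += sum(batch)
        outputs.append(pane)
        if not accumulating:
            # Discarding mode: the pane state is reset after each firing.
            pane = 0
    return outputs


discarding = fired_sums([[1, 2], [3, 4]], accumulating=False)
accumulating = fired_sums([[1, 2], [3, 4]], accumulating=True)
```

Either way two outputs are produced, so a side input built from them holds two elements.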

> Does View.asSingleton() affect this behaviour?
>

View.asSingleton() will crash if you have multiple triggers on a
Combine.globally() or Sum.globally(), etc. Just like how View.asMap() will
crash if you have multiple triggers on a per-key Combine/Sum/etc.
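
A rough model of why these views fail: the side input receives the output of every firing as separate elements, and the view then checks its own invariant. The functions below are hypothetical plain-Python stand-ins for that check, not the actual View implementation, and the key "k" is made up.

```python
def as_singleton(side_input_elements):
    """Return the only element, or fail if there is not exactly one."""
    elements = list(side_input_elements)
    if len(elements) != 1:
        raise ValueError("expected exactly one element, got %d" % len(elements))
    return elements[0]


def as_map(side_input_pairs):
    """Build a dict, failing on the first repeated key."""
    result = {}
    for key, value in side_input_pairs:
        if key in result:
            raise ValueError("Duplicate values for %s" % key)
        result[key] = value
    return result
```

With a single firing, `as_singleton([3])` succeeds; with the two firings from the Sum example, `as_singleton([3, 10])` raises, and `as_map([("k", 3), ("k", 10)])` raises for the same reason.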


> Thanks,
> Rahul
>
> On Wed, Nov 6, 2019 at 11:24 AM Kenneth Knowles  wrote:
>
>>
>>
>> On Tue, Nov 5, 2019 at 9:29 PM Aaron Dixon  wrote:
>>
>>> From
>>> https://beam.apache.org/documentation/programming-guide/#side-inputs
>>>
>>> > If the side input has multiple trigger firings, Beam uses the value
>>> from the latest trigger firing. This is particularly useful if you use a
>>> side input with a single global window and specify a trigger.
>>>
>>
>> Sorry for this. The documentation is entirely wrong. If a side input has
>> multiple firings, then all elements from all firings are included in the
>> side input as though they are unrelated elements. I have filed
>> https://issues.apache.org/jira/browse/BEAM-8563 to at least prevent the
>> confusing result and give a good error message.
>>
>>> I have this sub-pipeline:
>>>
>>> -> GlobalWindow (triggering AfterProcessingTime.pastFirstElementInPane)
>>>
>>
>> Are you sure you want this? It will drop all data after the first firing.
>> We are about to disable such triggers due to the data loss risk. See
>> https://s.apache.org/finishing-triggers-drop-data. If your intent is to
>> drop all subsequent data, I am interested in your use case. Can you share
>> more?
>>
>>
>>> -> Combine.perKey (Max)
>>> -> View.asMap
>>> ...which I use as a side input.
>>>
>>> But I get a "Duplicate values for " error (DirectRunner). (Stack
>>> trace below.)
>>>
>>> But the only way for duplicate keys to come out of the global window is
>>> via multiple triggers.
>>>
>>> What am I missing?
>>>
>>
>> This is surprising. Can you share the actual code of your pipeline?
>> According to your pseudocode, this is impossible. The trigger you described
>> should never fire multiple times. But as I mentioned above, the trigger is
>> about to be forbidden. If we can learn about your usage, maybe that will
>> help.
>>
>> Kenn
>>
>>
>>>
>>>
>>>
>>> ===
>>> java.lang.IllegalArgumentException: Duplicate values for :ihop
>>> at
>>> org.apache.beam.sdk.values.PCollectionViews$MapViewFn.apply(PCollectionViews.java:397)
>>> at
>>> org.apache.beam.sdk.values.PCollectionViews$MapViewFn.apply(PCollectionViews.java:373)
>>> at
>>> org.apache.beam.runners.direct.SideInputContainer$SideInputContainerSideInputReader.get(SideInputContainer.java:261)
>>> at
>>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.sideInput(SimpleDoFnRunner.java:247)
>>> at
>>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:74)
>>> at
>>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:548)
>>>
>>>
>>>


Re: Multiple triggers contained w/in side input?

2019-11-05 Thread Kenneth Knowles
On Tue, Nov 5, 2019 at 9:29 PM Aaron Dixon  wrote:

> From https://beam.apache.org/documentation/programming-guide/#side-inputs
>
> > If the side input has multiple trigger firings, Beam uses the value
> from the latest trigger firing. This is particularly useful if you use a
> side input with a single global window and specify a trigger.
>

Sorry for this. The documentation is entirely wrong. If a side input has
multiple firings, then all elements from all firings are included in the
side input as though they are unrelated elements. I have filed
https://issues.apache.org/jira/browse/BEAM-8563 to at least prevent the
confusing result and give a good error message.

> I have this sub-pipeline:
>
> -> GlobalWindow (triggering AfterProcessingTime.pastFirstElementInPane)
>

Are you sure you want this? It will drop all data after the first firing.
We are about to disable such triggers due to the data loss risk. See
https://s.apache.org/finishing-triggers-drop-data. If your intent is to
drop all subsequent data, I am interested in your use case. Can you share
more?
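
To show what "drop all data after the first firing" means, here is a simplified plain-Python simulation of a processing-time trigger that fires once versus one that repeats. It is a sketch under stated assumptions: `run_trigger` is hypothetical, firing is only checked when the next element arrives, and any pane still open at the end of input is flushed.

```python
def run_trigger(arrivals, delay, repeat):
    """Simulate a 'first element in pane + delay' trigger.

    `arrivals` is a list of (processing_time, value) sorted by time.
    Returns (panes, dropped).
    """
    panes, dropped = [], []
    pane, deadline, finished = [], None, False
    for t, value in arrivals:
        # Fire the pending pane if its deadline passed before this arrival.
        if deadline is not None and t >= deadline:
            panes.append(pane)
            pane, deadline = [], None
            if not repeat:
                finished = True
        if finished:
            # A finished trigger never fires again, so the data is lost.
            dropped.append(value)
            continue
        if deadline is None:
            deadline = t + delay
        pane.append(value)
    if pane:
        panes.append(pane)  # simplification: flush the open pane at the end
    return panes, dropped


arrivals = [(0, "a"), (1, "b"), (10, "c"), (11, "d")]
once = run_trigger(arrivals, delay=5, repeat=False)
repeated = run_trigger(arrivals, delay=5, repeat=True)
```

In the one-shot case "c" and "d" end up in the dropped list; in the repeated case they form a second pane.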


> -> Combine.perKey (Max)
> -> View.asMap
> ...which I use as a side input.
>
> But I get a "Duplicate values for " error (DirectRunner). (Stack
> trace below.)
>
> But the only way for duplicate keys to come out of the global window is
> via multiple triggers.
>
> What am I missing?
>

This is surprising. Can you share the actual code of your pipeline?
According to your pseudocode, this is impossible. The trigger you described
should never fire multiple times. But as I mentioned above, the trigger is
about to be forbidden. If we can learn about your usage, maybe that will
help.

Kenn


>
>
>
> ===
> java.lang.IllegalArgumentException: Duplicate values for :ihop
> at
> org.apache.beam.sdk.values.PCollectionViews$MapViewFn.apply(PCollectionViews.java:397)
> at
> org.apache.beam.sdk.values.PCollectionViews$MapViewFn.apply(PCollectionViews.java:373)
> at
> org.apache.beam.runners.direct.SideInputContainer$SideInputContainerSideInputReader.get(SideInputContainer.java:261)
> at
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.sideInput(SimpleDoFnRunner.java:247)
> at
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:74)
> at
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:548)
>
>
>


Re: Streaming data from Pubsub to Spanner with Beam dataflow pipeline

2019-10-30 Thread Kenneth Knowles
Moving to user@beam.apache.org, the best mailing list for questions like
this.

Yes, this kind of workload is a core use case for Beam. If you have a
problem, please write to this user list with details.

Kenn

On Wed, Oct 30, 2019 at 4:07 AM Taher Koitawala  wrote:

> Hi All,
>   My current use-case is to write data from Pubsub to Spanner
> using a streaming pipeline. I do see that Beam does have a SpannerIO to
> write.
>
>   However, since Pubsub is streaming and Spanner is RDBMS-like, it
> would be helpful if you could tell me whether this will be performant
> enough. If someone has already tried this out and can give me a few
> caveats, that would be really awesome.
>
>
> Regards,
> Taher Koitawala
>


Re: Joining PCollections to aggregates of themselves

2019-10-11 Thread Kenneth Knowles
This seems like a great example of the use of a stateful DoFn. It has essentially the
same structure as the example on the Beam blog but is more meaningful.
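
A rough plain-Python sketch of the buffering approach Robert describes in the quoted text below: keep a buffer of (timestamp, value) pairs and, for each incoming element, average whatever is buffered in the preceding 10 minutes. `RollingMean` is hypothetical, stands in for per-key state in a stateful DoFn, and the sample values in the usage note are made up for illustration.

```python
class RollingMean:
    """Buffers (timestamp, value) pairs and averages a trailing time range."""

    def __init__(self, window_seconds):
        self.window = window_seconds
        self.buffer = []  # (event_time_seconds, value), in arrival order

    def process(self, ts, value):
        """Buffer the element and return the mean over [ts - window, ts]."""
        self.buffer.append((ts, value))
        in_range = [v for t, v in self.buffer if ts - self.window <= t <= ts]
        return sum(in_range) / len(in_range)

    def expire(self, watermark):
        """Discard elements too old to affect any element at or after `watermark`."""
        cutoff = watermark - self.window
        self.buffer = [(t, v) for t, v in self.buffer if t >= cutoff]
```

Feeding it elements at 00:08 (value 4), 00:13 (value 6), 00:00 (value 1) and 00:02 (value 2), in that arrival order, gives a result that depends on what had already arrived, which is the order sensitivity discussed in the thread.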

Kenn

On Fri, Oct 11, 2019 at 12:38 PM Robert Bradshaw 
wrote:

> OK, the only way to do this would be via a non-deterministic stateful
> DoFn that buffers elements as they come in and computes averages by
> looking at the buffer each time.
>
> This could also be represented with an extension to window merging and
> a join, where the trigger would be explicitly used to control the
> balance between latency and correctness.
>
> On Fri, Oct 11, 2019 at 8:01 AM Sam Stephens 
> wrote:
> >
> > On 2019/10/10 18:23:46, Eugene Kirpichov  wrote:
> > > " input elements can pass through the Joiner DoFn before the sideInput
> > > corresponding to that element is present"
> > >
> > > I don't think this is correct. Runners will evaluate a DoFn with side
> > > inputs on elements in a given window only after all side inputs are
> > > ready (have triggered at least once) in this window, so your code
> > > should be safe. However, runners will not rerun the DoFn with side
> > > inputs on subsequent triggerings of the side inputs, so you won't be
> > > able to update the results.
> >
> > Yes, but the second or third time an element falling into a given window
> > is processed by the Joiner DoFn, the side input may not be up to date
> > with these new elements, so the side input having triggered at least
> > once is not a guarantee that it is up to date.
> >
> > On 2019/10/10 18:35:21, Robert Bradshaw  wrote:
> >
> > > Time: 00:08:00
> > > Input: 
> > Output: 
> >
> > >
> > > Time: 00:13:00
> > > Input: 
> >
> > Output:  // average 4 & 6
> >
> > >
> > > Time: 00:00:00
> > > Input: 
> >
> > Output:  // average 1
> > >
> > > Time: 00:02:00
> > > Input: 
> >
> > Output:  // average 1 & 2
> >
> > I'd say the least surprising result here is that the aggregate includes
> > the best available information at the time of processing. So yes, it is
> > sensitive to the order of arrival; that's unavoidable, I think.
> >
> > >
> > > Are you really trying to emit elements with the mean of all elements
> > > with timestamp up to 10 minutes prior to the current value? That's a
> > > bit different than sliding windows. In that a case you could do
> > > something with a Stateful DoFn that buffers elements and for each
> > > incoming element sets a timer at T which then reads the buffer,
> > > computes the output, and discards elements older than 10 minutes. You
> > > could also possibly do this with a custom WindowFn.
> > >
> >
> > Yes, the requirement is basically to enrich an event stream with values
> > computed over arbitrary other event streams (including the event stream
> > being enriched) and to do this with as low latency as possible.
> >
> > Of course the values derived from other event streams might not be
> > included even if they occur before the event being enriched (even if
> > "before" is in both the event time and processing time sense). But this
> > is easier to swallow because there's no obvious causal dependency between
> > that aggregate value and the event being enriched.
> > .. I hope that made sense
>


Re: Limited join with stop condition

2019-10-10 Thread Kenneth Knowles
Interesting! I agree with Luke that it seems not a great fit for Beam in
the most rigorous sense. There are many considerations:

1. We assume ParDo has side effects by default. So the model actually
*requires* eager evaluation, not lazy, in order to make all the side
effects happen. But for your case let us assume somehow we know it is
all @Pure.
2. Lazy evaluation and parallelism are in opposition. In pure computations
like Haskell, literally everything (except monadic sequence) is parallel
for free, but the problem is nothing starts until it is needed so
parallelism requires forcing computations early.

On the other hand, we can think about ways forward here. A first step is if
the join is a "side lookup join" where we always process all of source 1
but try to process less of source 2. If source 2 is feeding into a map side
input then this could be lazy in some way. When an element from source 1
calls the side input lookup it could be a blocking call that triggers reads
from source 2 until a match is found. This computation strategy is
consistent with the model and will read all of source 1 but only the prefix
of source 2 needed to join all of source 1. I think you could implement
this pattern with parallelism on both the main input and side input. Then,
to read less of source 1 you need feedback from the sink to the source. We
have nothing like that... This is all very abstract hypotheticals.
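
To make the "side lookup join" idea a bit more concrete, here is a minimal
sketch in plain Java. It is not Beam code and not a proposed API: the class
LazyLookup and its methods are hypothetical names, and source 2 is assumed to
be available as a simple single-threaded iterator of key/value pairs. It only
shows the evaluation strategy, where a lookup blocks and pulls from source 2
until the key is found.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper, not a Beam API: an index over source 2 filled only on demand. */
final class LazyLookup<K, V> {
  private final Iterator<Map.Entry<K, V>> source2; // unread remainder of source 2
  private final Map<K, V> seen = new HashMap<>();  // prefix of source 2 read so far
  private int recordsRead = 0;

  LazyLookup(Iterator<Map.Entry<K, V>> source2) {
    this.source2 = source2;
  }

  /** Blocking lookup: reads further into source 2 until the key shows up or the source ends. */
  Optional<V> lookup(K key) {
    while (!seen.containsKey(key) && source2.hasNext()) {
      Map.Entry<K, V> next = source2.next();
      recordsRead++;
      // Keep the first value per key; a real join would have to decide how to treat duplicates.
      seen.putIfAbsent(next.getKey(), next.getValue());
    }
    return Optional.ofNullable(seen.get(key));
  }

  /** Number of records pulled from source 2 so far. */
  int recordsRead() {
    return recordsRead;
  }
}
```

If source 2 holds keys a, b, c, d in that order, a lookup of "b" reads two
records and a later lookup of "a" reads none. A key that is missing entirely
forces a full read of source 2, which is the worst case for this strategy.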

If we get to practical implementation "today" then every runner pretty much
reads all of a bounded source before even starting the next transform, no?
I wonder if it makes sense to convert them to unbounded (which is still
allowed to terminate but does not support dynamic splits). Then you just
terminate the pipeline when you have enough output. You will read more than
you need but maybe that is not so bad, and anyhow hard to avoid. Also a
vague idea...

And I have to ask, though, can you build indices instead of brute force for
the join?

Kenn

On Thu, Oct 10, 2019 at 10:47 AM Luke Cwik  wrote:

> This doesn't seem like a good fit for Apache Beam but have you tried:
> * using a StatefulDoFn that performs all the joining and signals the
> service powering the sources to stop sending data once your criteria is met
> (most services powering these sources won't have a way to be controlled
> this way)?
> * using a StatefulDoFn that performs all the joining and to write out the
> data to the output directly and then shutdown the pipeline (you can't have
> any transforms that are after the StatefulDoFn)?
>
> Both of these ideas remove a lot of the parallelism that Apache Beam
> provides.
>
>
>
> On Thu, Oct 10, 2019 at 10:36 AM Alexey Romanenko <
> aromanenko@gmail.com> wrote:
>
>> Hello,
>>
>> We have a use case and it's not clear how it can be solved/implemented
>> with Beam. I count on community help with this, maybe I miss something that
>> lays on the surface.
>>
>> Let’s say, there are two different bounded sources and one join transform
>> (say GBK) downstream. This Join transform is like INNER JOIN which joins
>> elements of two collections only if they have common key (though, it could
>> be any other join logic there, doesn’t matter). What matters is that this
>> Join has to return only N records as output and then we have to stop
>> pipeline after they have been processed. It means that, in the best case,
>> we need to read only N records from every source, join them and move
>> downstream and after pipeline should be stopped. In other cases, if some
>> records don’t have common key in other collection, we need to read another
>> bunch of records and see if it would be enough to have N joined records
>> after Join.
>>
>> Below, here is a simple example of this. Say, every source contains 1M of
>> records but after Join we need to have only 1K of joined records. So, we
>> don’t want to read all two millions from 2 sources in case if we can have
>> an output after reading much less records in the end. So, 1K of joined
>> records is a stop condition.
>>
>> 1M
>> —
>> | Source 1 |
>> —  |  ———
>> |———> | Join  |———> Output 1K and stop
>> 1M   |  ———
>> —  |
>> | Source 2 |
>> —
>>
>> So, it looks like I need the ability to read a new portion of data "on
>> demand" or to have a back pressure mechanism which signals from
>> downstream to upstream that “please, give me only N elements and then wait
>> until I ask for more”. I’m not sure that Beam supports something like this.
>>
>> As an idea, I was trying to split initial inputs into fixed Windows with
>> trigger “AfterPane.elementCountAtLeast(N)” to read data by limited batches
>> and use another “AfterPane.elementCountAtLeast(N)” after Join which should
>> trigger only once. It doesn’t work and still, it won’t read data “on
>> demand” and stop the whole pipeline, I guess.
>>
>> Do you think it can be feasible to do in Beam?

Re: Beam discarding massive amount of events due to Window object or inner processing

2019-10-08 Thread Kenneth Knowles
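This is an unfortunate usability problem with triggers where you can
accidentally close the window and drop all data. A trigger that is not
wrapped in a repeat fires once and then finishes, and data arriving for that
window afterwards is dropped. I think instead, you probably want this trigger: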

  Repeatedly.forever(
      AfterProcessingTime
          .pastFirstElementInPane()
          .plusDelayOf(Duration.standardSeconds(1)))

The way I recommend to express this trigger is:

  AfterWatermark.pastEndOfWindow().withEarlyFirings(
      AfterProcessingTime
          .pastFirstElementInPane()
          .plusDelayOf(Duration.standardSeconds(1)))

In the second case it is impossible to accidentally "close" the window and
drop all data.

Kenn

On Tue, Oct 8, 2019 at 7:43 AM Eddy G  wrote:

> Been recently developing a Beam (Dataflow) consumer which reads from a
> PubSub subscription and outputs to Parquet files the combination of all
> those objects grouped within the same window.
>
> While I was doing testing of this without a huge load everything seemed to
> work fine.
>
> However, after performing some heavy testing I can see that from 1.000.000
> events sent to that PubSub queue, only 1000 make it to Parquet!
>
> According to multiple wall times across different stages, the one which
> parses the events prior applying the window seems to last 58 minutes. The
> last stage which writes to Parquet files lasts 1h and 32 minutes.
>
> I will show now the most relevant parts of the code within, hope you can
> shed some light if it's due to the logic that comes before the Window object
> definition or if it's the Window object itself.
>
> pipeline
>     .apply("Reading PubSub Events",
>         PubsubIO.readMessagesWithAttributes()
>             .fromSubscription(options.getSubscription()))
>     .apply("Map to AvroSchemaRecord (GenericRecord)",
>         ParDo.of(new PubsubMessageToGenericRecord()))
>     .setCoder(AvroCoder.of(AVRO_SCHEMA))
>     .apply("15m window",
>         Window.into(FixedWindows.of(Duration.standardMinutes(15)))
>             .triggering(AfterProcessingTime
>                 .pastFirstElementInPane()
>                 .plusDelayOf(Duration.standardSeconds(1)))
>             .withAllowedLateness(Duration.ZERO)
>             .accumulatingFiredPanes()
>     )
>
> Also note that I'm running Beam 2.9.0.
>
> Tried moving the logic after the Window definition but still, most
> messages don't make it to the Parquet file.
>
> Could the logic inside the second stage be too heavy so that messages
> arrive too late and get discarded in the Window? The logic basically
> consists of reading the payload, parsing into a POJO (reading inner Map
> attributes, filtering and such)
>
> However, if I sent a million events to PubSub, all those million events
> make it till the Parquet write to file stage, but when reading those
> parquets in Spark and checking the records they aren't complete. Does that
> make sense?
>
>


Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-27 Thread Kenneth Knowles
I am pretty surprised that we do not have a @Category(ValidatesRunner) test
in GroupByKeyTest that iterates multiple times. That is a major oversight.
We should have this test, and it can be disabled by the SparkRunner's
configuration.
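
For reference, the "wrap the iterator" idea discussed further down the thread
can be sketched in plain Java. This is only an illustration under stated
assumptions: ReiterableValues is a hypothetical class, not part of Beam or the
SparkRunner, and it buffers in memory instead of spilling to disk, so it would
not help with keys whose values do not fit in memory.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical wrapper: exposes a single-pass Iterator as a re-iterable Iterable. */
final class ReiterableValues<T> implements Iterable<T> {
  private final Iterator<T> source;                 // single-pass input, consumed lazily
  private final List<T> buffer = new ArrayList<>(); // values pulled from source so far

  ReiterableValues(Iterator<T> source) {
    this.source = source;
  }

  @Override
  public Iterator<T> iterator() {
    return new Iterator<T>() {
      private int position = 0; // index of the next value this iterator returns

      @Override
      public boolean hasNext() {
        return position < buffer.size() || source.hasNext();
      }

      @Override
      public T next() {
        if (position == buffer.size()) {
          // Past the buffered prefix: pull one more value from the underlying source.
          if (!source.hasNext()) {
            throw new NoSuchElementException();
          }
          buffer.add(source.next());
        }
        return buffer.get(position++);
      }
    };
  }
}
```

A first pass fills the buffer as it goes and later passes replay it. A
spilling variant would replace the in-memory list with a disk-backed store
once a size threshold is exceeded, which is where the extra cost comes from.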

Kenn

On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax  wrote:

> The Dataflow version does not spill to disk. However Spark's design might
> require spilling to disk if you want that to be implemented properly.
>
> On Fri, Sep 27, 2019 at 9:08 AM David Morávek  wrote:
>
>> Hi,
>>
>> Spark's GBK is currently implemented using `sortBy(key and
>> value).mapPartition(...)` for non-merging windowing in order to support
>> large keys and large scale shuffles. Merging windowing is implemented using
>> standard GBK (underlying spark impl. uses ListCombiner + Hash Grouping),
>> which is by design unable to support large keys.
>>
>> As Jan noted, problem with mapPartition is, that its UDF receives an
>> Iterator. Only option here is to wrap this iterator to one that spills to
>> disk once an internal buffer is exceeded (the approach suggested by
>> Reuven). This unfortunately comes with a cost in some cases. The best
>> approach would be to somehow determine, that user wants multiple iterations
>> and then wrap it in "re-iterator" if necessary. Does anyone have any ideas
>> how to approach this?
>>
>> D.
>>
>> On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax  wrote:
>>
>>> The Beam API was written to support multiple iterations, and there are
>>> definitely transforms that do so. I believe that CoGroupByKey may do this
>>> as well with the resulting iterator.
>>>
>>> I know that the Dataflow runner is able to handle iterators larger than
>>> available memory by paging them in from shuffle, which still allows for
>>> reiterating. It sounds like Spark is less flexible here?
>>>
>>> Reuven
>>>
>>> On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský  wrote:
>>>
 +dev  

 Lukasz, why do you think that users expect to be able to iterate
 multiple times grouped elements? Besides that it obviously suggests the
 'Iterable'? The way that spark behaves is pretty much analogous to how
 MapReduce used to work - in certain cases it calls
 repartitionAndSortWithinPartitions and then does mapPartition, which
 accepts Iterator - that is because internally it merge sorts pre sorted
 segments. This approach enables to GroupByKey data sets that are too big to
 fit into memory (per key).

 If multiple iterations should be expected by users, we probably should:

  a) include that in @ValidatesRunner tests

  b) store values in memory on spark, which will break for certain
 pipelines

 Because of (b) I think that it would be much better to remove this
 "expectation" and clearly document that the Iterable is not supposed to be
 iterated multiple times.

 Jan
 On 9/27/19 9:27 AM, Jan Lukavský wrote:

 I pretty much think so, because that is how Spark works. The Iterable
 inside is really an Iterator, which cannot be iterated multiple times.

 Jan
 On 9/27/19 2:00 AM, Lukasz Cwik wrote:

 Jan, in Beam users expect to be able to iterate the GBK output multiple
 times even from within the same ParDo.
 Is this something that Beam on Spark Runner never supported?

 On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský  wrote:

> Hi Gershi,
>
> could you please outline the pipeline you are trying to execute?
> Basically, you cannot iterate the Iterable multiple times in single ParDo.
> It should be possible, though, to apply multiple ParDos to output from
> GroupByKey.
>
> Jan
> On 9/26/19 3:32 PM, Gershi, Noam wrote:
>
> Hi,
>
>
>
> I want to iterate multiple times on the Iterable (the output of
> GroupByKey transformation)
>
> When my Runner is SparkRunner, I get an exception:
>
>
>
> Caused by: java.lang.IllegalStateException: ValueIterator can't be
> iterated more than once,otherwise there could be data lost
>
> at
> org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
>
> at java.lang.Iterable.spliterator(Iterable.java:101)
>
>
>
>
>
> I understood I can branch the pipeline after GroupByKey into multiple
> transformation and iterate in each of them once on the Iterable.
>
>
>
> Is there a better way for that?
>
>
>
>
>
> *Noam Gershi*
>
> Software Developer
>
> *T*: +972 (3) 7405718 <+972%203-740-5718>
>
>
>
>


Re: Beam meetup Seattle!! September 26th, 6pm

2019-09-25 Thread Kenneth Knowles
Thanks for organizing. I'll be there!

Kenn

On Wed, Sep 25, 2019 at 2:50 PM Aizhamal Nurmamat kyzy 
wrote:

> Gentle reminder that Seattle Apache Beam meetup is happening tomorrow!
>
> Here is a quick agenda:
> - 18:00 - Registrations, speed networking, pizza and drinks.
> - 18:30 - kick-off
> - 18:40 - Making Beam Schemas Portable by Brian Hulette
> - 19:10 - Apache Beam @ Brightcove - A case study
> - 19:40 - ZetaSQL as a SQL dialect in BeamSQL by Rui Wang
> - 20:10 - Networking
>
> Looking forward to seeing you all tomorrow :)
>
>
> On Mon, Sep 23, 2019 at 11:03 AM Pablo Estrada  wrote:
>
>> Hello everyone!
>>
>> If you are in the Seattle area please come to Beam meetup this Thursday,
>> September 26th - at 6pm in the Google office in Fremont. There will be
>> interesting talks, and there should be a number of Beam contributors and
>> users around. Also pizza and drinks.
>>
>> The page with all the info:
>> https://www.meetup.com/Seattle-Apache-Beam-Meetup/events/263845364/
>>
>> I hope to see a lot of you around, and will be happy to chat about all
>> things Batch, Streaming, Beam, end of summer, etc.
>>
>> Best,
>> -P.
>>
>


Re: Hackathon @BeamSummit @ApacheCon

2019-08-22 Thread Kenneth Knowles
I will be at Beam Summit / ApacheCon NA and would love to drop by a
hackathon room if one is arranged. Really excited for both my first
ApacheCon and Beam Summit (finally!)

Kenn

On Thu, Aug 22, 2019 at 10:18 AM Austin Bennett 
wrote:

> And, for clarity, especially focused on Hackathon times on Monday and/or
> Tuesday of ApacheCon, to not conflict with BeamSummit sessions.
>
> On Thu, Aug 22, 2019 at 9:47 AM Austin Bennett <
> whatwouldausti...@gmail.com> wrote:
>
>> Less than 3 weeks till Beam Summit @ApacheCon!
>>
>> We are to be in Vegas for BeamSummit and ApacheCon in a few weeks.
>>
>> Likely to reserve space in the Hackathon Room to accomplish some tasks:
>> * Help Users
>> * Build Beam
>> * Collaborate with other projects
>> * etc
>>
>> If you're to be around (or not) let us know how you'd like to be
>> involved.  Also, please share and surface anything that would be good for
>> us to look at (and, esp. any beginner tasks, in case we can entice some new
>> contributors).
>>
>>
>> P.S.  See BeamSummit.org, if you're thinking of attending - there's a
>> discount code.
>>
>


Re: Query about JdbcIO.readRows()

2019-08-21 Thread Kenneth Knowles
Hi Kishor,

If you could not find a Jira, would you file one? Your contribution would
be very appreciated.
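
For anyone picking this up, the shape of the fix suggested below is a null
guard in the caller. The sketch here uses hypothetical stand-in classes
(ReadSketch, ReadRowsSketch) rather than the real JdbcIO code, and it assumes
only what the stack trace shows: the setter rejects null, and expand passes
the nullable attribute through unconditionally.

```java
/** Hypothetical stand-in for a Read-like builder; this is not the JdbcIO class. */
final class ReadSketch {
  final String query;
  final Object statementPreparator; // stays null for queries without parameters

  private ReadSketch(String query, Object statementPreparator) {
    this.query = query;
    this.statementPreparator = statementPreparator;
  }

  static ReadSketch withQuery(String query) {
    return new ReadSketch(query, null);
  }

  /** Mirrors the precondition seen in the stack trace: null is rejected here. */
  ReadSketch withStatementPreparator(Object preparator) {
    if (preparator == null) {
      throw new IllegalArgumentException("statementPreparator can not be null");
    }
    return new ReadSketch(query, preparator);
  }
}

/** Hypothetical stand-in for a ReadRows-like wrapper that delegates to ReadSketch. */
final class ReadRowsSketch {
  static ReadSketch expand(String query, Object preparatorOrNull) {
    ReadSketch read = ReadSketch.withQuery(query);
    // The guard: only forward the nullable attribute when it was actually set.
    if (preparatorOrNull != null) {
      read = read.withStatementPreparator(preparatorOrNull);
    }
    return read;
  }
}
```

With the guard in place, a plain "select * from table_name" expands without a
preparator, while passing null to the setter directly is still rejected.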

Kenn

On Tue, Aug 20, 2019 at 10:04 PM Kishor Joshi  wrote:

> Hi,
>
> This fix is still not available in the Beam 2.15.0. Is there any Jira that
> has been created for this issue ? I am interested to contribute in that.
>
> Thanks & Regards,
> Kishor
>
> On Friday, August 2, 2019, 10:19:17 PM GMT+5:30, Jean-Baptiste Onofré <
> j...@nanthrax.net> wrote:
>
>
> Agree. I will fix that.
>
> Regards
> JB
> Le 2 août 2019, à 17:15, Vishwas Bm  a écrit:
>
> Hi Kishor,
>
> + dev ( d...@beam.apache.org)
>
> This looks like a bug. The attribute statementPreparator is nullable.
> It should have been handled in the same way as in the expand method of
> Read class.
>
>
> *Thanks & Regards,*
>
> *Vishwas *
>
>
> On Fri, Aug 2, 2019 at 2:48 PM Kishor Joshi < joshi...@yahoo.com> wrote:
>
> Hi,
>
> I am using the just released 2.14 version for JdbcIO with the newly added
> "readRows" functionality.
>
> I want to read table data with a query without parameters (select * from
> table_name).
> As per my understanding, this should not require "StatementPreparator".
> However, if I use the newly added "readRows" function, I get an exception
> that seems to force me to use the "StatementPreparator".
> Stacktrace below.
>
> java.lang.IllegalArgumentException: statementPreparator can not be null
> at
> org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
>
> at
> org.apache.beam.sdk.io.jdbc.JdbcIO$Read.withStatementPreparator(JdbcIO.java:600)
>
> at
> org.apache.beam.sdk.io.jdbc.JdbcIO$ReadRows.expand(JdbcIO.java:499)
> at
> org.apache.beam.sdk.io.jdbc.JdbcIO$ReadRows.expand(JdbcIO.java:410)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
> at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
> at
> com.nokia.csf.dfle.transforms.DfleRdbmsSource.expand(DfleRdbmsSource.java:34)
>
> at
> com.nokia.csf.dfle.transforms.DfleRdbmsSource.expand(DfleRdbmsSource.java:10)
>
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
> at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:56)
> at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:182)
> at
> com.nokia.csf.dfle.dsl.DFLEBeamMain.dagWireUp(DFLEBeamMain.java:49)
> at com.nokia.csf.dfle.dsl.DFLEBeamMain.main(DFLEBeamMain.java:120)
>
>
>
> The test added in JdbcIOTest.java for this functionality only tests for
> queries with parameters.
> Is this new function supported only in the above case and not for normal
> "withQuery" (without parameters) ?
>
>
> Thanks & regards,
> Kishor
>
>


Re: [Java] Accessing state from FinishBundle method

2019-07-31 Thread Kenneth Knowles
Because @FinishBundle is not executed in the context of a window, what
state would you be accessing? (analogous to the way that outputs from
finish bundle must have an explicit window specified)

It may make sense to have a separate method @FinishBundleForWindow (or some
better name) that can be called for each window that was present in a
bundle. This would easily allow use of state without having to invent new
APIs.

Kenn

On Mon, Jul 29, 2019 at 12:53 PM Pablo Estrada  wrote:

> Hello all,
> I am working on a pipeline where I'd like to write a value to state at the
> end of processing a bundle. As it turns out, I don't think this is
> possible, as FinishBundleContext does not provide a method for it; and
> doing something like so also errors out:
>
> ==
>  @FinishBundle
>   public void finishBundle(
>   FinishBundleContext c,
>   @StateId("myState") MapState myState) {
>
>  myState.put(this.myClassVarKey, this.myClassVarValue);
>   }
> ==
>
> Maybe this is not yet implemented, and remains to be? Or maybe there's
> some other way of doing it? Or maybe this is an antipattern? (though I
> doubt that last one).
> Thanks!
> -P.
>


Re: Choosing a coder for a class that contains a Row?

2019-07-26 Thread Kenneth Knowles
The most challenging part, as I understand it, surrounds automatically
inferred schemas from POJOs, where Java's nondeterministic iteration order,
combined with a row's inherent ordering, means that even an identical
pipeline will need some metadata to plumb the right fields to the right
column indices.

Most relational migration management I've done incorporates explicit
migration logic along with changes to the schema. This is quite a lot more
robust, but more implementation work, than having a default policy
proto/avro/thrift style. I think there's a lot to explore here.
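
As a rough illustration of the default policy Brian describes below (a count
prefix, nulls for missing trailing fields, extra trailing fields dropped),
here is a sketch in plain Java. FieldCountDecoding and adapt are hypothetical
names and this is not the generated RowCoder code; the size of the decoded
list stands in for the count prefix.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of count-prefix decoding; not RowCoder's actual code. */
final class FieldCountDecoding {
  /**
   * Adapts the values of an encoded row to the width of the current schema.
   * Fields are matched purely by position.
   */
  static List<Object> adapt(List<Object> encodedValues, int currentFieldCount) {
    List<Object> row = new ArrayList<>(currentFieldCount);
    for (int i = 0; i < currentFieldCount; i++) {
      // Current schema is wider: pad with null. Narrower: the loop stops early,
      // so the extra encoded values are dropped.
      row.add(i < encodedValues.size() ? encodedValues.get(i) : null);
    }
    return row;
  }
}
```

Because matching is positional, this only behaves sensibly when columns are
appended or removed at the end, which is why a stable field order matters so
much for inferred schemas.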

Kenn

On Thu, Jul 25, 2019 at 9:59 AM Brian Hulette  wrote:

> I know Reuven has put some thought into evolving schemas, but I'm not sure
> it's documented anywhere as of now. The only documentation I've come across
> as I bump around the schema code are some comments deep in RowCoder [1].
> Essentially the current serialization format for a row includes a row
> count as a prefix so we can detect "simple" schema changes like column
> additions and deletions. When decoding a Row, if the current schema
> contains *more* fields than the encoded Row, the remaining fields are
> populated with nulls in the resulting Row object. If the current schema
> contains *fewer* fields than the encoded Row, the additional ones are
> just dropped.
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java#L296
>
> On Wed, Jul 24, 2019 at 6:00 AM Ryan Skraba  wrote:
>
>> I'm also really interested in the question of evolving schemas... It's
>> something I've also put off figuring out :D
>>
>> With all its warts, the LazyAvroCoder technique (a coder backed by
>> some sort of schema registry) _could_ work with "homogeneish" data
>> (i.e. if the number of schemas in play for a single coder is much,
>> much smaller than the number of elements), even if none of the
>> schemas are known at Pipeline construction.  The portability job
>> server (which already stores and serves artifacts for running jobs)
>> might be the right place to put a schema registry... but I'm not
>> entirely convinced it's the right way to go either.
>>
>> At the same time, "simply" bumping a known schema to a new version is
>> roughly equivalent to updating a pipeline in place.
>>
>> Sending the data as Java-serialized Rows will be equivalent to sending
>> the entire schema with every record, so it _would_ work without
>> involving a new, distributed state between one coder's encode and
>> another's decode (at the cost of message size, of course).
>>
>> Ryan
>>
>>
>> On Wed, Jul 24, 2019 at 1:40 AM Pablo Estrada  wrote:
>> >
>> > +dev
>> > Thanks Ryan! This is quite helpful. Still not what I need : ) - but
>> useful.
>> >
>> > The data is change data capture from databases, and I'm putting it into
>> a Beam Row. The schema for the Row is generally homogeneous, but subject to
>> change at some point in the future if the schema in the database changes.
>> It's unusual and unlikely, but possible. I have no idea how Beam deals with
>> evolving schemas. +Reuven Lax is there documentation / examples / anything
>> around this? : )
>> >
>> > I think evolving schemas is an interesting question
>> >
>> > For now, I am going to Java-serialize the objects, and delay figuring
>> this out. But I reckon I'll have to come back to this...
>> >
>> > Best
>> > -P.
>> >
>> > On Tue, Jul 23, 2019 at 1:07 AM Ryan Skraba  wrote:
>> >>
>> >> Hello Pablo!  Just to clarify -- the Row schemas aren't known at
>> >> pipeline construction time, but can be discovered from the instance of
>> >> MyData?
>> >>
>> >> Once discovered, is the schema "homogeneous" for all instance of
>> >> MyData?  (i.e. someRow will always have the same schema for all
>> >> instances afterwards, and there won't be another someRow with a
>> >> different schema).
>> >>
>> >> We've encountered a parallel "problem" with pure Avro data, where the
>> >> instance is a GenericRecord containing its own Avro schema but
>> >> *without* knowing the schema until the pipeline is run.  The solution
>> >> that we've been using is a bit hacky, but we're using an ad hoc
>> >> per-job schema registry and a custom coder where each worker saves the
>> >> schema in the `encode` before writing the record, and loads it lazily
>> >> in the `decode` before reading.
>> >>
>> >> The original code is available[1] (be gentle, it was written with Beam
>> >> 0.4.0-incubating... and has continued to work until now).
>> >>
>> >> In practice, the ad hoc schema registry is just a server socket in the
>> >> Spark driver, in-memory for DirectRunner / local mode, and a
>> >> read/write to a known location in other runners.  There are definitely
>> >> other solutions with side-inputs and providers, and the job server in
>> >> portability looks like an exciting candidate for per-job schema
>> >> registry story...
>> >>
>> >> I'm super eager to see if there are other ideas or a contribution we
>> >> 

Re: How to merge two streams and perform stateful operations on merged stream using Apache Beam

2019-07-25 Thread Kenneth Knowles
Is this the same question as your other email about your StackOverflow
question? If it is, then please see my answer on StackOverflow. If it is
not, can you explain a little more?

Kenn

On Wed, Jul 24, 2019 at 10:48 PM Kiran Hurakadli 
wrote:

> I have 2 Kafka streams. I want to merge them by some key, and on top of the
> merged stream I want to perform a stateful operation so that I can sum up
> counts from both streams.
>
> This is what I tried, but it didn't work:
>
> PCollection stream1 = .. read from kafka
>
> PCollection stream2 = .. read from kafka
>
> PCollection wordCount1 = stream1.apply(...)
>
> PCollection wordCount2 = stream2.apply(...)
>
> PCollection merged = merge wordCount1 and wordCount2 using
> CoGroupByKey
>
>
>
> PCollection finalStream = merged.apply(...)
>
>
> For finalStream, apply state.
>
> Please suggest a solution.
>
> --
> Regards,
> *Kiran M Hurakadli.*
>


Re: applying keyed state on top of stream from co-groupByKey output

2019-07-25 Thread Kenneth Knowles
Thanks for the very detailed question! I have written up an answer and I
suggest we continue discussion there.

Kenn

On Tue, Jul 23, 2019 at 9:11 PM Kiran Hurakadli 
wrote:

>
> Hi All,
> I am trying to merge  2 data streams using coGroupByKey and applying
> stateful
> ParDo. Input to the cogroup fun is (word,count) , Full problem explained on
> stack over flow
>
> https://stackoverflow.com/questions/57162131/applying-keyed-state-on-top-of-stream-from-co-group-stream
>
> Please help me
> --
> Regards,
> *Kiran M Hurakadli.*
>


Re: Beam release 2.5.0 tag SNAPSHOT version

2019-07-19 Thread Kenneth Knowles
Good catch.

The release 2.5.0 was built with gradle, so that pom is left over. The
gradle release plugin does not edit poms, so it did not change that.
Instead, the poms are generated and you can find them on maven central, like
https://repo1.maven.org/maven2/org/apache/beam/beam-runners-direct-java/2.5.0/beam-runners-direct-java-2.5.0.pom.
Today the pom has been removed and it is generated at run time.

Since you are digging into details, you may also be interested in the RC
vote and verification thread:
https://lists.apache.org/thread.html/dac093378fb27331c5d9d86a3bd03397f7e186d482f0fc8c8ef88feb@%3Cdev.beam.apache.org%3E

To answer your original question, I do believe the tag v2.5.0 and
v2.5.0-RC2 point to the commit where the released artifacts were built.

Kenn

On Wed, Jul 17, 2019 at 10:54 PM Abdul Qadeer  wrote:

> Hi Kenneth!
>
> But even the tag v2.5.0 points to SNAPSHOT version for Maven:
> https://github.com/apache/beam/blob/v2.5.0/pom.xml#L37
>
> The same file in v2.4.0:
> https://github.com/apache/beam/blob/v2.4.0/pom.xml#L37
>
> I am looking to publish v2.5.0 artifacts via maven to a private
> artifactory, and without right versions it will publish it as a SNAPSHOT
> version. Shall I raise a PR for this?
>
> On Wed, Jul 17, 2019 at 3:41 PM Kenneth Knowles  wrote:
>
>> What you have pointed to is the tip of the release-2.5.0 branch. The
>> gradle release plugin copies the maven release plugin. So it has rolled
>> back the version change so the branch is always at a snapshot version.
>>
>> The commit before that is tag v2.5.0 and that is the final tag. Here is
>> the gradle properties:
>> https://github.com/apache/beam/blob/v2.5.0/gradle.properties
>>
>> I do believe this should be 2.5.0, not 2.5.0-RC2. But anyhow I think that
>> is the commit that was used to build 2.5.0.
>>
>> Kenn
>>
>> On Wed, Jul 17, 2019 at 3:36 PM Kenneth Knowles  wrote:
>>
>>> I take that back - misclicked on 0.5.0 (which has a correct tag).
>>>
>>> On Wed, Jul 17, 2019 at 3:34 PM Kenneth Knowles  wrote:
>>>
>>>> Looks like it is this:
>>>> https://github.com/apache/beam/tree/4838ae16c172252bc0a15e3a984e085f82e25c2d
>>>>
>>>> I believe the release manager created the tag to point to the tip of
>>>> the release branch after the maven release plugin rolled that change back
>>>> (this is how the maven release plugin works since it does not utilize git's
>>>> branching model).
>>>>
>>>> I will fix the tag.
>>>>
>>>> Kenn
>>>>
>>>> On Wed, Jul 17, 2019 at 3:32 PM Abdul Qadeer 
>>>> wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>> Why is v2.5.0 tag for Beam 2.5.0 release SNAPSHOT version here?
>>>>> https://github.com/apache/beam/blob/v2.5.0/pom.xml#L37
>>>>> https://github.com/apache/beam/blob/release-2.5.0/gradle.properties#L25
>>>>>
>>>>> Please let me know where to find final release 2.5.0 commit.
>>>>>
>>>>


Re: Beam release 2.5.0 tag SNAPSHOT version

2019-07-17 Thread Kenneth Knowles
What you have pointed to is the tip of the release-2.5.0 branch. The gradle
release plugin copies the maven release plugin. So it has rolled back the
version change so the branch is always at a snapshot version.

The commit before that is tag v2.5.0 and that is the final tag. Here is the
gradle properties:
https://github.com/apache/beam/blob/v2.5.0/gradle.properties

I do believe this should be 2.5.0, not 2.5.0-RC2. But anyhow I think that
is the commit that was used to build 2.5.0.

Kenn

On Wed, Jul 17, 2019 at 3:36 PM Kenneth Knowles  wrote:

> I take that back - misclicked on 0.5.0 (which has a correct tag).
>
> On Wed, Jul 17, 2019 at 3:34 PM Kenneth Knowles  wrote:
>
>> Looks like it is this:
>> https://github.com/apache/beam/tree/4838ae16c172252bc0a15e3a984e085f82e25c2d
>>
>> I believe the release manager created the tag to point to the tip of the
>> release branch after the maven release plugin rolled that change back (this
>> is how the maven release plugin works since it does not utilize git's
>> branching model).
>>
>> I will fix the tag.
>>
>> Kenn
>>
>> On Wed, Jul 17, 2019 at 3:32 PM Abdul Qadeer 
>> wrote:
>>
>>> Hi!
>>>
>>> Why is v2.5.0 tag for Beam 2.5.0 release SNAPSHOT version here?
>>> https://github.com/apache/beam/blob/v2.5.0/pom.xml#L37
>>> https://github.com/apache/beam/blob/release-2.5.0/gradle.properties#L25
>>>
>>> Please let me know where to find final release 2.5.0 commit.
>>>
>>


Re: Beam release 2.5.0 tag SNAPSHOT version

2019-07-17 Thread Kenneth Knowles
I take that back - misclicked on 0.5.0 (which has a correct tag).

On Wed, Jul 17, 2019 at 3:34 PM Kenneth Knowles  wrote:

> Looks like it is this:
> https://github.com/apache/beam/tree/4838ae16c172252bc0a15e3a984e085f82e25c2d
>
> I believe the release manager created the tag to point to the tip of the
> release branch after the maven release plugin rolled that change back (this
> is how the maven release plugin works since it does not utilize git's
> branching model).
>
> I will fix the tag.
>
> Kenn
>
> On Wed, Jul 17, 2019 at 3:32 PM Abdul Qadeer 
> wrote:
>
>> Hi!
>>
>> Why is v2.5.0 tag for Beam 2.5.0 release SNAPSHOT version here?
>> https://github.com/apache/beam/blob/v2.5.0/pom.xml#L37
>> https://github.com/apache/beam/blob/release-2.5.0/gradle.properties#L25
>>
>> Please let me know where to find final release 2.5.0 commit.
>>
>


Re: Beam release 2.5.0 tag SNAPSHOT version

2019-07-17 Thread Kenneth Knowles
Looks like it is this:
https://github.com/apache/beam/tree/4838ae16c172252bc0a15e3a984e085f82e25c2d

I believe the release manager created the tag to point to the tip of the
release branch after the maven release plugin rolled that change back (this
is how the maven release plugin works since it does not utilize git's
branching model).

I will fix the tag.

Kenn

On Wed, Jul 17, 2019 at 3:32 PM Abdul Qadeer  wrote:

> Hi!
>
> Why is v2.5.0 tag for Beam 2.5.0 release SNAPSHOT version here?
> https://github.com/apache/beam/blob/v2.5.0/pom.xml#L37
> https://github.com/apache/beam/blob/release-2.5.0/gradle.properties#L25
>
> Please let me know where to find final release 2.5.0 commit.
>


Re: [Java] TextIO not reading file as expected

2019-07-12 Thread Kenneth Knowles
Glad to hear it :-)

On Fri, Jul 12, 2019 at 6:33 AM Shannon Duncan 
wrote:

> So as it turns out, it was an STDOUT issue for my logging and not a data
> read in. Beam operated just fine but the way I was debugging was causing
> the glitches.
>
> Beam is operating as expected now.
>
> On Thu, Jul 11, 2019 at 10:28 PM Kenneth Knowles  wrote:
>
>> Doesn't sound good. TextIO has been around a long time so I'm surprised.
>> Would you mind creating a ticket in Jira (
>> https://issues.apache.org/jira/projects/BEAM/) and posting some
>> technical details, like input/output/code snippets?
>>
>> Kenn
>>
>> On Thu, Jul 11, 2019 at 9:45 AM Shannon Duncan <
>> joseph.dun...@liveramp.com> wrote:
>>
>>> I have a file where every line is a record separated by a tab. So a tab
>>> delimited file.
>>>
>>> However as I read this file in using TextIO.read().from(filename) and
>>> pass the results to a pardo, the elements are random chunks of the records.
>>> I expected the element to be the entire line of text which then I'll do
>>> parsing on from there.
>>>
>>> This file is processed in a python pipeline with ReadFromText perfectly
>>> fine. Just curious what would cause this on the Java side?
>>>
>>> Thanks,
>>> Shannon
>>>
>>


Re: [Java] TextIO not reading file as expected

2019-07-11 Thread Kenneth Knowles
Doesn't sound good. TextIO has been around a long time so I'm surprised.
Would you mind creating a ticket in Jira (
https://issues.apache.org/jira/projects/BEAM/) and posting some technical
details, like input/output/code snippets?

Kenn

On Thu, Jul 11, 2019 at 9:45 AM Shannon Duncan 
wrote:

> I have a file where every line is a record separated by a tab. So a tab
> delimited file.
>
> However as I read this file in using TextIO.read().from(filename) and pass
> the results to a pardo, the elements are random chunks of the records. I
> expected the element to be the entire line of text which then I'll do
> parsing on from there.
>
> This file is processed in a python pipeline with ReadFromText perfectly
> fine. Just curious what would cause this on the Java side?
>
> Thanks,
> Shannon
>


Re: SDK support status clarification

2019-07-11 Thread Kenneth Knowles
That page is a better authority than this list. It has all the public
information and is up to date.

What you may be most interested in is the orange box describing the
decommissioning you mention: "The new end date has yet to be finalized but
is expected to happen in 2019. When decommissioning happens, you will no
longer be able to submit new Cloud Dataflow jobs or update running Cloud
Dataflow jobs that use the decommissioned SDKs. In addition, existing
streaming jobs that use these SDK versions may fail."

I'd suggest taking further Dataflow support questions off list, though.

Kenn

On Thu, Jul 11, 2019 at 9:52 AM Neville Li  wrote:

> Hi all, more specifically Googlers here,
>
> I want to clarify the Beam SDK support status w.r.t. Dataflow runner here:
> https://cloud.google.com/dataflow/docs/support/sdk-version-support-status
>
> When a Beam SDK is deprecated, what does it mean for users running it on
> Dataflow?
>
> The page mentions that SDK 2.0.0 to 2.4.0 "will be decommissioned in late
> 2019". Meaning that pipelines using these SDKs will stop running? Is there
> a specific date?
>
> Also similar question, are there planned decommission dates for SDK
> versions > 2.4.0?
>
> Cheers,
> Neville
>


Re: Questions about the bundled PubsubIO read implementation

2019-07-10 Thread Kenneth Knowles
This is pretty surprising. Seems valuable to file separate Jiras so we can
track investigation and resolution.

 - use gRPC: https://issues.apache.org/jira/browse/BEAM-7718
 - empty message bodies: https://issues.apache.org/jira/browse/BEAM-7716
 - watermark tracking: https://issues.apache.org/jira/browse/BEAM-7717

You reproduced these with the original PubsubIO?

Kenn

On Mon, Jul 8, 2019 at 10:38 AM Steve Niemitz  wrote:

> I was trying to use the bundled PubsubIO.Read implementation in beam on
> dataflow (using --experiments=enable_custom_pubsub_source to prevent
> dataflow from overriding it with its own implementation) and ran into some
> interesting issues.  I was curious if people have any experience with
> these.  I'd assume anyone using PubsubIO on a runner other than dataflow
> would have run into the same things.
>
> - The default implementation uses the HTTP REST API, which seems to be
> much less performant than the gRPC implementation.  Is there a reason that
> the gRPC implementation is essentially unavailable from the public API?
> PubsubIO.Read.withClientFactory is package private.  I worked around this
> by making it public and rebuilding, which led me to...
>
> - Both the JSON and gRPC implementations return empty message bodies for
> all messages read (using readMessages).  When running with the
> dataflow-specific reader, this doesn't happen and the message bodies have
> the content as expected.  I took a pipeline that works as expected on
> dataflow using PubsubIO.Read, added the experiment flag, and then my
> pipeline broke from empty message bodies.  This obviously blocked me from
> really experimenting much more.
>
> - The watermark tracking seems off.  The dataflow UI was reporting my
> watermark as around (but not exactly) the epoch (it was ~1970-01-19), which
> makes me wonder if seconds/milliseconds got confused somewhere (i.e., if you
> take the time since epoch in seconds now and interpret it as milliseconds,
> you'll get somewhere around 1970-01-19).
>
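
The unit mix-up suspected in the watermark bullet above can be checked with a
little arithmetic. The sketch below is only an illustration using java.time,
not Beam or PubsubIO code: the class name, the method names, and the sample
timestamp (2019-07-08T17:38:00Z, chosen to be close to when the report was
sent) are assumptions. It shows that a count of seconds since the epoch, read
as milliseconds, lands roughly 18 days after 1970-01-01.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

public class WatermarkUnitCheck {
    /** Reads a raw count as seconds since the epoch and returns the UTC date. */
    public static LocalDate dateIfSeconds(long raw) {
        return Instant.ofEpochSecond(raw).atZone(ZoneOffset.UTC).toLocalDate();
    }

    /** Reads the same raw count as milliseconds since the epoch and returns the UTC date. */
    public static LocalDate dateIfMillis(long raw) {
        return Instant.ofEpochMilli(raw).atZone(ZoneOffset.UTC).toLocalDate();
    }

    public static void main(String[] args) {
        // Illustrative timestamp only (assumption): 2019-07-08T17:38:00Z.
        long epochSeconds = Instant.parse("2019-07-08T17:38:00Z").getEpochSecond();

        // Correct interpretation: the value is a count of seconds.
        System.out.println("as seconds: " + dateIfSeconds(epochSeconds)); // 2019-07-08

        // Wrong interpretation: the same number treated as milliseconds.
        System.out.println("as millis:  " + dateIfMillis(epochSeconds));  // 1970-01-19
    }
}
```

A watermark near 1970-01-19 is therefore consistent with a seconds value being
handled as milliseconds somewhere, though this alone does not show where in
the reader that would happen.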


Re: How Can I access the key in subclass of CombinerFn when combining a PCollection of KV pairs?

2019-06-07 Thread Kenneth Knowles
The answer on StackOverflow looks good to me.

Kenn

On Fri, Jun 7, 2019 at 4:11 AM Massy Bourennani 
wrote:

> Hi,
>
> here is the SO post:
>
> https://stackoverflow.com/questions/56451796/how-can-i-access-the-key-in-subclass-of-combinerfn-when-combining-a-pcollection
>
> Many thanks
>

