Re: OpenJDK8 / OpenJDK11 container deprecation

2023-02-16 Thread Luke Cwik via user
I upgraded the docker version on Jenkins workers and the tests passed.
(also installed Python 3.11 so we are ready for that)

On Tue, Feb 14, 2023 at 3:21 PM Kenneth Knowles  wrote:

> SGTM. I asked on the PR if this could impact users, but having read the
> docker release calendar I am not concerned. The last update to the old
> version was in 2019, and the introduction of compatible versions was 2020.
>
> On Tue, Feb 14, 2023 at 3:01 PM Byron Ellis via user 
> wrote:
>
>> FWIW I am Team Upgrade Docker :-)
>>
>> On Tue, Feb 14, 2023 at 2:53 PM Luke Cwik via user 
>> wrote:
>>
>>> I made some progress in testing the container and did hit an issue where
>>> Ubuntu 22.04 "Jammy" is dependent on the version of Docker installed. It
>>> turns out that our boot.go crashes with "runtime/cgo: pthread_create
>>> failed: Operation not permitted" because Ubuntu 22.04 is using new
>>> syscalls that Docker 18.09.4 doesn't have a seccomp policy for (and uses a
>>> default of deny). We have a couple of choices here:
>>> 1) upgrade the version of docker on Jenkins and require users to
>>> similarly use a new enough version of Docker so that this isn't an issue
>>> for them
>>> 2) use Ubuntu 20.04 "Focal" as the docker container
>>>
>>> I was using Docker 20.10.21 which is why I didn't hit this issue when
>>> testing the change locally.
>>>
>>> We could also do these, but they seem strictly worse than either of the
>>> two options discussed above:
>>> A) disable the seccomp policy on Jenkins
>>> B) use a custom seccomp policy on Jenkins
>>>
>>> My suggestion is to upgrade Docker versions on Jenkins and use Ubuntu
>>> 22.04, as it will have LTS releases until 2027 and then security patches
>>> until 2032, which gives Apache Beam users the longest runway before we need
>>> to swap OS versions again. Any concerns or ideas?
>>>
>>>
>>>
>>> On Thu, Feb 9, 2023 at 10:20 AM Luke Cwik  wrote:
>>>
>>>> Our current Java 8 container is 262 MiBs and layers on top of
>>>> openjdk:8-bullseye which is 226 MiBs compressed while eclipse-temurin:8 is
>>>> 92 MiBs compressed and eclipse-temurin:8-alpine is 65 MiBs compressed.
>>>>
>>>> I would rather not get into issues with C library differences caused by
>>>> the alpine project so I would stick with the safer option and let users
>>>> choose alpine when building their custom container if they feel it provides
>>>> a large win for them. We can always swap to alpine in the future as well if
>>>> the C library differences become a non-issue.
>>>>
>>>> So swapping to eclipse-temurin will save us a bunch on the container
>>>> size which should help with container transfer and hopefully with startup
>>>> times as well.
>>>>
>>>> On Tue, Feb 7, 2023 at 5:41 PM Andrew Pilloud 
>>>> wrote:
>>>>
>>>>> This sounds reasonable to me as well.
>>>>>
>>>>> I've made swaps like this in the past; the base image of each is
>>>>> probably a bigger factor than the JDK. The openjdk images were based on
>>>>> Debian 11. The default eclipse-temurin images are based on Ubuntu 22.04
>>>>> with an alpine option. Ubuntu is a Debian derivative but the versions and
>>>>> package names aren't exact matches and Ubuntu tends to update a little
>>>>> faster. For most users I don't think this will matter but users building
>>>>> custom containers may need to make minor changes. The alpine option will
>>>>> be much smaller (which could be a significant improvement) but would be a
>>>>> more significant change to the environment.
>>>>>
>>>>> On Tue, Feb 7, 2023 at 5:18 PM Robert Bradshaw via dev <
>>>>> d...@beam.apache.org> wrote:
>>>>>
>>>>>> Seems reasonable to me.
>>>>>>
>>>>>> On Tue, Feb 7, 2023 at 4:19 PM Luke Cwik via user <
>>>>>> user@beam.apache.org> wrote:
>>>>>> >
>>>>>> > As per [1], the JDK8 and JDK11 containers that Apache Beam uses
>>>>>> have stopped being built and supported since July 2022. I have filed [2]
>>>>>> to track the resolution of this issue.
>>>>>> >
>>>>>&

Re: OpenJDK8 / OpenJDK11 container deprecation

2023-02-14 Thread Luke Cwik via user
I made some progress in testing the container and did hit an issue where
Ubuntu 22.04 "Jammy" is dependent on the version of Docker installed. It
turns out that our boot.go crashes with "runtime/cgo: pthread_create
failed: Operation not permitted" because Ubuntu 22.04 is using new
syscalls that Docker 18.09.4 doesn't have a seccomp policy for (and uses a
default of deny). We have a couple of choices here:
1) upgrade the version of docker on Jenkins and require users to similarly
use a new enough version of Docker so that this isn't an issue for them
2) use Ubuntu 20.04 "Focal" as the docker container

I was using Docker 20.10.21 which is why I didn't hit this issue when
testing the change locally.

We could also do these, but they seem strictly worse than either of the two
options discussed above:
A) disable the seccomp policy on Jenkins
B) use a custom seccomp policy on Jenkins

My suggestion is to upgrade Docker versions on Jenkins and use Ubuntu 22.04,
as it will have LTS releases until 2027 and then security patches until 2032,
which gives Apache Beam users the longest runway before we need to swap OS
versions again. Any concerns or ideas?



On Thu, Feb 9, 2023 at 10:20 AM Luke Cwik  wrote:

> Our current Java 8 container is 262 MiBs and layers on top of
> openjdk:8-bullseye which is 226 MiBs compressed while eclipse-temurin:8 is
> 92 MiBs compressed and eclipse-temurin:8-alpine is 65 MiBs compressed.
>
> I would rather not get into issues with C library differences caused by
> the alpine project so I would stick with the safer option and let users
> choose alpine when building their custom container if they feel it provides
> a large win for them. We can always swap to alpine in the future as well if
> the C library differences become a non-issue.
>
> So swapping to eclipse-temurin will save us a bunch on the container size
> which should help with container transfer and hopefully with startup times
> as well.
>
> On Tue, Feb 7, 2023 at 5:41 PM Andrew Pilloud  wrote:
>
>> This sounds reasonable to me as well.
>>
>> I've made swaps like this in the past; the base image of each is probably
>> a bigger factor than the JDK. The openjdk images were based on Debian 11.
>> The default eclipse-temurin images are based on Ubuntu 22.04 with an alpine
>> option. Ubuntu is a Debian derivative but the versions and package names
>> aren't exact matches and Ubuntu tends to update a little faster. For most
>> users I don't think this will matter but users building custom containers
>> may need to make minor changes. The alpine option will be much smaller
>> (which could be a significant improvement) but would be a more significant
>> change to the environment.
>>
>> On Tue, Feb 7, 2023 at 5:18 PM Robert Bradshaw via dev <
>> d...@beam.apache.org> wrote:
>>
>>> Seems reasonable to me.
>>>
>>> On Tue, Feb 7, 2023 at 4:19 PM Luke Cwik via user 
>>> wrote:
>>> >
>>> > As per [1], the JDK8 and JDK11 containers that Apache Beam uses have
>>> stopped being built and supported since July 2022. I have filed [2] to
>>> track the resolution of this issue.
>>> >
>>> > Based upon [1], almost everyone is swapping to the eclipse-temurin
>>> container[3] as their base, per the linked issues from the
>>> deprecation notice[1]. The eclipse-temurin container is released under
>>> these licenses:
>>> > Apache License, Version 2.0
>>> > Eclipse Distribution License 1.0 (BSD)
>>> > Eclipse Public License 2.0
>>> > 一 (Secondary) GNU General Public License, version 2 with OpenJDK
>>> Assembly Exception
>>> > 一 (Secondary) GNU General Public License, version 2 with the GNU
>>> Classpath Exception
>>> >
>>> > I propose that we swap all our containers to the eclipse-temurin
>>> containers[3].
>>> >
>>> > Open to other ideas and also would be great to hear about your
>>> experience in any other projects where you have had to make a similar
>>> decision.
>>> >
>>> > 1: https://github.com/docker-library/openjdk/issues/505
>>> > 2: https://github.com/apache/beam/issues/25371
>>> > 3: https://hub.docker.com/_/eclipse-temurin
>>>
>>


Re: OpenJDK8 / OpenJDK11 container deprecation

2023-02-09 Thread Luke Cwik via user
Our current Java 8 container is 262 MiBs and layers on top of
openjdk:8-bullseye which is 226 MiBs compressed while eclipse-temurin:8 is
92 MiBs compressed and eclipse-temurin:8-alpine is 65 MiBs compressed.

I would rather not get into issues with C library differences caused by the
alpine project so I would stick with the safer option and let users choose
alpine when building their custom container if they feel it provides a
large win for them. We can always swap to alpine in the future as well if
the C library differences become a non-issue.

So swapping to eclipse-temurin will save us a bunch on the container size
which should help with container transfer and hopefully with startup times
as well.

On Tue, Feb 7, 2023 at 5:41 PM Andrew Pilloud  wrote:

> This sounds reasonable to me as well.
>
> I've made swaps like this in the past; the base image of each is probably
> a bigger factor than the JDK. The openjdk images were based on Debian 11.
> The default eclipse-temurin images are based on Ubuntu 22.04 with an alpine
> option. Ubuntu is a Debian derivative but the versions and package names
> aren't exact matches and Ubuntu tends to update a little faster. For most
> users I don't think this will matter but users building custom containers
> may need to make minor changes. The alpine option will be much smaller
> (which could be a significant improvement) but would be a more significant
> change to the environment.
>
> On Tue, Feb 7, 2023 at 5:18 PM Robert Bradshaw via dev <
> d...@beam.apache.org> wrote:
>
>> Seems reasonable to me.
>>
>> On Tue, Feb 7, 2023 at 4:19 PM Luke Cwik via user 
>> wrote:
>> >
>> > As per [1], the JDK8 and JDK11 containers that Apache Beam uses have
>> stopped being built and supported since July 2022. I have filed [2] to
>> track the resolution of this issue.
>> >
>> > Based upon [1], almost everyone is swapping to the eclipse-temurin
>> container[3] as their base, per the linked issues from the
>> deprecation notice[1]. The eclipse-temurin container is released under
>> these licenses:
>> > Apache License, Version 2.0
>> > Eclipse Distribution License 1.0 (BSD)
>> > Eclipse Public License 2.0
>> > 一 (Secondary) GNU General Public License, version 2 with OpenJDK
>> Assembly Exception
>> > 一 (Secondary) GNU General Public License, version 2 with the GNU
>> Classpath Exception
>> >
>> > I propose that we swap all our containers to the eclipse-temurin
>> containers[3].
>> >
>> > Open to other ideas and also would be great to hear about your
>> experience in any other projects where you have had to make a similar
>> decision.
>> >
>> > 1: https://github.com/docker-library/openjdk/issues/505
>> > 2: https://github.com/apache/beam/issues/25371
>> > 3: https://hub.docker.com/_/eclipse-temurin
>>
>


OpenJDK8 / OpenJDK11 container deprecation

2023-02-07 Thread Luke Cwik via user
As per [1], the JDK8 and JDK11 containers that Apache Beam uses have
stopped being built and supported since July 2022. I have filed [2] to
track the resolution of this issue.

Based upon [1], almost everyone is swapping to the eclipse-temurin
container[3] as their base, per the linked issues from the
deprecation notice[1]. The eclipse-temurin container is released under
these licenses:
Apache License, Version 2.0
Eclipse Distribution License 1.0 (BSD)
Eclipse Public License 2.0
一 (Secondary) GNU General Public License, version 2 with OpenJDK Assembly
Exception
一 (Secondary) GNU General Public License, version 2 with the GNU Classpath
Exception

I propose that we swap all our containers to the eclipse-temurin
containers[3].

Open to other ideas and also would be great to hear about your experience
in any other projects where you have had to make a similar decision.

1: https://github.com/docker-library/openjdk/issues/505
2: https://github.com/apache/beam/issues/25371
3: https://hub.docker.com/_/eclipse-temurin


Re: Dataflow and mounting large data sets

2023-01-31 Thread Luke Cwik via user
I would also suggest looking at NFS client implementations in Java that
would allow you to talk to the NFS server without needing to mount it
within the OS. A quick search yielded https://github.com/raisercostin/yanfs
or https://github.com/EMCECS/nfs-client-java

On Tue, Jan 31, 2023 at 3:31 PM Chad Dombrova  wrote:

> Thanks for the info.  We are going to test this further and we'll let you
> know how it goes.
>
> -chad
>
>
> On Mon, Jan 30, 2023 at 2:14 PM Valentyn Tymofieiev 
> wrote:
>
>> It applies to custom containers as well. You can find the container
>> manifest in the GCE VM metadata, and it should have an entry for privileged
>> mode. The reason for this was to enable GPU accelerator support, but agree
>> with Robert that it is not part of any contracts, so in theory this could
>> change or perhaps be more strictly limited to accelerator support. In fact,
>> originally, this was only enabled for pipelines using accelerators but for
>> purely internal implementation details I believe it is currently enabled
>> for all pipelines.
>>
>> So for prototyping purposes I think you could try it, but I can't make
>> any guarantees in this thread that privileged mode will continue to work.
>>
>> cc: @Aaron Li  FYI
>>
>>
>> On Mon, Jan 30, 2023 at 12:16 PM Robert Bradshaw 
>> wrote:
>>
>>> I'm also not sure it's part of the contract that the containerization
>>> technology we use will always have these capabilities.
>>>
>>> On Mon, Jan 30, 2023 at 10:53 AM Chad Dombrova 
>>> wrote:
>>> >
>>> > Hi Valentyn,
>>> >
>>> >>
>>> >> Beam SDK docker containers on Dataflow VMs are currently launched in
>>> privileged mode.
>>> >
>>> >
>>> > Does this only apply to stock sdk containers?  I'm asking because we
>>> use a custom sdk container that we build.  We've tried various ways of
>>> running mount from within our custom beam container in Dataflow and we
>>> could not get it to work, while the same thing succeeds in local tests and
>>> in our CI (gitlab).  The assessment at the time (this was maybe a year ago)
>>> was that the container was not running in privileged mode, but if you think
>>> that's incorrect we can revisit this and report back with some error logs.
>>> >
>>> > -chad
>>> >
>>>
>>


Re: KafkaIo Metrics

2023-01-20 Thread Luke Cwik via user
KafkaIO#commitOffsetsInFinalize[1] is likely what you want if you want to
see Kafka's view of how the pipeline is consuming from it, since the
pipeline will ensure that offsets are committed only once it has guaranteed
that the data is ingested.

I would suggest using pipeline-level concepts and metrics, so have you
considered looking at pipeline-level metrics like:
* PCollection elements processed/size instead of bytes-consumed-rate
* watermark lag / processing time lag instead of records-lag

Obviously, if you're trying to dig down into an existing problem then it sure
does make sense to look at Kafka-level metrics if pipeline-level metrics
are telling you that there is a problem in the part of the pipeline containing
Kafka.

1:
https://beam.apache.org/releases/javadoc/2.44.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--
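
For illustration, a minimal Java sketch of a read with commitOffsetsInFinalize
enabled; the bootstrap servers, topic, and consumer group below are
placeholders, and a group.id must be set for the offset commits to land
anywhere:

```
import java.util.HashMap;
import java.util.Map;

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

// "pipeline" is an existing org.apache.beam.sdk.Pipeline.
// commitOffsetsInFinalize() needs a consumer group, so set group.id explicitly.
Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-beam-consumer-group");  // placeholder

PCollection<KV<Long, String>> records =
    pipeline.apply(
        KafkaIO.<Long, String>read()
            .withBootstrapServers("broker-1:9092")  // placeholder
            .withTopic("my-topic")                  // placeholder
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withConsumerConfigUpdates(consumerConfig)
            // Commit offsets back to Kafka once the bundle that read them has
            // been finalized, i.e. after the data is durably ingested.
            .commitOffsetsInFinalize()
            .withoutMetadata());
```

With this enabled, Kafka-side consumer-group metrics such as records-lag line
up with what the pipeline has actually finalized rather than what it has
merely read.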



On Fri, Jan 20, 2023 at 8:28 AM Alexey Romanenko 
wrote:

> IIRC, we don’t expose any Kafka Consumer metrics, so I’m afraid that
> there is no easy way to get them in a Beam pipeline.
>
> —
> Alexey
>
> On 18 Jan 2023, at 21:43, Lydian  wrote:
>
> Hi,
> I know that Beam KafkaIO doesn't use the native kafka offset, and
> therefore I cannot use kafka metrics directly.
>
> Wondering what would be the right way to expose those metrics of my
> KafkaIO pipeline?
> Things I am interested includes:
>
>- bytes-consumed-rate
>- fetch-latency-avg
>- records-lag
>- commit-rate
>
>- consumer lag
>
> Wondering how people get these metrics, or instead of doing this should we
> just enable `commit_offset_in_finalize` and then use the Kafka metrics
> directly?
>
> also wondering if there's anything to notice when enabling the
> commit_offset_in_finalize? Thanks!
>
> Sincerely,
> Lydian Lee
>
>
>


Re: [JAVA] Handling repeated elements when merging two pcollections

2022-08-10 Thread Luke Cwik via user
Sorry, I should have said that you should Flatten and do a GroupByKey, not
a CoGroupByKey, making the pipeline look like:
PCollectionA -> Flatten -> GroupByKey -> ParDo(EmitOnlyFirstElementPerKey)
PCollectionB -/

The CoGroupByKey will have one iterable per PCollection containing zero or
more elements depending on how many elements each PCollection had for that
key. So yes you could solve it with CoGroupByKey but Flatten+GroupByKey is
much simpler.
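
For illustration, a minimal Java sketch of the Flatten + GroupByKey shape
above. It assumes the two inputs were first tagged with their origin
(taggedA / taggedB are hypothetical PCollection<KV<String, KV<Boolean, byte[]>>>
values, e.g. built with a small MapElements); since the iterable produced by
GroupByKey has no guaranteed order, the tag is what lets the ParDo prefer
CollectionA rather than relying on literal first-element order:

```
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

// Values are tagged true if they came from CollectionA, false for CollectionB.
PCollection<KV<String, KV<Boolean, byte[]>>> flattened =
    PCollectionList.of(taggedA).and(taggedB).apply(Flatten.pCollections());

PCollection<KV<String, byte[]>> merged =
    flattened
        .apply(GroupByKey.<String, KV<Boolean, byte[]>>create())
        .apply("KeepOnePerKey", ParDo.of(
            new DoFn<KV<String, Iterable<KV<Boolean, byte[]>>>, KV<String, byte[]>>() {
              @ProcessElement
              public void process(ProcessContext c) {
                KV<Boolean, byte[]> chosen = null;
                for (KV<Boolean, byte[]> value : c.element().getValue()) {
                  // Keep a CollectionA value if one exists; otherwise keep the
                  // first value seen.
                  if (chosen == null || (value.getKey() && !chosen.getKey())) {
                    chosen = value;
                  }
                }
                c.output(KV.of(c.element().getKey(), chosen.getValue()));
              }
            }));
```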

On Wed, Aug 10, 2022 at 1:31 PM Shivam Singhal 
wrote:

> Think this should solve my problem.
>
> Thanks Evan and Luke!
>
> On Thu, 11 Aug 2022 at 1:49 AM, Luke Cwik via user 
> wrote:
>
>> Use CoGroupByKey to join the two PCollections and emit only the first
>> value of each iterable with the key.
>>
>> Duplicates will appear as iterables with more than one value, while keys
>> without duplicates will have iterables containing exactly one value.
>>
>> On Wed, Aug 10, 2022 at 12:25 PM Shivam Singhal <
>> shivamsinghal5...@gmail.com> wrote:
>>
>>> I have two PCollections, CollectionA & CollectionB of type KV<..., Byte[]>.
>>>
>>>
>>> I would like to merge them into one PCollection but CollectionA &
>>> CollectionB might have some elements with the same key. In those repeated
>>> cases, I would like to keep the element from CollectionA & drop the
>>> repeated element from CollectionB.
>>>
>>> Does anyone know a simple method to do this?
>>>
>>> Thanks,
>>> Shivam Singhal
>>>
>>


Re: [JAVA] Handling repeated elements when merging two pcollections

2022-08-10 Thread Luke Cwik via user
Use CoGroupByKey to join the two PCollections and emit only the first value
of each iterable with the key.

Duplicates will appear as iterables with more than one value, while keys
without duplicates will have iterables containing exactly one value.
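
For illustration, a minimal Java sketch of the CoGroupByKey variant, assuming
both inputs are PCollection<KV<String, byte[]>> named collectionA and
collectionB (hypothetical names); CollectionA's iterable is checked first so
its element wins whenever a key appears in both inputs:

```
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

final TupleTag<byte[]> aTag = new TupleTag<>();
final TupleTag<byte[]> bTag = new TupleTag<>();

PCollection<KV<String, byte[]>> merged =
    KeyedPCollectionTuple.of(aTag, collectionA)
        .and(bTag, collectionB)
        .apply(CoGroupByKey.create())
        .apply("PreferCollectionA", ParDo.of(
            new DoFn<KV<String, CoGbkResult>, KV<String, byte[]>>() {
              @ProcessElement
              public void process(ProcessContext c) {
                CoGbkResult joined = c.element().getValue();
                Iterable<byte[]> fromA = joined.getAll(aTag);
                // Keep CollectionA's element when present; otherwise fall back
                // to the element from CollectionB.
                byte[] chosen = fromA.iterator().hasNext()
                    ? fromA.iterator().next()
                    : joined.getAll(bTag).iterator().next();
                c.output(KV.of(c.element().getKey(), chosen));
              }
            }));
```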

On Wed, Aug 10, 2022 at 12:25 PM Shivam Singhal 
wrote:

> I have two PCollections, CollectionA & CollectionB of type KV<..., Byte[]>.
>
>
> I would like to merge them into one PCollection but CollectionA &
> CollectionB might have some elements with the same key. In those repeated
> cases, I would like to keep the element from CollectionA & drop the
> repeated element from CollectionB.
>
> Does anyone know a simple method to do this?
>
> Thanks,
> Shivam Singhal
>


Re: SDK Worker availability metrics

2022-08-10 Thread Luke Cwik via user
Flink has a set of workers, and each worker has a number of task slots. A
pipeline will use the number of slots based upon what it was configured to
run with.

Are you trying to get the total number of workers, total number of tasks
slots, number of task slots your pipeline is using or number of workers
your pipeline is executing on?

I was under the impression that the first two were properties of the Flink
cluster and don't change while the third property is configured at job
submission time and also doesn't change.

I may not be understanding what you're trying to measure and why at
pipeline runtime for Flink since many of these values don't change through
the lifetime of the cluster and/or job.

On Mon, Aug 8, 2022 at 4:59 PM aryan m  wrote:

> Hi Luke!
> Thanks !! We use the Flink Runner and run SDK workers as processes [1]
> within a k8s pod. Can you please share broad steps on how one can do in the
> runner ?
>
>
> [1]
> https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentFactory.java
>
>
> On Mon, Aug 8, 2022 at 8:51 AM Luke Cwik via user 
> wrote:
>
>> That code only executes within a runner and is only used by certain
>> runners and wouldn't work in general from user code that is monitoring the
>> job or user code executing within one of the workers.
>>
>> You would need to author code that is likely runner specific to look up
>> the number of workers associated with a job as I don't believe there is a
>> general way to do this for an arbitrary Apache Beam runner.
>>
>>  Which runner would you most likely want to use?
>>
>> On Sun, Aug 7, 2022 at 1:02 PM aryan m  wrote:
>>
>>> Hi Users!
>>> Is there a recommended approach to publish metrics on the number of
>>> sdk workers available/running as a gauge ?
>>>
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java#L267
>>> [2]
>>> https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java#L148
>>>
>>>
>>> -- Aryan
>>>
>>


Re: SDK Worker availability metrics

2022-08-08 Thread Luke Cwik via user
That code only executes within a runner and is only used by certain runners
and wouldn't work in general from user code that is monitoring the job or
user code executing within one of the workers.

You would need to author code that is likely runner specific to look up the
number of workers associated with a job as I don't believe there is a
general way to do this for an arbitrary Apache Beam runner.

 Which runner would you most likely want to use?

On Sun, Aug 7, 2022 at 1:02 PM aryan m  wrote:

> Hi Users!
> Is there a recommended approach to publish metrics on the number of
> sdk workers available/running as a gauge ?
>
>
> [1]
> https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java#L267
> [2]
> https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java#L148
>
>
> -- Aryan
>


Re: [Dataflow][Java][stateful] Workflow Failed when trying to introduce stateful RateLimit

2022-08-05 Thread Luke Cwik via user
You could try Dataflow Runner v2. The difference in the implementation may
allow you to work around what is impacting the pipelines.

On Fri, Aug 5, 2022 at 9:40 AM Evan Galpin  wrote:

> Thanks Luke, I've opened a support case as well but thought it would be
> prudent to ask here in case there was something obvious with the code.  Is
> there any additional/optional validation that I can opt to use when
> building and deploying the pipeline that might give hints? Otherwise I'll
> just wait on the support case.
>
> Thanks,
> Evan
>
> On Fri, Aug 5, 2022 at 11:22 AM Luke Cwik via user 
> wrote:
>
>> I took a look at the code and nothing obvious stood out to me in the code
>> as this is a ParDo with OnWindowExpiration. Just to make sure, the rate
>> limit is per key and would only be a global rate limit if there was a
>> single key.
>>
>> Are the workers trying to start?
>> * If no, then you would need to open a support case and share some
>> job ids so that someone could debug internal service logs.
>> * If yes, then did the workers start successfully?
>> ** If no, logs should have some details as to why the worker couldn't
>> start.
>> ** If yes, are the workers getting work items?
>> *** If no, then you would need to open a support case and share some
>> job ids so that someone could debug internal service logs.
>> *** If yes then the logs should have some details as to why the work
>> items are failing.
>>
>>
>> On Fri, Aug 5, 2022 at 7:36 AM Evan Galpin  wrote:
>>
>>> Hi all,
>>>
>>> I'm trying to create a RateLimit[1] transform that's based fairly
>>> heavily on GroupIntoBatches[2]. I've been able to run unit tests using
>>> TestPipeline to verify desired behaviour and have also run successfully
>>> using DirectRunner.  However, when I submit the same job to Dataflow it
>>> completely fails to start and only gives the error message "Workflow
>>> Failed." The job builds/uploads/submits without error, but never starts and
>>> gives no detail as to why.
>>>
>>> Is there anything I can do to gain more insight about what is going
>>> wrong?  I've included a gist of the RateLimit[1] code in case there is
>>> anything obvious wrong there.
>>>
>>> Thanks in advance,
>>> Evan
>>>
>>> [1] https://gist.github.com/egalpin/162a04b896dc7be1d0899acf17e676b3
>>> [2]
>>> https://github.com/apache/beam/blob/c8d92b03b6b2029978dbc2bf824240232c5e61ac/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
>>>
>>


Re: [Dataflow][Java][stateful] Workflow Failed when trying to introduce stateful RateLimit

2022-08-05 Thread Luke Cwik via user
I took a look at the code and nothing obvious stood out to me in the code
as this is a ParDo with OnWindowExpiration. Just to make sure, the rate
limit is per key and would only be a global rate limit if there was a
single key.

Are the workers trying to start?
* If no, then you would need to open a support case and share some job ids
so that someone could debug internal service logs.
* If yes, then did the workers start successfully?
** If no, logs should have some details as to why the worker couldn't start.
** If yes, are the workers getting work items?
*** If no, then you would need to open a support case and share some
job ids so that someone could debug internal service logs.
*** If yes then the logs should have some details as to why the work items
are failing.


On Fri, Aug 5, 2022 at 7:36 AM Evan Galpin  wrote:

> Hi all,
>
> I'm trying to create a RateLimit[1] transform that's based fairly heavily
> on GroupIntoBatches[2]. I've been able to run unit tests using TestPipeline
> to verify desired behaviour and have also run successfully using
> DirectRunner.  However, when I submit the same job to Dataflow it
> completely fails to start and only gives the error message "Workflow
> Failed." The job builds/uploads/submits without error, but never starts and
> gives no detail as to why.
>
> Is there anything I can do to gain more insight about what is going
> wrong?  I've included a gist of the RateLimit[1] code in case there is
> anything obvious wrong there.
>
> Thanks in advance,
> Evan
>
> [1] https://gist.github.com/egalpin/162a04b896dc7be1d0899acf17e676b3
> [2]
> https://github.com/apache/beam/blob/c8d92b03b6b2029978dbc2bf824240232c5e61ac/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
>


Re: [Dataflow][Python] Guidance on HTTP ingestion on Dataflow

2022-07-19 Thread Luke Cwik via user
Even if you don't have the resource ids ahead of time, you can have a
pipeline like:
Impulse -> ParDo(GenerateResourceIds) -> Reshuffle ->
ParDo(ReadResourceIds) -> ...

You could also compose these as splittable DoFns [1, 2, 3]:
ParDo(SplittableGenerateResourceIds) -> ParDo(SplittableReadResourceIds)

The first approach is the simplest as the reshuffle will rebalance the
reading of each resource id across worker nodes but is limited in
generating resource ids on one worker. Making the generation a splittable
DoFn will mean that you can increase the parallelism of generation which is
important if there are so many that it could crash a worker or fail to have
the output committed (these kinds of failures are runner dependent on how
well they handle single bundles with large outputs). Making the reading
splittable allows you to handle a large resource (imagine a large file) so
that it can be read and processed in parallel (and will have similar
failures if the runner can't handle single bundles with large outputs).

You can always start with the first solution and swap either piece to be a
splittable DoFn depending on your performance requirements and how well the
simple solution works.

1: https://beam.apache.org/blog/splittable-do-fn/
2: https://beam.apache.org/blog/splittable-do-fn-is-available/
3: https://beam.apache.org/documentation/programming-guide/#splittable-dofns
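
For illustration, a minimal sketch of the first shape, written with the Java
SDK for concreteness (the same Impulse -> generate -> Reshuffle -> read shape
applies in Python); listResourceIds() and fetchResource() are hypothetical
stand-ins for the listing call and the per-resource REST read:

```
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;

PCollection<String> resourceIds =
    pipeline
        .apply(Impulse.create())  // a single empty byte[] element to seed the pipeline
        .apply("GenerateResourceIds", ParDo.of(new DoFn<byte[], String>() {
          @ProcessElement
          public void process(ProcessContext c) {
            // Hypothetical: page through a listing endpoint and emit every id.
            for (String id : listResourceIds()) {
              c.output(id);
            }
          }
        }))
        // Break fusion so the reads below are rebalanced across workers instead
        // of all running on the single worker that generated the ids.
        .apply(Reshuffle.viaRandomKey());

PCollection<String> payloads =
    resourceIds.apply("ReadResourceIds", ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void process(ProcessContext c) {
        // Hypothetical: one blocking REST call per resource id.
        c.output(fetchResource(c.element()));
      }
    }));
```

If generation or a single read later becomes too large for one bundle, either
DoFn can be rewritten as a splittable DoFn per the links above without
changing the overall shape.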


On Tue, Jul 19, 2022 at 10:05 AM Damian Akpan 
wrote:

> Provided you have all the resource ids ahead of fetching, Beam will
> spread the fetches to its workers. It will still fetch synchronously but
> within that worker.
>
> On Tue, Jul 19, 2022 at 5:40 PM Shree Tanna  wrote:
>
>> Hi all,
>>
>> I'm planning to use Apache Beam to extract and load part of the ETL
>> pipeline and run the jobs on Dataflow. I will have to do the REST API
>> ingestion on our platform. I can opt to make sync API calls from DoFn. With
>> that pipelines will stall while REST requests are made over the network.
>>
>> Is it best practice to run the REST ingestion job on Dataflow? Is there
>> any best practice I can follow to accomplish this? Just as a reference I'm
>> adding this
>> 
>> StackOverflow thread here too. Also, I notice that Rest I/O transform
>>  built-in connector
>> is in progress for Java.
>>
>> Let me know if this is the right group to ask this question. I can also
>> ask d...@beam.apache.org if needed.
>> --
>> Thanks,
>> Shree
>>
>


Re: [Dataflow][Java] Guidance on Transform Mapping Streaming Update

2022-07-08 Thread Luke Cwik via user
I was suggesting GCP support mainly because I don't think you want to share
the 2.36 and 2.40 version of your job file publicly as someone familiar
with the layout and format may spot a meaningful difference.

Also, if it turns out that there is no meaningful difference between the
two then the internal mechanics of how the graph is modified by Dataflow is
not surfaced back to you in enough depth to debug further.



On Fri, Jul 8, 2022 at 6:12 AM Evan Galpin  wrote:

> Thanks for your response Luke :-)
>
> Updating in 2.36.0 works as expected, but as you alluded to I'm attempting
> to update to the latest SDK; in this case there are no code changes in the
> user code, only the SDK version.  Is GCP support the only tool when it
> comes to deciphering the steps added by Dataflow?  I would love to be able
> to inspect the complete graph with those extra steps like
> "Unzipped-2/FlattenReplace" that aren't in the job file.
>
> Thanks,
> Evan
>
> On Wed, Jul 6, 2022 at 4:21 PM Luke Cwik via user 
> wrote:
>
>> Does doing a pipeline update in 2.36 work or do you want to do an update
>> to get the latest version?
>>
>> Feel free to share the job files with GCP support. It could be something
>> internal but the coders for ephemeral steps that Dataflow adds are based
>> upon existing coders within the graph.
>>
>> On Tue, Jul 5, 2022 at 8:03 AM Evan Galpin  wrote:
>>
>>> +dev@
>>>
>>> Reviving this thread as it has hit me again on Dataflow.  I am trying to
>>> upgrade an active streaming pipeline from 2.36.0 to 2.40.0.  Originally, I
>>> received an error that the step "Flatten.pCollections" was missing from the
>>> new job graph.  I knew from the code that that wasn't true, so I dumped the
>>> job file via "--dataflowJobFile" for both the running pipeline and for the
>>> new version I'm attempting to update to.  Both job files showed identical
>>> data for the Flatten.pCollections step, which raises the question of why
>>> that would have been reported as missing.
>>>
>>> Out of curiosity I then tried mapping the step to the same name, which
>>> changed the error to:  "The Coder or type for step
>>> Flatten.pCollections/Unzipped-2/FlattenReplace has changed."  Again, the
>>> job files show identical coders for the Flatten step (though
>>> "Unzipped-2/FlattenReplace" is not present in the job file, maybe an
>>> internal Dataflow thing?), so I'm confident that the coder hasn't actually
>>> changed.
>>>
>>> I'm not sure how to proceed in updating the running pipeline, and I'd
>>> really prefer not to drain.  Any ideas?
>>>
>>> Thanks,
>>> Evan
>>>
>>>
>>> On Fri, Oct 22, 2021 at 3:36 PM Evan Galpin 
>>> wrote:
>>>
>>>> Thanks for the ideas Luke. I checked out the json graphs as per your
>>>> recommendation (thanks for that, was previously unaware), and the
>>>> "output_info" was identical for both the running pipeline and the pipeline
>>>> I was hoping to update it with.  I ended up opting to just drain and submit
>>>> the updated pipeline as a new job.  Thanks for the tips!
>>>>
>>>> Thanks,
>>>> Evan
>>>>
>>>> On Thu, Oct 21, 2021 at 7:02 PM Luke Cwik  wrote:
>>>>
>>>>> I would suggest dumping the JSON representation (with the
>>>>> --dataflowJobFile=/path/to/output.json) of the pipeline before and after
>>>>> and looking to see what is being submitted to Dataflow. Dataflow's JSON
>>>>> graph representation is a bipartite graph where there are transform nodes
>>>>> with inputs and outputs and PCollection nodes with no inputs or outputs.
>>>>> The PCollection nodes typically end with the suffix ".out". This could 
>>>>> help
>>>>> find steps that have been added/removed/renamed.
>>>>>
>>>>> The PipelineDotRenderer[1] might be of use as well.
>>>>>
>>>>> 1:
>>>>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/renderer/PipelineDotRenderer.java
>>>>>
>>>>> On Thu, Oct 21, 2021 at 11:54 AM Evan Galpin 
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I'm looking for any help regarding updating streaming jobs which are
>>>>>> already running on Dataflow.  Specifically I'm seeking guidance for
>>>>>> situations where Fusion is involved, and trying to decipher which old
>>>>>> steps should be mapped to which new steps.
>>>>>>
>>>>>> I have a case where I updated the steps which come after the step in
>>>>>> question, but when I attempt to update there is an error that "
>>>>>> no longer produces data to the steps ". I believe that
>>>>>>  is only changed as a result of fusion, and in reality it does in
>>>>>> fact produce data to  (confirmed when deployed as a new
>>>>>> job for testing purposes).
>>>>>>
>>>>>> Is there a guide for how to deal with updates and fusion?
>>>>>>
>>>>>> Thanks,
>>>>>> Evan
>>>>>>
>>>>>


Re: [Dataflow][Java] Guidance on Transform Mapping Streaming Update

2022-07-06 Thread Luke Cwik via user
Does doing a pipeline update in 2.36 work or do you want to do an update to
get the latest version?

Feel free to share the job files with GCP support. It could be something
internal but the coders for ephemeral steps that Dataflow adds are based
upon existing coders within the graph.

On Tue, Jul 5, 2022 at 8:03 AM Evan Galpin  wrote:

> +dev@
>
> Reviving this thread as it has hit me again on Dataflow.  I am trying to
> upgrade an active streaming pipeline from 2.36.0 to 2.40.0.  Originally, I
> received an error that the step "Flatten.pCollections" was missing from the
> new job graph.  I knew from the code that that wasn't true, so I dumped the
> job file via "--dataflowJobFile" for both the running pipeline and for the
> new version I'm attempting to update to.  Both job files showed identical
> data for the Flatten.pCollections step, which raises the question of why
> that would have been reported as missing.
>
> Out of curiosity I then tried mapping the step to the same name, which
> changed the error to:  "The Coder or type for step
> Flatten.pCollections/Unzipped-2/FlattenReplace has changed."  Again, the
> job files show identical coders for the Flatten step (though
> "Unzipped-2/FlattenReplace" is not present in the job file, maybe an
> internal Dataflow thing?), so I'm confident that the coder hasn't actually
> changed.
>
> I'm not sure how to proceed in updating the running pipeline, and I'd
> really prefer not to drain.  Any ideas?
>
> Thanks,
> Evan
>
>
> On Fri, Oct 22, 2021 at 3:36 PM Evan Galpin  wrote:
>
>> Thanks for the ideas Luke. I checked out the json graphs as per your
>> recommendation (thanks for that, was previously unaware), and the
>> "output_info" was identical for both the running pipeline and the pipeline
>> I was hoping to update it with.  I ended up opting to just drain and submit
>> the updated pipeline as a new job.  Thanks for the tips!
>>
>> Thanks,
>> Evan
>>
>> On Thu, Oct 21, 2021 at 7:02 PM Luke Cwik  wrote:
>>
>>> I would suggest dumping the JSON representation (with the
>>> --dataflowJobFile=/path/to/output.json) of the pipeline before and after
>>> and looking to see what is being submitted to Dataflow. Dataflow's JSON
>>> graph representation is a bipartite graph where there are transform nodes
>>> with inputs and outputs and PCollection nodes with no inputs or outputs.
>>> The PCollection nodes typically end with the suffix ".out". This could help
>>> find steps that have been added/removed/renamed.
>>>
>>> The PipelineDotRenderer[1] might be of use as well.
>>>
>>> 1:
>>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/renderer/PipelineDotRenderer.java
>>>
>>> On Thu, Oct 21, 2021 at 11:54 AM Evan Galpin 
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I'm looking for any help regarding updating streaming jobs which are
>>>> already running on Dataflow.  Specifically I'm seeking guidance for
>>>> situations where Fusion is involved, and trying to decipher which old steps
>>>> should be mapped to which new steps.
>>>>
>>>> I have a case where I updated the steps which come after the step in
>>>> question, but when I attempt to update there is an error that "
>>>> no longer produces data to the steps ". I believe that
>>>>  is only changed as a result of fusion, and in reality it does in
>>>> fact produce data to  (confirmed when deployed as a new
>>>> job for testing purposes).
>>>>
>>>> Is there a guide for how to deal with updates and fusion?
>>>>
>>>> Thanks,
>>>> Evan
>>>>
>>>


Re: Debugging External Transforms on Dataflow (Python)

2021-06-24 Thread Luke Cwik
It makes sense to use the reader cache instead of instantiating within
getProgress each time.

Feel free to create a JIRA and/or open a PR and send it out for review.

On Thu, Jun 24, 2021 at 5:43 AM Alex Koay  wrote:

> On the surface this looked ok, but running it on Dataflow ended up giving
> me some new errors.
> Essentially UnboundedSourceAsSDFRestrictionTracker.getProgress also uses
> currentReader, but it creates new Readers (without checking the cache key).
>
> @Boyuan Zhang  I'm guessing this should be part of
> this PR (https://github.com/apache/beam/pull/13592), do you know if there
> was something behind this?
>
> Thanks!
>
> Cheers
> Alex
>
> On Thu, Jun 24, 2021 at 4:01 PM Alex Koay  wrote:
>
>> Good news for anybody who's following this.
>> I finally had some time today to look into the problem again with some
>> fresh eyes and I figured out the problems.
>> As I suspected, the issue lies with the cachedReaders and how it uses
>> CheckpointMarks.
>>
>> With the SDF cachedReaders, it serializes the Source and CheckpointMark
>> (along with MIN_TIMESTAMP) to create a cache key for the reader so that it
>> can be reused later. Sounds good so far.
>>
>> On the other hand, Solace doesn't make use of the CheckpointMark at all
>> in Source.createReader(), opting instead to create a new Reader every time.
>> This is because Solace doesn't really have a notion of checkpoints in
>> their queue, you just get the next available message always.
>> This makes sense to me.
>> As a result, whenever createReader() is called, the CheckpointMark that
>> comes as a result is always unique.
>>
>> The other point about Solace is that its Reader acknowledges the messages
>> only upon finalization (which should be the case).
>>
>> So far, I'm re-iterating what I mentioned before. Well, here's the last
>> part I had a hunch about but finally had time to confirm today.
>>
>> While Solace's cached readers do expire after a minute, while they're
>> still running, the Solace server helpfully sends some messages (up to 255
>> -- I could be wrong about this) to the reader first (which it then waits
>> for an acknowledgement).
>> Why this happens is because the FlowReceiver that Solace has isn't yet
>> closed, the server treats the Reader as still being "active" for all
>> intents and purposes.
>> These messages, though, never get read at all by Beam, because
>> Reader.advance() is never called, and as such it stays as such until the
>> identical CheckpointMark is recalled (which it never does, because the
>> CheckpointMark has moved on).
>>
>> In the meantime, the SDF spawns more and more readers over and over again
>> (which will sooner or later go hungry) and all these will become cached
>> (with some number of unacknowledged messages), because advance() was never
>> called on it.
>>
>> Eventually, when cachedReaders hits 100 or after 60 seconds, the old
>> Readers which have some unacknowledged 255 messages will then be closed,
>> freeing up the messages to go to the other readers.
>> But now here's the kicker, the other ~99 Readers (in each thread) in the
>> cache also are active in the eyes of Solace!
>> All of them will get some messages, which never get read, because
>> the CheckpointMark has moved on yet again.
>> This goes on for eternity, leading to the very small trickle of messages
>> coming in after that.
>>
>> I've currently fixed the problem by marking everything in Solace's
>> CheckpointMark as transient, and as a result it will always reuse the
>> cached Reader, but I'd like to discuss if there are any better ways to work
>> around this.
>> I would propose these two ideas as fallback options, especially
>> considering existing UnboundedSources.
>> 1. make Reader caching optional
>> 2. simply always reuse the existing reader
>>
>> In any case, the problem (as it was) has been resolved.
>>
>> Cheers
>> Alex
>>
>>
>> On Fri, Jun 18, 2021 at 2:31 AM Luke Cwik  wrote:
>>
>>> Yes I was referring to
>>> https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L563
>>>  since
>>> that is responsible for scheduling the bundle finalization. It will only be
>>> invoked if the current bundle completes.
>>>
>>> I would add logging to ParDoEvaluator#finishBundle[1] to see that the
>>> bundle is being completed.
>>> I would add logging to EvaluationContext#handleResult[2] to see how the
>

Re: End to end unit tests for stateful pipeline

2021-06-22 Thread Luke Cwik
I have also seen a DefaultValueFactory which looks at another setting
stating the class name it should instantiate and invoke to create the
object[1]. This way you don't need to make it serializable, you just need
the factory that creates the test instance available on the class path.

1:
https://github.com/apache/beam/blob/cf8ffe660cfcb1f7d421171f406fa991b93e043b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java#L164
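
For illustration, a minimal sketch of that pattern, loosely modeled on the
GcsOptions example above. RedisClient is the hypothetical client interface;
the factory reflectively instantiates whatever implementation class name the
companion option points at (a production client by default, or a fake that
only needs to be present on the test classpath):

```
import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.PipelineOptions;

public interface RedisOptions extends PipelineOptions {

  // Tests point this at a fake implementation that lives on the test
  // classpath; production keeps the default.
  @Default.String("com.example.ProdRedisClient")  // placeholder class name
  String getRedisClientClassName();
  void setRedisClientClassName(String className);

  // @JsonIgnore keeps the client object out of the serialized options, so the
  // implementation never needs to be serializable itself.
  @JsonIgnore
  @Default.InstanceFactory(RedisClientFactory.class)
  RedisClient getRedisClient();
  void setRedisClient(RedisClient client);

  class RedisClientFactory implements DefaultValueFactory<RedisClient> {
    @Override
    public RedisClient create(PipelineOptions options) {
      String className = options.as(RedisOptions.class).getRedisClientClassName();
      try {
        // Assumes a public no-arg constructor on a class implementing RedisClient.
        return (RedisClient) Class.forName(className).getDeclaredConstructor().newInstance();
      } catch (ReflectiveOperationException e) {
        throw new RuntimeException("Unable to construct RedisClient " + className, e);
      }
    }
  }
}
```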

On Thu, Jun 17, 2021 at 1:06 PM gaurav mishra 
wrote:

> Thanks Luke. That kind of worked. But to make the serialization and
> deserialization work I had to put some test code into production code.
> ProxyInvocationHandler.ensureSerializable() tries to serialize and
> deserialize my `TestRedisClient`. But since the return type of
> getRedisClient() in  ProductionPipelineOptions is an
> interface `RedisClient`,  jackson cannot deserialize the given string to an
> instance of `TestRedisClient` . So to force Jackson to instantiate the
> correct instance of RedisClient I had to move `TestRedisClient` in the code
> package where interface `RedisClient` lives. And had to add a couple of
> annotations on interface like
> @JsonTypeInfo(...)
> @JsonSubTypes({@Type(value = TestRedisClient.class, name =
> "testRedisClient")})
>
> Maybe there is still a better way to do this without having to mix my test
> related classes in the real code.
>
> On Wed, Jun 16, 2021 at 2:12 PM Luke Cwik  wrote:
>
>> In your test I would have expected to have seen something like:
>> ```
>> // create instance of TestRedisClient which is serializable
>> RedisClient testClient = createTestRedisClient();
>> ... setup any expected interactions or test data on testClient ...
>> options = PipelineOptionsFactory.as(ProductionPipelineOptions.class);
>> options.setRedisClient(testClient);
>> pipeline.run(options);
>> ```
>>
>> Your DoFn as is looks fine.
>>
>> On Mon, Jun 14, 2021 at 10:27 PM gaurav mishra <
>> gauravmishra.it...@gmail.com> wrote:
>>
>>> Hi Luke,
>>> I tried going down the path which you suggested but hitting some
>>> roadblocks. Maybe I am doing something wrong. As you said I created a unit
>>> test specific class for PipelineOptions, created a TestRedisFactory which
>>> is setup to return a mock instance of RedisClient. In my test code I have
>>>  ```
>>> options = PipelineOptionsFactory.as(TestPipelineOptions.class);
>>> // get instance of TestRedisClient which is serializable
>>> RedisClient client = options.getRedisClient();
>>> // some code to setup mocked interactions
>>> pipeline.run(options);
>>> ```
>>>
>>> In my DoFn I have
>>> ```
>>> ProductionPipelineOptions pipelineOptions =
>>> context.getPipelineOptions().as(ProductionPipelineOptions.class);
>>>
>>> // get instance of RealRedisClient
>>> RedisClient redisClient = pipelineOptions.getRedisClient();
>>> redisClient.get(key)
>>> ```
>>> In unit test my options is getting serialized along with the
>>> TestRedisClient inside it. But when my DoFn is being called the framework
>>> tries to deserialize the string representation of `TestRedisClient` to
>>> `something that implements RedisClient` and this is where I am getting
>>> stuck. Not able to wrap my head around how to tell the framework to
>>> deserialize the string to TestRedisClient and return that in my DoFn.
>>>
>>> On Mon, Jun 14, 2021 at 2:07 PM Luke Cwik  wrote:
>>>
>>>> You can create a PipelineOption which represents your Redis client
>>>> object. For tests you would set the PipelineOption to a serializable
>>>> fake/mock that can replay the results you want. The default for the
>>>> PipelineOption object would instantiate your production client. You can see
>>>> an example usage of the DefaultValueFactory here[1].
>>>>
>>>> 1:
>>>> https://github.com/apache/beam/blob/5cebe0fd82ade3f957fe70e25aa3e399d2e91b32/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java#L71
>>>>
>>>> On Mon, Jun 14, 2021 at 10:54 AM gaurav mishra <
>>>> gauravmishra.it...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> I have a streaming pipeline which reads from pubsub, enriches data
>>>>> using redis and finally writes to pubsub. The code has some stateful DoFns
>>>>> with timers. I wanted to write unit tests for the whole pipeline, that
>>>>> reads from TestStream<> , enriches data using a mocked redis client, and
>>>>> writes data to a PCollection on which I can do PAsserts. The trouble I am
>>>>> having here is how to set up the mocked redis client. Are there any
>>>>> examples that I can take a look at? I am using java with junit4 as a
>>>>> testing framework.
>>>>> More details about my code are here -
>>>>> https://stackoverflow.com/questions/67963189/unit-tests-apache-beam-stateful-pipeline-with-external-dependencies
>>>>>
>>>>


Re: Rate Limiting in Beam

2021-06-17 Thread Luke Cwik
If the service returns sensible throttling errors you could use a
StatefulDoFn and buffer elements that error out due to throttling from the
service instead of failing the bundle, and schedule a timer to replay them.
This will effectively max out the service as long as there is more data
than the service can handle, which doesn't work too well if the service.
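
For illustration, a minimal sketch of that buffering pattern; Request,
Response, ThrottledException, and callService() are hypothetical placeholders
for the real service client, and the 30-second retry delay is arbitrary:

```
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Request, Response, and ThrottledException are hypothetical placeholder types.
abstract class ThrottleBufferingFn extends DoFn<KV<String, Request>, Response> {

  @StateId("throttledBuffer")
  private final StateSpec<BagState<Request>> throttledBuffer = StateSpecs.bag();

  @TimerId("retry")
  private final TimerSpec retry = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(
      ProcessContext c,
      @StateId("throttledBuffer") BagState<Request> buffer,
      @TimerId("retry") Timer retryTimer) {
    try {
      c.output(callService(c.element().getValue()));
    } catch (ThrottledException e) {
      // Park the element instead of failing the bundle, and retry shortly.
      buffer.add(c.element().getValue());
      retryTimer.offset(Duration.standardSeconds(30)).setRelative();
    }
  }

  @OnTimer("retry")
  public void onRetry(
      OnTimerContext c,
      @StateId("throttledBuffer") BagState<Request> buffer,
      @TimerId("retry") Timer retryTimer) {
    List<Request> stillThrottled = new ArrayList<>();
    for (Request request : buffer.read()) {
      try {
        c.output(callService(request));
      } catch (ThrottledException e) {
        stillThrottled.add(request);
      }
    }
    buffer.clear();
    if (!stillThrottled.isEmpty()) {
      stillThrottled.forEach(buffer::add);
      retryTimer.offset(Duration.standardSeconds(30)).setRelative();
    }
  }

  // Hypothetical blocking call to the rate-limited service.
  protected abstract Response callService(Request request) throws ThrottledException;
}
```

Since the buffer and timer are per key, how much this smooths the load depends
on how the elements are keyed.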


On Fri, Apr 16, 2021 at 6:20 PM Daniel Thevessen  wrote:

> Thanks for the quick response.
> Querying the Dataflow API seems like something that could break easily,
> but I can go with that if it turns out to be easier.
>
> The Splittable DoFn way sounds interesting, but I'm not very familiar with
> that so I have some questions around it:
> Splits seem to operate on offsets within a single element. Does that mean
> that I'd set a fixed shard number x, and then I'd need to first group my
> PCollection of single elements into a PCollection of lists, each size x?
> And are the subsequent writes also limited to x workers, meaning that
> splits have the same issue as with a GroupByKey?
> I see the UnboundedCountingSource gets a `desiredNumSplits` parameter. I'm
> assuming there is nothing similar that would allow a Splittable DoFn to
> simply figure out the number of workers even if it changes? That's probably
> very hacky anyway.
> If the above solution with fixed-size lists makes sense and will
> redistribute the writes I'm already happy, I don't necessarily need to have
> the throttling step dynamically match autoscaling.
>
> On Thu, Apr 15, 2021 at 4:20 PM Pablo Estrada  wrote:
>
>> You could implement a Splittable DoFn that generates a limited number of
>> splits. We do something like this for
>> GenerateSequence.from(X).withRate(...) via UnboundedCountingSource[1]. It
>> keeps track of its local EPS, and generates new splits if more EPSs are
>> wanted. This should help you scale up to the maximum of EPS that you want,
>> and autoscaling will only produce the appropriate number of workers for
>> that number of splits.
>>
>> - The only issue may be that you can't "scale down" if you find that some
>> of your splits have a very low throughput, because two splits can't be
>> merged back together (does that make sense?) - but Dataflow should be able
>> to scale down and schedule multiple splits in a single worker if that's the
>> case.
>>
>> The UnboundedCountingSource is a Source, so it can't have an input (and
>> it's deprecated), but you could write a SplittableDoFn that has the same
>> behavior. Do you think this could work?
>>
>>
>> [1]
>> https://github.com/apache/beam/blob/8c9605f224115507912cf72e02d3fa94905548ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java#L334-L348
>>
>> On Thu, Apr 15, 2021 at 4:11 PM Evan Galpin 
>> wrote:
>>
>>> Could you possibly use a side input with fixed interval triggering[1] to
>>> query the Dataflow API to get the most recent log statement of scaling as
>>> suggested here[2]?
>>>
>>> [1]
>>> https://beam.apache.org/documentation/patterns/side-inputs/
>>> [2]
>>> https://stackoverflow.com/a/54406878/6432284
>>>
>>> On Thu, Apr 15, 2021 at 18:14 Daniel Thevessen 
>>> wrote:
>>>
 Hi folks,

 I've been working on a custom PTransform that makes requests to another
 service, and would like to add a rate limiting feature there. The
 fundamental issue that I'm running into here is that I need a decent
 heuristic to estimate the worker count, so that each worker can
 independently set a limit which globally comes out to the right value. All
 of this is easy if I know how many machines I have, but I'd like to use
 Dataflow's autoscaling, which would easily break any pre-configured value.
 I have seen two main approaches for rate limiting, both for a
 configurable variable x:

- Simply assume worker count is x, then divide by x to figure out
the "local" limit. The issue I have here is that if we assume x is 500, 
 but
it is actually 50, I'm now paying for 50 nodes to throttle 10 times as 
 much
as necessary. I know the pipeline options have a reference to the 
 runner,
is it possible to get an approximate current worker count from that at
bundle start (*if* runner is DataflowRunner)?
- Add another PTransform in front of the API requests, which groups
by x number of keys, throttles, and keeps forwarding elements with an
instant trigger. I initially really liked this solution because even if x
is misconfigured, I will have at most x workers running and throttle
appropriately. However, I noticed that for batch pipelines, this
effectively also caps the API request stage at x workers. If I throw in a
`Reshuffle`, there is another GroupByKey (-> another stage), and nothing
gets done until every element has passed through the throttler.

 Has anyone here tried to figure out rate limiting with Beam before, and
 

Re: Debugging External Transforms on Dataflow (Python)

2021-06-17 Thread Luke Cwik
Yes I was referring to
https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L563
since
that is responsible for scheduling the bundle finalization. It will only be
invoked if the current bundle completes.

I would add logging to ParDoEvaluator#finishBundle[1] to see that the
bundle is being completed.
I would add logging to EvaluationContext#handleResult[2] to see how the
bundle completion is being handled at the bundle finalization callback is
being invoked.

1:
https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java#L267
2:
https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java#L157

On Thu, Jun 17, 2021 at 10:42 AM Alex Koay  wrote:

> Could you be referring to this part?
> https://github.com/apache/beam/blob/7494d1450f2fc19344c385435496634e4b28a997/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L563
>
> I've tried fiddling with it and it gave some good results if I recall
> correctly.
> I didn't mention it earlier because I thought that maybe a shorter expiry
> actually causes the readers to expire faster and thus releasing the unacked
> messages for another reader to bundle up.
>
> I can confirm that the CheckpointMark#finalizeCheckpoint() doesn't get
> called for at least some time if the bundle size is not maxed (or close to
> maxed) out.
> I've added logging into finalizeCheckpoint() and don't see it getting
> called. It's where the acknowledgements are happening.
>
> I've actually opened a Google support ticket for this as well, perhaps you
> could take a look at it (Case 28209335).
> Thanks for your reply, I'll try to debug this further too.
>
> On Fri, Jun 18, 2021 at 12:49 AM Luke Cwik  wrote:
>
>> Reducing the bundle size/timeout shouldn't be necessary since when the
>> UnboundedSource returns false from advance(), the
>> UnboundedSourceAsSDFWrapperFn will schedule a bundle finalization and
>> return resume for the process continuation. This should cause
>> invokeProcessElement() to complete in
>> OutputAndTimeBoundedSplittableProcessElementInvoker and the runner specific
>> implementation should finish the current bundle. This will allow the runner
>> to do two things:
>> 1) Finalize the current bundle
>> 2) Schedule the continuation for the checkpoint mark
>>
>> Based upon your description it looks like for some reason the runner is
>> unable to complete the current bundle.
>>
>> On Thu, Jun 17, 2021 at 2:48 AM Alex Koay  wrote:
>>
>>> Okay, I think I've found the issue, but now I need some help figuring
>>> out how to fix the issue.
>>>
>>> 1. Solace allows a number of unacknowledged messages before it stops
>>> sending more messages. This number just so happens to be 10k messages by
>>> default (in the queue I am using).
>>> 2. The Solace Beam transform (rightly) waits until bundle finalization
>>> before acknowledging the messages.
>>> 3. Bundle finalization doesn't happen until it either reaches 10k
>>> messages or 10s for the DataflowRunner. For the PortableRunner this seems
>>> to be 10k or an unknown timeout. This is related to the
>>> OutputAndTimeBoundedSplittableProcessElementInvoker.
>>> 4. Many readers are created (over and over) due to the
>>> UnboundedSourceAsSdfWrapperFn.
>>> 5. When a lot of readers are created, they would compete for the
>>> messages (in non-exclusive mode), eventually leaving a small number of
>>> unacknowledged messages per bundle.
>>> 6. The readers are then cached in cachedReaders in the
>>> UnboundedSourceAsSdfWrapperFn. A total of 100 readers are cached, and get
>>> evicted after a minute. See https://github.com/apache/beam/pull/13592
>>> 7. The readers each have a small number of unacknowledged messages which
>>> will remain unacknowledged and cannot be given to another consumer until
>>> the bundle finalization happens.
>>> 8. When bundle finalization happens (possibly after the reader gets
>>> evicted), the messages return to the queue, only to get taken by the huge
>>> number of other competing readers.
>>>
>>> At this point, I'm guessing the few methods to fix this are:
>>> a. reduce the bundle size / reduce the bundle timeout (which all seem to
>>> be hardcoded per runner)
>>> b. reduce the number of cached readers / their timeouts (which doesn't
>>> seem to be customizable either) so that there 

Re: Debugging External Transforms on Dataflow (Python)

2021-06-17 Thread Luke Cwik
Reducing the bundle size/timeout shouldn't be necessary since when the
UnboundedSource returns false from advance(), the
UnboundedSourceAsSDFWrapperFn will schedule a bundle finalization and
return resume for the process continuation. This should cause
invokeProcessElement() to complete in
OutputAndTimeBoundedSplittableProcessElementInvoker and the runner specific
implementation should finish the current bundle. This will allow the runner
to do two things:
1) Finalize the current bundle
2) Schedule the continuation for the checkpoint mark

Based upon your description it looks like for some reason the runner is
unable to complete the current bundle.
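
For illustration, a minimal sketch of the bundle-finalization hook involved here (not the
actual UnboundedSourceAsSDFWrapperFn; the ack target is a placeholder): a DoFn can take a
BundleFinalizer parameter, and the registered callback only runs once the runner durably
commits the bundle, which is why the acknowledgements wait on bundle completion.

import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Duration;
import org.joda.time.Instant;

class AckAfterCommitFn extends DoFn<String, String> {
  @ProcessElement
  public void process(ProcessContext c, BundleFinalizer bundleFinalizer) {
    String messageId = c.element();
    // Runs only after the bundle is durably committed; this stands in for
    // CheckpointMark#finalizeCheckpoint in the wrapped-source case.
    bundleFinalizer.afterBundleCommit(
        Instant.now().plus(Duration.standardMinutes(5)),
        () -> System.out.println("ack " + messageId));
    c.output(messageId);
  }
}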

On Thu, Jun 17, 2021 at 2:48 AM Alex Koay  wrote:

> Okay, I think I've found the issue, but now I need some help figuring out
> how to fix the issue.
>
> 1. Solace allows a number of unacknowledged messages before it stops
> sending more messages. This number just so happens to be 10k messages by
> default (in the queue I am using).
> 2. The Solace Beam transform (rightly) waits until bundle finalization
> before acknowledging the messages.
> 3. Bundle finalization doesn't happen until it either reaches 10k messages
> or 10s for the DataflowRunner. For the PortableRunner this seems to be 10k
> or an unknown timeout. This is related to the
> OutputAndTimeBoundedSplittableProcessElementInvoker.
> 4. Many readers are created (over and over) due to the
> UnboundedSourceAsSdfWrapperFn.
> 5. When a lot of readers are created, they would compete for the messages
> (in non-exclusive mode), eventually leaving a small number of
> unacknowledged messages per bundle.
> 6. The readers are then cached in cachedReaders in the
> UnboundedSourceAsSdfWrapperFn. A total of 100 readers are cached, and get
> evicted after a minute. See https://github.com/apache/beam/pull/13592
> 7. The readers each have a small number of unacknowledged messages which
> will remain unacknowledged and cannot be given to another consumer until
> the bundle finalization happens.
> 8. When bundle finalization happens (possibly after the reader gets
> evicted), the messages return to the queue, only to get taken by the huge
> number of other competing readers.
>
> At this point, I'm guessing the few methods to fix this are:
> a. reduce the bundle size / reduce the bundle timeout (which all seem to
> be hardcoded per runner)
> b. reduce the number of cached readers / their timeouts (which doesn't
> seem to be customizable either) so that there would be less contention
> c. somehow reduce the splitting process and instead reusing existing
> sources over and over
>
> I'd be happy to send pull requests to help fix this issue but perhaps will
> need some direction as to how I should fix this.
>
> On Wed, Jun 16, 2021 at 8:32 PM Alex Koay  wrote:
>
>> Alright, some updates.
>>
>> Using DirectRunner helped narrow things down quite a bit. It seems that
>> the Solace transform is somewhat buggy when used with the
>> UnboundedSourceAsSDFWrapperFn as it doesn't have a proper CheckpointMark.
>> Refer to this:
>> https://github.com/SolaceProducts/solace-apache-beam/blob/d62f5b8e275902197882e90cdf87346438fae9ac/beam-sdks-java-io-solace/src/main/java/com/solace/connector/beam/UnboundedSolaceSource.java#L40
>>
>> The source simply creates a new Reader every time createReader() is
>> called.
>>
>> Because of these, the cachedReaders in the
>> UnboundedSourceAsSDFRestrictionTracker are never purged, resulting in
>> readers not being closed but staying in the cache.
>> Changing the timeout causes the pipeline to continue draining but at a
>> glacial pace.
>>
>> I've still not been able to isolate the root cause of why it suddenly stops
>> reading more data (could be a Solace issue though).
>>
>>
>> Also, trying the easy way out, I've tried running it with 2.24.0 (the
>> last one without the SDF default Read) in Java and it works perfectly.
>> Newer versions in Java DirectRunner don't work correctly either.
>> Unfortunately Dataflow seems to expand the external transform using the
>> SDF Read version even when using 2.24.0 (I'm not entirely sure why this is
>> the case).
>>
>> I feel like I'm almost at the verge of fixing the problem, but at this
>> point I'm still far from it.
>>
>>
>> On Wed, Jun 16, 2021 at 11:24 AM Alex Koay  wrote:
>>
>>> 1. I'm building a streaming pipeline.
>>> 2. For the pure Java transforms pipeline I believe it got substituted
>>> with a Dataflow native Solace transform (it isn't using use_runner_v2 as I
>>> think Java doesn't support that publicly yet). I used the default Java
>>> flags with a DataflowRunner.
>>> 3. I believe it's the source reader that is being created in mass.
>>>
>>> Currently I just tested the Python pipeline (with Java Solace transform)
>>> on the DirectRunner without bounds, and it seems that the issue is
>>> similarly manifesting. I'm trying to debug it this way for now.
>>>
>>> On Wed, Jun 16, 2021 at 9:01 AM Boyuan Zhang  wrote:
>>>
 In terms of the odd case you are 

Re: How avoid blocking when decompressing large GZIP files.

2021-06-14 Thread Luke Cwik
Try adding a Reshuffle transform to the pipeline after the ParDo that gives
the sequence number. This will cause the data to be materialized and then
the subsequent steps happen in parallel.

Depending on which IO transform you are using, if splitting support is ever
added for compressed files and for CSV files, then your pipeline might break,
since each of the "multiple" file segments would start numbering from 1 and
count up.

There is some advanced support for having line numbers along with the data
within ContextualTextIO which might be of interest to you as a replacement
for your implementation.
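
A minimal sketch of that shape, assuming a single static key so the numbering stays
sequential (the input path and the downstream transform are placeholders):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;

public class SequenceThenReshuffle {
  /** Single key => a single state cell => strictly increasing sequence numbers. */
  static class AssignSequence extends DoFn<KV<String, String>, KV<Long, String>> {
    @StateId("seq")
    private final StateSpec<ValueState<Long>> seqSpec = StateSpecs.value();

    @ProcessElement
    public void process(ProcessContext c, @StateId("seq") ValueState<Long> seq) {
      Long current = seq.read();
      long next = (current == null ? 0L : current) + 1;
      seq.write(next);
      c.output(KV.of(next, c.element().getValue()));
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    p.apply(TextIO.read().from("gs://my-bucket/input.csv.gz")) // placeholder path
        .apply(WithKeys.of("all"))                             // one key, so numbering is sequential
        .apply("AssignSequence", ParDo.of(new AssignSequence()))
        .apply(Reshuffle.viaRandomKey())                       // materialize here; downstream runs in parallel
        .apply("RealTransformation", ParDo.of(new DoFn<KV<Long, String>, String>() {
          @ProcessElement
          public void process(ProcessContext c) {
            // placeholder for the actual per-row transformation
            c.output(c.element().getKey() + "," + c.element().getValue());
          }
        }));
    p.run();
  }
}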


On Fri, Apr 23, 2021 at 5:10 AM Evan Galpin  wrote:

> Hmm in my somewhat limited experience, I was not able to combine state and
> Splittable DoFn. Definitely could be user error on my part though.
>
> RE sequence numbers, could it work to embed those numbers in the CSV
> itself?
>
> Thanks,
> Evan
>
> On Fri, Apr 23, 2021 at 07:55 Simon Gauld  wrote:
>
>> Thank you and I will have a look however some concerns I have
>>
>> - the gzip itself is not splittable as such
>> - I need to apply a sequence number 1..n so I believe the read *must* be
>> sequential
>>
>> However what I am looking to achieve is handing off the newly decorated
>> row as soon as the sequence is applied to it.   The issue is that the
>> entire step of applying the sequence number appears to be blocking. Also of
>> note, I am using a @DoFn.StateId.
>>
>> I'll look at SplittableDoFns, thanks.
>>
>>
>> On Fri, Apr 23, 2021 at 12:50 PM Evan Galpin 
>> wrote:
>>
>>> I could be wrong but I believe that if your large file is being read by
>>> a DoFn, it’s likely that the file is being processed atomically inside that
>>> DoFn, which cannot be parallelized further by the runner.
>>>
>>> One purpose-built way around that constraint is by using Splittable
>>> DoFn[1][2] which could be used to allow each split to read a portion of the
>>> file. I don’t know, however, how this might (or might not) work with
>>> compression.
>>>
>>> [1]
>>> https://beam.apache.org/blog/splittable-do-fn-is-available/
>>> [2]
>>> https://beam.apache.org/documentation/programming-guide/#splittable-dofns
>>>
>>> Thanks,
>>> Evan
>>>
>>> On Fri, Apr 23, 2021 at 07:34 Simon Gauld  wrote:
>>>
 Hello,

 I am trying to apply a transformation to each row in a reasonably large
 (1b row) gzip compressed CSV.

 The first operation is to assign a sequence number, in this case 1,2,3..

 The second operation is the actual transformation.

 I would like to apply the sequence number *as* each row is read from
 the compressed source and then hand off the 'real' transformation work in
 parallel, using DataFlow to autoscale the workers for the transformation.

 I don't seem to be able to scale *until* all rows have been read; this
 appears to be blocking the pipeline until decompression of the entire file
 is completed.   At this point DataFlow autoscaling works as expected, it
 scales upwards and throughput is then high. The issue is the decompression
 appears to block.

 My question: in beam, is it possible to stream records from a
 compressed source? without blocking the pipeline?

 thank you

 .s




Re: End to end unit tests for stateful pipeline

2021-06-14 Thread Luke Cwik
You can create a PipelineOption which represents your Redis client object.
For tests you would set the PipelineOption to a serializable fake/mock that
can replay the results you want. The default for the PipelineOption object
would instantiate your production client. You can see an example usage of
the DefaultValueFactory here[1].

1:
https://github.com/apache/beam/blob/5cebe0fd82ade3f957fe70e25aa3e399d2e91b32/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java#L71
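
A minimal sketch of that pattern, assuming a small Serializable RedisClient interface of your
own (Beam does not ship one). In the test you would call
options.as(RedisClientOptions.class).setRedisClient(new FakeRedisClient().with("key", "value"))
before building the pipeline, and the stateful DoFns would fetch the client via
c.getPipelineOptions().as(RedisClientOptions.class).getRedisClient():

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.PipelineOptions;

public interface RedisClientOptions extends PipelineOptions {

  /** Minimal abstraction the DoFns depend on; Serializable so a test fake can be set directly. */
  interface RedisClient extends Serializable {
    String get(String key);
  }

  @Default.InstanceFactory(ProductionRedisClientFactory.class)
  RedisClient getRedisClient();

  void setRedisClient(RedisClient client);

  /** Default used outside of tests: construct the real client here (connection details omitted). */
  class ProductionRedisClientFactory implements DefaultValueFactory<RedisClient> {
    @Override
    public RedisClient create(PipelineOptions options) {
      return key -> {
        throw new UnsupportedOperationException("wire up the real Redis connection here");
      };
    }
  }

  /** Serializable fake for unit tests: replays canned key/value pairs. */
  class FakeRedisClient implements RedisClient {
    private final Map<String, String> data = new HashMap<>();

    public FakeRedisClient with(String key, String value) {
      data.put(key, value);
      return this;
    }

    @Override
    public String get(String key) {
      return data.get(key);
    }
  }
}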

On Mon, Jun 14, 2021 at 10:54 AM gaurav mishra 
wrote:

> Hi,
> I have a streaming pipeline which reads from pubsub, enriches data using
> redis and finally writes to pubsub. The code has some stateful DoFns with
> timers. I wanted to write unit tests for the whole pipeline, that reads
> from TestStream<> , enriches data using a mocked redis client, and writes
> data to a PCollection on which I can do PAsserts. The trouble I am having
> here is how to set up the mocked redis client. Are there any examples that
> I can take a look at? I am using java with junit4 as a testing framework.
> More details about my code are here -
> https://stackoverflow.com/questions/67963189/unit-tests-apache-beam-stateful-pipeline-with-external-dependencies
>


Re: AvroCoder works differently in DataflowRunner

2020-10-19 Thread Luke Cwik
I believe Beam is still on Avro 1.8 and the work to make it compatible with
1.9 has yet to happen.

The Java distribution in Dataflow is dependent on which version of Beam
you're using. For Beam 2.23.0 I believe it is OpenJDK 1.8.0_151-b12.
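
To narrow it down, a local round trip through the coder (a sketch; MyRecord stands in for the
custom type) usually makes a coder or Avro-version problem visible without deploying anything:

import java.util.Objects;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.util.CoderUtils;

public class AvroRoundTrip {
  /** Stand-in for the custom type from the question. */
  static class MyRecord {
    String id = "";
    long count;

    @Override
    public boolean equals(Object o) {
      return o instanceof MyRecord && ((MyRecord) o).id.equals(id) && ((MyRecord) o).count == count;
    }

    @Override
    public int hashCode() {
      return Objects.hash(id, count);
    }
  }

  public static void main(String[] args) throws Exception {
    AvroCoder<MyRecord> coder = AvroCoder.of(MyRecord.class);
    MyRecord in = new MyRecord();
    in.id = "sensor-1";
    in.count = 42;
    byte[] bytes = CoderUtils.encodeToByteArray(coder, in);
    MyRecord out = CoderUtils.decodeFromByteArray(coder, bytes);
    // With a single, matching Avro version on the classpath this should print true.
    System.out.println(in.equals(out));
  }
}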

On Thu, Oct 15, 2020 at 8:39 PM KV 59  wrote:

> Hi,
>
> I'm using AvroCoder for my custom type. If I use the AvroCoder and
> perform these operations Object1 -> encode -> bytes -> decode -> Object2.
> Object1 and Object2 are not equal. The same works locally.
>
> What could be the problem? I'm using Beam 2.23.0 ,  Avro 1.9.1 and Java
> 1.8 . Do the versions matter? Also I was also looking for the Java
> distribution on the DataflowRunner and couldn't find it. This might be a
> lame question but does it matter?
>
> Thanks
> Kishore
>


Re: Empty output when job stops

2020-10-19 Thread Luke Cwik
>> @Andrés, could you put breakpoints in your Combiner implementation and
>> see when that second output happens and why it happens (a stacktrace
>> would help, probably)
>>
>> Regarding the state internals: we would basically need to introduce one
>> more layer, instead of keeping an AccumT we need to keep an
>> Option or something of that sort. Not saying Java Optional
>> here, on purpose. However, changing the state type would have the
>> consequence that savepoints are no longer compatible, i.e. you cannot
>> restore a job from before this change using a Beam version after this
>> change. So I'm very reluctant.
>>
>>
>> On 15.10.20 11:51, Andrés Garagiola wrote:
>> > Thanks Luke, Aljoscha
>> >
>> > Let me know if I can help you to reproduce the problem.
>> > In my case the state is never set to null but I think that it becomes
>> null
>> > while the job is stopping. Once I run the job again from the savepoint,
>> the
>> > state is recovered normally.
>> >
>> > Let's show this with an example:
>> >
>> > t0: Add input 1 => accu state [1] => output [1]
>> > t1: Add input 2 => acu state [1,2] => output [1,2]
>> > t2: stop job with savepoint => output [1,2,3] and *output [] *
>> > t3: run job from savepoint => acu state [1,2] => no output
>> > t4: Add input 3 => acu state [1,2,3] => [1,2,3]
>> >
>> > Regards
>> >
>> > On Thu, Oct 15, 2020 at 11:33 AM Aljoscha Krettek 
>> > wrote:
>> >
>> >> I'll take a look.
>> >>
>> >> On 14.10.20 18:49, Luke Cwik wrote:
>> >>> Assuming that null means that the accumulator was never created is not
>> >>> right especially if null is a valid terminal state while the
>> >>> initial accumulator value is non-null. This is uncommon but possible.
>> >> Filed
>> >>> https://issues.apache.org/jira/browse/BEAM-11063.
>> >>>
>> >>> +Aljoscha Krettek  Is this something you can
>> take a
>> >>> look at?
>> >>>
>> >>> On Wed, Oct 14, 2020 at 9:25 AM Andrés Garagiola <
>> >> andresgaragi...@gmail.com>
>> >>> wrote:
>> >>>
>> >>>> Hi all,
>> >>>>
>> >>>> I have this problem in a stream pipeline using the runner Apache
>> Flink
>> >>>> 1.19. I want to do an upgrade to my job. I first end the job by using
>> >> the
>> >>>> Flink API creating a savepoint, and then I start the new version by
>> >> using
>> >>>> the Flink API passing the savepoint path.
>> >>>>
>> >>>> When the job ends two new records are created. The first one is OK
>> but
>> >> the
>> >>>> second one is an empty record.
>> >>>>
>> >>>>
>> >>>> My pipeline uses this window strategy:
>> >>>>
>> >>>>
>> >>>> Window<KV<String, TaggedEvent>> window =
>> >>>>     Window.<KV<String, TaggedEvent>>into(CalendarWindows.days(this.options.getWindowDays()))
>> >>>>         .triggering(AfterWatermark.pastEndOfWindow()
>> >>>>             .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))
>> >>>>             .withLateFirings(AfterProcessingTime.pastFirstElementInPane()))
>> >>>>         .withAllowedLateness(Duration.standardSeconds(this.options.getAllowedLateness()))
>> >>>>         .accumulatingFiredPanes();
>> >>>>
>> >>>>
>> >>>> I implemented a custom combiner, and I realized that the state of the
>> >>>> combiner is null in the second output. This line (
>> >>>>
>> >>
>> https://github.com/apache/beam/blob/v2.24.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L507
>> >> )
>> >>>> is evaluated to false, and then it creates an empty accumulator.
>> >>>>
>> >>>>
>> >>>> Is this the expected behavior?
>> >>>>
>> >>>>
>> >>>> Thanks
>> >>>>
>> >>>
>> >>
>> >>
>> >
>>
>>


Re: Processing files as they arrive with custom timestamps

2020-10-14 Thread Luke Cwik
I think you should be using the largest "complete" timestamp from the
metadata results, and not set the watermark if you don't have one.
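
For example, a variant of the MatchPollFn quoted below (a sketch that reuses the poster's
file-naming convention and getTimestamp() helper, both assumed to be in scope) could take the
largest ".complete" timestamp as the watermark and skip setting it when none exists yet:

private static class MatchPollFn extends PollFn<String, MatchResult.Metadata> {
  @Override
  public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, Context c)
      throws Exception {
    List<MatchResult.Metadata> results =
        FileSystems.match(element, EmptyMatchTreatment.ALLOW).metadata();
    List<TimestampedValue<MatchResult.Metadata>> outputs = new ArrayList<>();
    Instant watermark = null;
    for (MatchResult.Metadata metadata : results) {
      String path = metadata.resourceId().toString();
      Instant timestamp = getTimestamp(path); // poster's helper: day of the file, +1 day if ".complete"
      outputs.add(TimestampedValue.of(metadata, timestamp));
      if (path.endsWith(".complete") && (watermark == null || timestamp.isAfter(watermark))) {
        watermark = timestamp; // largest "complete" timestamp seen so far
      }
    }
    Watch.Growth.PollResult<MatchResult.Metadata> result =
        Watch.Growth.PollResult.incomplete(outputs);
    // Only advance the watermark when at least one ".complete" marker exists.
    return watermark == null ? result : result.withWatermark(watermark);
  }
}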

On Wed, Oct 14, 2020 at 11:47 AM Piotr Filipiuk 
wrote:

> Thank you so much for the input, that was extremely helpful!
>
> I changed the pipeline from using FileIO.match() into using a custom
> matching (very similar to the FileIO.match()) that looks as follows:
>
> static PCollection<KV<String, String>> getData(
>     Pipeline pipeline,
>     String filepattern,
>     Duration pollInterval,
>     TerminationCondition<String, ?> terminationCondition) {
>   final Growth<String, MatchResult.Metadata, String> stringMetadataStringGrowth =
>       Watch.growthOf(
>               Contextful.of(new MatchPollFn(), Requirements.empty()), new ExtractFilenameFn())
>           .withPollInterval(pollInterval)
>           .withTerminationPerInput(terminationCondition);
>   return pipeline
>       .apply("Create filepattern", Create.of(filepattern))
>       .apply("Continuously match filepatterns", stringMetadataStringGrowth)
>       .apply(Values.create())
>       .apply(ParDo.of(new ReadFileFn()));
> }
>
> private static class MatchPollFn extends PollFn<String, MatchResult.Metadata> {
>   private static final Logger logger = LoggerFactory.getLogger(MatchPollFn.class);
>
>   @Override
>   public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, Context c)
>       throws Exception {
>     // Here we only have the filepattern i.e. element, and hence we do not know what the timestamp
>     // and/or watermark should be. As a result, we output EPOCH as both the timestamp and the
>     // watermark.
>     Instant instant = Instant.EPOCH;
>     return Watch.Growth.PollResult.incomplete(
>             instant, FileSystems.match(element, EmptyMatchTreatment.ALLOW).metadata())
>         .withWatermark(instant);
>   }
> }
>
> private static class ExtractFilenameFn implements SerializableFunction<MatchResult.Metadata, String> {
>   @Override
>   public String apply(MatchResult.Metadata input) {
>     return input.resourceId().toString();
>   }
> }
>
> The above together with fixing the bugs that Luke pointed out (Thank you
> Luke!), makes the unit test pass.
>
> Thank you again!
>
> If you have any feedback for the current code, I would appreciate it. I am
> especially interested whether setting event time and watermark in
> *MatchPollFn* to *EPOCH* is a correct way to go.
>
>
> On Wed, Oct 14, 2020 at 9:49 AM Reuven Lax  wrote:
>
>> FYI this is a major limitation in FileIO.match's watermarking ability. I
>> believe there is a JIRA issue about this, but nobody has ever worked on
>> improving it.
>>
>> On Wed, Oct 14, 2020 at 9:38 AM Luke Cwik  wrote:
>>
>>> FileIO.match doesn't allow one to configure how the watermark advances
>>> and it assumes that the watermark during polling is always the current
>>> system time[1].
>>>
>>> Because of this the downstream watermark advancement is limited. When an
>>> element and restriction starts processing, the maximum you can hold the
>>> output watermark back by for this element and restriction pair is limited
>>> to the current input watermark (a common value to use is the current
>>> element's timestamp as the lower bound for all future output but if that
>>> element is late the output you produce may or may not be late (depends on
>>> downstream windowing strategy)). Holding this watermark back is important
>>> since many of these elements and restrictions could be processed in
>>> parallel at different rates.
>>>
>>> Based upon your implementation, you wouldn't need to control the
>>> watermark from the file reading splittable DoFn if FileIO.match allowed you
>>> to say what the watermark is after each polling round and allowed you to
>>> set the timestamp for each match found. This initial setting of the
>>> watermark during polling would be properly handled by the runner to block
>>> watermark advancement for those elements.
>>>
>>> Minor comments not related to your issue but would improve your
>>> implementation:
>>> 1) Typically you set the watermark right before returning. You are
>>> missing this from the failed tryClaim loop return.
>>> 2) You should structure your loop not based upon the end of the current
>>> restriction but continue processing till tryClaim fails. For example:
>>>   @ProcessElement
>>>   public void processElement(@Element String fileName,
>>>       RestrictionTracker<OffsetRange, Long> tracker,
>>>       OutputReceiver<String> outputReceiver) throws IOException {
>>> RandomAccessFile file = new 

Re: Empty output when job stops

2020-10-14 Thread Luke Cwik
Assuming that null means that the accumulator was never created is not
right, especially if null is a valid terminal state while the
initial accumulator value is non-null. This is uncommon but possible. Filed
https://issues.apache.org/jira/browse/BEAM-11063.

+Aljoscha Krettek  Is this something you can take a
look at?

On Wed, Oct 14, 2020 at 9:25 AM Andrés Garagiola 
wrote:

> Hi all,
>
> I have this problem in a stream pipeline using the runner Apache Flink
> 1.19. I want to do an upgrade to my job. I first end the job by using the
> Flink API creating a savepoint, and then I start the new version by using
> the Flink API passing the savepoint path.
>
> When the job ends two new records are created. The first one is OK but the
> second one is an empty record.
>
>
> My pipeline uses this window strategy:
>
>
> Window<KV<String, TaggedEvent>> window =
>     Window.<KV<String, TaggedEvent>>into(CalendarWindows.days(this.options.getWindowDays()))
>         .triggering(AfterWatermark.pastEndOfWindow()
>             .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))
>             .withLateFirings(AfterProcessingTime.pastFirstElementInPane()))
>         .withAllowedLateness(Duration.standardSeconds(this.options.getAllowedLateness()))
>         .accumulatingFiredPanes();
>
>
> I implemented a custom combiner, and I realized that the state of the
> combiner is null in the second output. This line (
> https://github.com/apache/beam/blob/v2.24.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L507)
> is evaluated to false, and then it creates an empty accumulator.
>
>
> Is this the expected behavior?
>
>
> Thanks
>


Re: Processing files as they arrive with custom timestamps

2020-10-14 Thread Luke Cwik
FileName().toString();
> int index = filename.lastIndexOf(".complete");
> if (index != -1) {
>   // In the case it has a suffix, strip it.
>   filename = filename.substring(0, index);
> }
> Instant timestamp =
> Instant.parse(new 
> StringBuilder().append(filename).append("T00:00:00.000Z").toString());
> if (index != -1) {
>   // In the case it has a suffix i.e. it is complete, fast forward to the 
> next day.
>   return timestamp.plus(Duration.standardDays(1));
> }
> return timestamp;
>   }
>
>   @GetInitialRestriction
>   public OffsetRange getInitialRestriction(@Element Metadata metadata) throws 
> IOException {
> long lineCount;
> try (Stream<String> stream = Files.lines(Paths.get(metadata.resourceId().toString()))) {
>   lineCount = stream.count();
> }
> return new OffsetRange(0L, lineCount);
>   }
>
>   @GetInitialWatermarkEstimatorState
>   public Instant getInitialWatermarkEstimatorState(
>   @Element Metadata metadata, @Restriction OffsetRange restriction) {
> String filename = metadata.resourceId().toString();
> logger.info("getInitialWatermarkEstimatorState {}", filename);
> // Compute and return the initial watermark estimator state for each 
> element and restriction.
> // All subsequent processing of an element and restriction will be 
> restored from the existing
> // state.
> return getTimestamp(filename);
>   }
>
>   private static Instant ensureTimestampWithinBounds(Instant timestamp) {
> if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
>   timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
> } else if (timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
>   timestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
> }
> return timestamp;
>   }
>
>   @NewWatermarkEstimator
>   public WatermarkEstimators.Manual newWatermarkEstimator(
>   @WatermarkEstimatorState Instant watermarkEstimatorState) {
> logger.info("newWatermarkEstimator {}", watermarkEstimatorState);
> return new 
> WatermarkEstimators.Manual(ensureTimestampWithinBounds(watermarkEstimatorState));
>   }
> }
>
>
>
> On Thu, Oct 8, 2020 at 2:15 PM Luke Cwik  wrote:
>
>> I'm working on a blog post[1] about splittable dofns that covers this
>> topic.
>>
>> The TLDR; is that FileIO.match() should allow users to control the
>> watermark estimator that is used and for your use case you should hold the
>> watermark to some computable value (e.g. the files are generated every hour
>> so once you know the last file has appeared for that hour you advance the
>> watermark to the current hour).
>>
>> 1:
>> https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#heading=h.fo3wm9qs0vql
>>
>> On Thu, Oct 8, 2020 at 1:55 PM Piotr Filipiuk 
>> wrote:
>>
>>> Hi,
>>>
>>> I am looking into:
>>> https://beam.apache.org/documentation/patterns/file-processing/ since I
>>> would like to create a continuous pipeline that reads from files and
>>> assigns Event Times based on e.g. file metadata or actual data inside the
>>> file. For example:
>>>
>>> private static void run(String[] args) {
>>>   PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>>>   Pipeline pipeline = Pipeline.create(options);
>>>
>>>   PCollection<MatchResult.Metadata> matches = pipeline
>>>   .apply(FileIO.match()
>>>   .filepattern("/tmp/input/*")
>>>   .continuously(Duration.standardSeconds(15), 
>>> Watch.Growth.never()));
>>>   matches
>>>   .apply(ParDo.of(new ReadFileFn()))
>>>
>>>   pipeline.run();
>>> }
>>>
>>> private static final class ReadFileFn extends DoFn<MatchResult.Metadata, String> {
>>>   private static final Logger logger = 
>>> LoggerFactory.getLogger(ReadFileFn.class);
>>>
>>>   @ProcessElement
>>>   public void processElement(ProcessContext c) throws IOException {
>>> Metadata metadata = c.element();
>>> // I believe c.timestamp() is based on processing time.
>>> logger.info("reading {} @ {}", metadata, c.timestamp());
>>> String filename = metadata.resourceId().toString();
>>> // Output timestamps must be no earlier than the timestamp of the
>>> // current input minus the allowed skew (0 milliseconds).
>>> Instant timestamp = new Instant(metadata.lastModifiedMillis());
>>> logger.info("lastModified @ {}", timestamp);
>>> try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
>>>   String line;
>>>   while ((line = br.readLine()) != null) {
>>> c.outputWithTimestamp(line, c.timestamp());
>>>   }
>>> }
>>>   }
>>> }
>>>
>>> The issue is that when calling c.outputWithTimestamp() I am getting:
>>>
>>> Caused by: java.lang.IllegalArgumentException: Cannot output with
>>> timestamp 1970-01-01T00:00:00.000Z. Output timestamps must be no earlier
>>> than the timestamp of the current input (2020-10-08T20:39:44.286Z) minus
>>> the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew()
>>> Javadoc for details on changing the allowed skew.
>>>
>>> I believe this is because MatchPollFn.apply() uses Instant.now() as the
>>> event time for the PCollection. I can see that the call to
>>> continuously() makes the PCollection unbounded and assigns default
>>> Event Time. Without the call to continuously() I can assign the timestamps
>>> without problems either via c.outputWithTimestamp or WithTimestamp
>>> transform.
>>>
>>> I would like to know what is the way to fix the issue, and whether this
>>> use-case is currently supported in Beam.
>>>
>>> --
>>> Best regards,
>>> Piotr
>>>
>>
>
> --
> Best regards,
> Piotr
>


Re: Querying Dataflow job status via Java SDK

2020-10-12 Thread Luke Cwik
It is the best way to do this right now and it hasn't changed in a while (the
only addition in the past six years was region, alongside the project and job ids).
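
A sketch of that lookup from a second service, assuming
beam-runners-google-cloud-dataflow-java is on the classpath and default GCP credentials are
available (project, region and job id below are placeholders):

import com.google.api.services.dataflow.model.Job;
import org.apache.beam.runners.dataflow.DataflowClient;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class JobStatusLookup {
  public static void main(String[] args) throws Exception {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
    options.setProject("my-project");   // same project the job runs in
    options.setRegion("us-central1");   // same region the job runs in
    DataflowClient client = DataflowClient.create(options);
    Job job = client.getJob("2020-10-12_00_00_00-1234567890123456789"); // job id to monitor
    System.out.println(job.getCurrentState());
  }
}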

On Mon, Oct 12, 2020 at 10:53 AM Peter Littig 
wrote:

> Thanks for the reply, Kyle.
>
> The DataflowClient::getJob method uses a Dataflow instance that's provided
> at construction time (via DataflowPipelineOptions::getDataflowClient). If
> that Dataflow instance can be obtained from a minimal instance of the
> options (i.e., containing only the project ID and region) then it looks
> like everything should work.
>
> I suppose a secondary question here is whether or not this approach is the
> recommended way to solve my problem (but I don't know of any alternatives).
>
> On Mon, Oct 12, 2020 at 9:55 AM Kyle Weaver  wrote:
>
>> > I think the answer is to use a DataflowClient in the second service,
>> but creating one requires DataflowPipelineOptions. Are these options
>> supposed to be exactly the same as those used by the first service? Or do
>> only some of the fields have to be the same?
>>
>> Most options are not necessary for retrieving a job. In general, Dataflow
>> jobs can always be uniquely identified by the project, region and job ID.
>> https://github.com/apache/beam/blob/ecedd3e654352f1b51ab2caae0fd4665403bd0eb/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java#L100
>>
>> On Mon, Oct 12, 2020 at 9:31 AM Peter Littig 
>> wrote:
>>
>>> Hello, Beam users!
>>>
>>> Suppose I want to build two (Java) services, one that launches
>>> (long-running) dataflow jobs, and the other that monitors the status of
>>> dataflow jobs. Within a single service, I could simply track a
>>> PipelineResult for each dataflow run and periodically call getState. How
>>> can I monitor job status like this from a second, independent service?
>>>
>>> I think the answer is to use a DataflowClient in the second service, but
>>> creating one requires DataflowPipelineOptions. Are these options supposed
>>> to be exactly the same as those used by the first service? Or do only some
>>> of the fields have to be the same?
>>>
>>> Or maybe there's a better alternative than DataflowClient?
>>>
>>> Thanks in advance!
>>>
>>> Peter
>>>
>>


Re: Count based triggers and latency

2020-10-12 Thread Luke Cwik
The default trigger will only fire when the global window closes, which does
happen with sources whose watermark advances past GlobalWindow.MAX_TIMESTAMP, or
during a pipeline drain with partial results in streaming. Bounded sources
commonly have their watermark advance to the end of time when they complete,
and some unbounded sources can stop producing output if they detect the end.

Parallelization for stateful DoFns is per key and window. Parallelization
for GBK is per key and window pane. Note that elementCountAtLeast means
that the runner can buffer as many elements as it wants and can decide to offer a
low-latency pipeline by triggering often, or better throughput through the
use of buffering.
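
For example (a sketch with arbitrary numbers), a count-or-time trigger keeps the poster's
global window but lets the runner batch more than one element per pane without unbounded
latency:

import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class CountOrTimeTrigger {
  /** Fire on whichever comes first: 1000 elements or 5 seconds after the first element in the pane. */
  public static <T> Window<T> window() {
    return Window.<T>into(new GlobalWindows())
        .triggering(
            Repeatedly.forever(
                AfterFirst.of(
                    AfterPane.elementCountAtLeast(1000),
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardSeconds(5)))))
        .withAllowedLateness(Duration.standardDays(1))
        .discardingFiredPanes();
  }
}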



On Mon, Oct 12, 2020 at 8:22 AM KV 59  wrote:

> Hi All,
>
> I'm building a pipeline to process events as they come and do not really
> care about the event time and watermark. I'm more interested in not
> discarding the events and reducing the latency. The downstream pipeline has
> a stateful DoFn. I understand that the default window strategy is Global
> Windows,. I did not completely understand the default trigger as per
>
> https://beam.apache.org/releases/javadoc/2.23.0/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.html
> it says Repeatedly.forever(AfterWatermark.pastEndOfWindow()), In case of
> global window how does this work (there is no end of window)?.
>
> My source is Google PubSub and pipeline is running on Dataflow runner I
> have defined my window transform as below
>
> input.apply(TRANSFORM_NAME, Window.into(new GlobalWindows())
>> .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>> .withAllowedLateness(Duration.standardDays(1)).discardingFiredPanes())
>
>
> A couple of questions
>
>1. Is triggering after each element inefficient in terms of
>persistence(serialization) after each element and also parallelism
>triggering after each looks like a serial execution?
>2. How does Dataflow parallelize in such cases of triggers?
>
>
> Thanks and appreciate the responses.
>
> Kishore
>


Re: Triggers in sideInputs

2020-10-09 Thread Luke Cwik
If you don't have any watermark-based triggers, then using global window
state and timers makes sense, and you can rewindow into a different window
after it. A hacky way to be able to rewindow into a different windowing
strategy is to output the data to a message queue and ingest the data again in
the same pipeline. People have used this to create loops and work around
some windowing issues like this.

The graph would look like:
EventStream -> ParDo(GlobalWindow State and Timers) -> ParDo(Output summary
to Pubsub)
Input summary from Pubsub -> Window.into(My favorite window strategy) ->
Additional processing

On Fri, Oct 9, 2020 at 1:35 PM Andrés Garagiola 
wrote:

> Hi Luke,
>
> Thanks for your answer.
>
> I was studying the state/timer approach. What doesn't convince me is the
> fact that I would have to use a global window in the main input; otherwise
> I could lose some states when I don't receive state events for a while. By
> using side inputs I keep two independent window strategies: global for
> states but fixed (or whatever) for the measurements. Do you see another way
> to overcome this?
>
> Regards
>
>
>
> On Fri, Oct 9, 2020, 7:38 PM Luke Cwik  wrote:
>
>> Only data along the main input edge causes that DoFn to be executed
>> again. Side inputs don't cause main inputs to be reprocessed. The trigger
>> on the side input controls when the side input data becomes available and
>> is updated.
>>
>> You could choose to have a generator that produces an event on the main
>> input every hour and you could use it to look at the side input and compute
>> all the outputs that you want.
>>
>> I do think that a solution that uses state and timers would likely fit
>> more naturally to solve the problem. This blog[1] is a good starting point.
>>
>> 1: https://beam.apache.org/blog/timely-processing/
>>
>> On Fri, Oct 9, 2020 at 4:15 AM Andrés Garagiola <
>> andresgaragi...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>>
>>> I have a question regarding triggers in sideinput. I would try to
>>> explain my doubt with an example.
>>>
>>> Suppose we have a data stream of sensor events as follow:
>>>
>>>- id, timestamp, type, value
>>>
>>> Where:
>>>
>>>- id is the sensor id
>>>- timestamp is the event timestamp
>>>- type could be some of this value (measurement, new-state)
>>>- value is a conditional field following this rule. If the type is
>>>'measurement' it is a real number with the measured value. If the type
>>>field is 'new-state' value it is a string with a new sensor state.
>>>
>>> We want to produce aggregated data in a time based way (for example by
>>> an hour) as follow:
>>>
>>> Report:
>>>
>>> {
>>>
>>> 'timestamp: 'report timestamp',
>>>
>>> 'sensors': {
>>>
>>> 'sensorId1': {
>>>
>>> 'stateA': 'sum of measures in stateA',
>>>
>>> 'stateB': 'sum of measures in stateB',
>>>
>>> 'stateC': 'sum of measures in stateC',
>>>
>>> 
>>>
>>> }
>>>
>>> }
>>>
>>> }
>>>
>>> The state at a given timestamp X is the last known state of that sensor
>>> at that moment. We could have late events (suppose max 15' late).
>>>
>>>
>>> I thought this solution:
>>>
>>>
>>> 1) Create a 'sideInput' with the states (in a global window)
>>>
>>>
>>> Window<KV<String, String>> sideInputWindow =
>>>     Window.<KV<String, String>>into(new GlobalWindows())
>>>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>>         .accumulatingFiredPanes();
>>>
>>> PCollectionView<Map<String, TreeMap<Instant, String>>> sensorStates = ...
>>>
>>>
>>> Where I have an entry to this map for every sensor, and the TreeMap has
>>> an entry for every new state (in the last 15') + 1 we need the previous one
>>> in case we didn't receive state changes in the last 15'.
>>>
>>>
>>> 2) Then, the solution aggregates values based on the sideInput
>>>
>>>
>>> Window> mainInputWindow = FixedWindow of an hour
>>> with allowed lateness 15' and accumulating fired panes.
>>>
>>>
>>> The solution is not working as I expected in this scenario (row order is
>>> processing time order):
>>>
>>>
>>> Timestamp | Sensor | Type        | Value | Output     | Expected
>>> t0        | s1     | new-state   | A     | s1 {A: 0}  | s1 {A: 0}
>>> t1        | s1     | measurement | 10    | s1 {A: 10} | s1 {A: 10}
>>> t3        | s1     | measurement | 20    | s1 {A: 30} | s1 {A: 30}
>>> t2        | s1     | new-state   | B     | No output  | s1 {A:10 B:20}
>>>
>>>
>>> I assumed that a new fire in the 'sideInput' would force a new fire in
>>> the 'mainInput'. So when t2 arrives, it fires a trigger of the sideInput
>>> producing the new state, and then the mainInput will be recomputed
>>> producing the expected value. Is my assumption wrong?
>>>
>>>
>>> Thank you
>>>
>>>
>>>
>>>
>>


Re: Triggers in sideInputs

2020-10-09 Thread Luke Cwik
Only data along the main input edge causes that DoFn to be executed again.
Side inputs don't cause main inputs to be reprocessed. The trigger on the
side input controls when the side input data becomes available and is
updated.

You could choose to have a generator that produces an event on the main
input every hour and you could use it to look at the side input and compute
all the outputs that you want.

I do think that a solution that uses state and timers would likely fit more
naturally to solve the problem. This blog[1] is a good starting point.

1: https://beam.apache.org/blog/timely-processing/
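
A sketch of the generator idea above (the side-input view type and the report format are
placeholders):

import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

public class HourlyReportFromSideInput {
  static PCollection<String> wire(
      Pipeline p, PCollectionView<Map<String, String>> sensorStatesView) {
    return p.apply(GenerateSequence.from(0).withRate(1, Duration.standardHours(1)))
        .apply("EmitHourlyReport",
            ParDo.of(new DoFn<Long, String>() {
                  @ProcessElement
                  public void process(ProcessContext c) {
                    Map<String, String> states = c.sideInput(sensorStatesView);
                    // placeholder report: the real one would aggregate per sensor/state
                    c.output("tick " + c.element() + " sees " + states.size() + " sensors");
                  }
                })
                .withSideInputs(sensorStatesView));
  }
}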

On Fri, Oct 9, 2020 at 4:15 AM Andrés Garagiola 
wrote:

> Hi all,
>
>
> I have a question regarding triggers in sideinput. I would try to explain
> my doubt with an example.
>
> Suppose we have a data stream of sensor events as follow:
>
>- id, timestamp, type, value
>
> Where:
>
>- id is the sensor id
>- timestamp is the event timestamp
>- type could be some of this value (measurement, new-state)
>- value is a conditional field following this rule. If the type is
>'measurement' it is a real number with the measured value. If the type
>field is 'new-state' value it is a string with a new sensor state.
>
> We want to produce aggregated data in a time based way (for example by an
> hour) as follow:
>
> Report:
>
> {
>
> 'timestamp: 'report timestamp',
>
> 'sensors': {
>
> 'sensorId1': {
>
> 'stateA': 'sum of measures in stateA',
>
> 'stateB': 'sum of measures in stateB',
>
> 'stateC': 'sum of measures in stateC',
>
> 
>
> }
>
> }
>
> }
>
> The state at a given timestamp X is the last known state of that sensor at
> that moment. We could have late events (suppose max 15' late).
>
>
> I thought this solution:
>
>
> 1) Create a 'sideInput' with the states (in a global window)
>
>
> Window<KV<String, String>> sideInputWindow =
>     Window.<KV<String, String>>into(new GlobalWindows())
>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>         .accumulatingFiredPanes();
>
> PCollectionView<Map<String, TreeMap<Instant, String>>> sensorStates = ...
> 
>
>
> Where I have an entry to this map for every sensor, and the TreeMap has an
> entry for every new state (in the last 15') + 1 we need the previous one in
> case we didn't receive state changes in the last 15'.
>
>
> 2) Then, the solution aggregates values based on the sideInput
>
>
> Window> mainInputWindow = FixedWindow of an hour
> with allowed lateness 15' and accumulating fired panes.
>
>
> The solution is not working as I expected in this scenario (row order is
> processing time order):
>
>
> Timestamp | Sensor | Type        | Value | Output     | Expected
> t0        | s1     | new-state   | A     | s1 {A: 0}  | s1 {A: 0}
> t1        | s1     | measurement | 10    | s1 {A: 10} | s1 {A: 10}
> t3        | s1     | measurement | 20    | s1 {A: 30} | s1 {A: 30}
> t2        | s1     | new-state   | B     | No output  | s1 {A:10 B:20}
>
>
> I assumed that a new fire in the 'sideInput' would force a new fire in the
> 'mainInput'. So when t2 arrives, it fires a trigger of the sideInput
> producing the new state, and then the mainInput will be recomputed
> producing the expected value. Is my assumption wrong?
>
>
> Thank you
>
>
>
>


Re: Processing files as they arrive with custom timestamps

2020-10-08 Thread Luke Cwik
I'm working on a blog post[1] about splittable dofns that covers this topic.

The TLDR; is that FileIO.match() should allow users to control the
watermark estimator that is used and for your use case you should hold the
watermark to some computable value (e.g. the files are generated every hour
so once you know the last file has appeared for that hour you advance the
watermark to the current hour).

1:
https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#heading=h.fo3wm9qs0vql

On Thu, Oct 8, 2020 at 1:55 PM Piotr Filipiuk 
wrote:

> Hi,
>
> I am looking into:
> https://beam.apache.org/documentation/patterns/file-processing/ since I
> would like to create a continuous pipeline that reads from files and
> assigns Event Times based on e.g. file metadata or actual data inside the
> file. For example:
>
> private static void run(String[] args) {
>   PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>   Pipeline pipeline = Pipeline.create(options);
>
>   PCollection<MatchResult.Metadata> matches = pipeline
>   .apply(FileIO.match()
>   .filepattern("/tmp/input/*")
>   .continuously(Duration.standardSeconds(15), Watch.Growth.never()));
>   matches
>   .apply(ParDo.of(new ReadFileFn()))
>
>   pipeline.run();
> }
>
> private static final class ReadFileFn extends DoFn<MatchResult.Metadata, String> {
>   private static final Logger logger = 
> LoggerFactory.getLogger(ReadFileFn.class);
>
>   @ProcessElement
>   public void processElement(ProcessContext c) throws IOException {
> Metadata metadata = c.element();
> // I believe c.timestamp() is based on processing time.
> logger.info("reading {} @ {}", metadata, c.timestamp());
> String filename = metadata.resourceId().toString();
> // Output timestamps must be no earlier than the timestamp of the
> // current input minus the allowed skew (0 milliseconds).
> Instant timestamp = new Instant(metadata.lastModifiedMillis());
> logger.info("lastModified @ {}", timestamp);
> try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
>   String line;
>   while ((line = br.readLine()) != null) {
> c.outputWithTimestamp(line, c.timestamp());
>   }
> }
>   }
> }
>
> The issue is that when calling c.outputWithTimestamp() I am getting:
>
> Caused by: java.lang.IllegalArgumentException: Cannot output with
> timestamp 1970-01-01T00:00:00.000Z. Output timestamps must be no earlier
> than the timestamp of the current input (2020-10-08T20:39:44.286Z) minus
> the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew()
> Javadoc for details on changing the allowed skew.
>
> I believe this is because MatchPollFn.apply() uses Instant.now() as the
> event time for the PCollection. I can see that the call to
> continuously() makes the PCollection unbounded and assigns default Event
> Time. Without the call to continuously() I can assign the timestamps
> without problems either via c.outputWithTimestamp or WithTimestamp
> transform.
>
> I would like to know what is the way to fix the issue, and whether this
> use-case is currently supported in Beam.
>
> --
> Best regards,
> Piotr
>


Re: Getting "java.lang.NoSuchFieldError: TRUNCATE_SIZED_RESTRICTION" in Flink cluster

2020-10-05 Thread Luke Cwik
In your pom.xml you are asking for version 2.21.0 of the Flink runner but you
are using Beam 2.23.0 elsewhere. You want these versions to match. Try updating
your profile to:

<profile>
  <id>flink-runner</id>
  <dependencies>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-flink-1.10</artifactId>
      <version>2.23.0</version>
    </dependency>
  </dependencies>
</profile>

On Fri, Oct 2, 2020 at 8:52 PM Tomo Suzuki  wrote:

> I suspect your dependencies have conflict. I develop Linkage Checker
> enforcer rule to identify incompatible dependencies. Do you want to give it
> a try?
>
> https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/Linkage-Checker-Enforcer-Rule
>
> Regards,
> Tomo
>
> On Fri, Oct 2, 2020 at 21:34 Praveen K Viswanathan <
> harish.prav...@gmail.com> wrote:
>
>> Hi - We have a beam pipeline reading and writing using an SDF based IO
>> connector working fine in a local machine using Direct Runner or Flink
>> Runner. However when we build an image of that pipeline along with Flink
>> and deploy in a cluster we get below exception.
>>
>> ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler-
>>> Unhandled exception.
>>> org.apache.flink.client.program.ProgramInvocationException: The program
>>> caused an error:
>>>
>>> Classpath:
>>> [file:/opt/flink/flink-web-upload/Booster-bundled-1.0-SNAPSHOT.jar]
>>> System.out: (none)
>>> System.err: (none)
>>> at
>>> org.apache.flink.client.program.OptimizerPlanEnvironment.generateException(OptimizerPlanEnvironment.java:149)
>>> at
>>> org.apache.flink.client.program.OptimizerPlanEnvironment.getPipeline(OptimizerPlanEnvironment.java:89)
>>> at
>>> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:101)
>>> at
>>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:56)
>>> at
>>> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
>>> at
>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:138)
>>> at
>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.NoSuchFieldError: TRUNCATE_SIZED_RESTRICTION
>>> at
>>> org.apache.beam.runners.core.construction.PTransformTranslation.(PTransformTranslation.java:199)
>>> at
>>> org.apache.beam.runners.core.construction.PTransformMatchers$6.matches(PTransformMatchers.java:264)
>>> at
>>> org.apache.beam.sdk.Pipeline$2.enterCompositeTransform(Pipeline.java:272)
>>> at
>>> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:653)
>>> at
>>> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
>>> at
>>> org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
>>> at
>>> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
>>> at
>>> org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:463)
>>> at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:262)
>>> at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:212)
>>> at
>>> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:115)
>>> at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:82)
>>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
>>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
>>> at com.org.cx.signals.Booster.main(Booster.java:278)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>> at
>>> org.apache.flink.client.program.OptimizerPlanEnvironment.getPipeline(OptimizerPlanEnvironment.java:79)
>>> ... 8 more
>>
>>
>> In our pom.xml we have created a profile for flink-runner as shown below.
>>
>> 
>>> <profile>
>>>   <id>flink-runner</id>
>>>   <dependencies>
>>>     <dependency>
>>>       <groupId>org.apache.beam</groupId>
>>>       <artifactId>beam-runners-flink-1.10</artifactId>
>>>       <version>2.21.0</version>
>>>     </dependency>
>>>   </dependencies>
>>> </profile>
>>
>>
>> And the docker image has below flink version
>>
>> FROM flink:1.10.0-scala_2.12
>>
>>
>> Both our pipeline and SDF based IO connector are on Beam 2.23.0 version.

Re: Support streaming side-inputs in the Spark runner

2020-10-02 Thread Luke Cwik
Support for watermark holds is missing from both Spark streaming
implementations (DStream and structured streaming), so watermark-based
triggers don't produce the correct output.

Excluding the direct runner, Flink is the OSS runner with the most people
working on it, adding features and fixing bugs.
Spark batch is in a good state, but streaming development is still ongoing
and only has a small group of folks working on it.


On Fri, Oct 2, 2020 at 10:16 AM  wrote:

> For clarification, is it just streaming side inputs that present an issue
> for SparkRunner or are there other areas that need work?  We've started
> work on a Beam-based project that includes both streaming and batch
> oriented work and a Spark cluster was our choice due to the perception that
> it could handle both types of applications.
>
> However, that would have to be reevaluated if SparkRunner isn't up for
> streaming deployments.  And it seems that SparkStructuredStreamingRunner
> still needs some time before it's a fully-featured solution.  I guess I'm
> trying to get a sense of whether these runners are still being actively
> developed or were they donated by a third-party and are now suffering from
> bit-rot.
>
> Oct 1, 2020, 10:54 by lc...@google.com:
>
> I would suggest trying FlinkRunner as it is a much more complete streaming
> implementation.
> SparkRunner has several key things that are missing that won't allow your
> pipeline to function correctly.
> If you're really invested in getting SparkRunner working though feel free
> to contribute the necessary implementations for watermark holds and
> broadcast state necessary for side inputs.
>
> On Tue, Sep 29, 2020 at 9:06 AM Rajagopal, Viswanathan 
> wrote:
>
> Hi Team,
>
>
>
> I have a streaming pipeline (built using Apache Beam with Spark
> Runner)which consumes events tagged with timestamps from Unbounded source
> (Kinesis Stream) and batch them into FixedWindows of 5 mins each and then,
> write all events in a window into a single / multiple files based on shards.
>
> We are trying to achieve the following through Apache Beam constructs:
>
> 1. Create a PCollectionView from the unbounded source and pass it as
> a side-input to our main pipeline.
>
> 2. Have a hook method that is invoked per window and enables us to
> do some operational activities per window.
>
> 3. Stop the stream processor (graceful stop) from an external
> system.
>
>
>
> *Approaches that we tried for 1).*
>
> · Creating a PCollectionView from the unbounded source and passing it as
> a side-input to our main pipeline.
>
> · The input PCollection goes through a FixedWindow transform.
>
> · Created a custom CombineFn that combines all inputs for a
> window and produces a single-value PCollection.
>
> · The output of the Window transform goes to the CombineFn (custom fn), and we
> create a PCollectionView from the CombineFn (using
> Combine.Globally().asSingletonView()) as this output would be passed as a
> side-input for our main pipeline.
>
> o   Getting the following exception (while running with streaming option
> set to true)
>
> · java.lang.IllegalStateException: No TransformEvaluator
> registered for UNBOUNDED transform View.CreatePCollectionView
>
>- Noticed that SparkRunner doesn’t support the streaming side-inputs
>   in the Spark runner
>   -
>  
> https://www.javatips.net/api/beam-master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
>  (View.CreatePCollectionView.class not added to EVALUATORS Map)
>  - https://issues.apache.org/jira/browse/BEAM-2112
>  - https://issues.apache.org/jira/browse/BEAM-1564
>
> So would like to understand on this BEAM-1564 ticket.
>
>
>
> *Approaches that we tried for 2).*
>
> Tried to implement the operational activities in extractOutput() of the
> CombineFn, as extractOutput() is called once per window. We hadn't tested this
> as it is blocked by Issue 1).
>
> Are there any other recommended approaches to implement this feature?
>
>
>
> *Looking for recommended approaches to implement feature 3).*
>
>
>
> Many Thanks,
>
> Viswa.
>
>
>
>
>
>
>
>
>
>
>


Re: Java/Flink - Flink's shaded Netty and Beam's Netty clash

2020-10-02 Thread Luke Cwik
I have seen NoClassDefFoundErrors even when the class is there if there is
an issue loading the class (usually related to JNI failing to load or a
static block failing). Try to find the first linkage error
(ExceptionInInitializerError / UnsatisfiedLinkError / ...) in the logs as it
typically has more details as to why loading failed.

On Fri, Oct 2, 2020 at 8:30 AM Tomo Suzuki  wrote:

> I suspected that io.grpc:grpc-netty-shaded:jar:1.27.2 was incorrectly
> shaded, but the JAR file contains the
> io/grpc/netty/shaded/io/netty/util/collection/IntObjectHashMap$2 which is
> reported as missing. Strange.
>
> suztomo-macbookpro44% jar tf grpc-netty-shaded-1.27.2.jar |grep
> IntObjectHashMap
> *io/grpc/netty/shaded/io/netty/util/collection/IntObjectHashMap$2*.class
> io/grpc/netty/shaded/io/netty/util/collection/IntObjectHashMap$KeySet.class
>
> io/grpc/netty/shaded/io/netty/util/collection/IntObjectHashMap$MapIterator.class
>
> io/grpc/netty/shaded/io/netty/util/collection/IntObjectHashMap$MapEntry.class
> io/grpc/netty/shaded/io/netty/util/collection/IntObjectHashMap$2$1.class
>
> io/grpc/netty/shaded/io/netty/util/collection/IntObjectHashMap$PrimitiveIterator.class
> io/grpc/netty/shaded/io/netty/util/collection/IntObjectHashMap.class
> io/grpc/netty/shaded/io/netty/util/collection/IntObjectHashMap$1.class
>
> io/grpc/netty/shaded/io/netty/util/collection/IntObjectHashMap$KeySet$1.class
>
> io/grpc/netty/shaded/io/netty/util/collection/IntObjectHashMap$EntrySet.class
>
> On Fri, Oct 2, 2020 at 6:37 AM Kaymak, Tobias 
> wrote:
>
>> No, that was not the case. I'm still seeing this message when canceling a
>> pipeline. Sorry the spam.
>>
>> On Fri, Oct 2, 2020 at 12:22 PM Kaymak, Tobias 
>> wrote:
>>
>>> I think this was caused by having the flink-runner defined twice in my
>>> pom. Oo
>>> (one time as defined with scope runtime, and one time without)
>>>
>>>
>>> On Fri, Oct 2, 2020 at 9:38 AM Kaymak, Tobias 
>>> wrote:
>>>
 Sorry that I forgot to include the versions, currently I'm on Beam
 2.23.0 / Flink 1.10.2 - I have a test dependency for cassandra (archinnov)
 which should *not *be available at runtime, refers to netty and is
 included in this tree, but the other two places where I find netty is in
 Flink and the beam-sdks-java-io-google-cloud-platform ->
 io.grpc:grpc-netty 1.27.2

 Stupid question: How can I check which version Flink 1.10.2 is
 expecting in the runtime?

 output of mvn -Pflink-runner dependency:tree

 --- maven-dependency-plugin:2.8:tree (default-cli) ---
 ch.ricardo.di:di-beam:jar:2.11.0
 +- org.apache.beam:beam-sdks-java-core:jar:2.23.0:compile
 |  +- org.apache.beam:beam-model-pipeline:jar:2.23.0:compile
 |  |  +- com.google.errorprone:error_prone_annotations:jar:2.3.3:compile
 |  |  +- commons-logging:commons-logging:jar:1.2:compile
 |  |  +- org.apache.logging.log4j:log4j-api:jar:2.6.2:compile
 |  |  \- org.conscrypt:conscrypt-openjdk-uber:jar:1.3.0:compile
 |  +- org.apache.beam:beam-model-job-management:jar:2.23.0:compile
 |  +- org.apache.beam:beam-vendor-bytebuddy-1_10_8:jar:0.1:compile
 |  +- org.apache.beam:beam-vendor-grpc-1_26_0:jar:0.3:compile
 |  +- org.apache.beam:beam-vendor-guava-26_0-jre:jar:0.1:compile
 |  +- com.google.code.findbugs:jsr305:jar:3.0.2:compile
 |  +- com.fasterxml.jackson.core:jackson-core:jar:2.10.2:compile
 |  +- com.fasterxml.jackson.core:jackson-annotations:jar:2.10.2:compile
 |  +- com.fasterxml.jackson.core:jackson-databind:jar:2.10.2:compile
 |  +- org.apache.avro:avro:jar:1.8.2:compile
 |  |  +- org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile
 |  |  +- org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile
 |  |  \- com.thoughtworks.paranamer:paranamer:jar:2.7:compile
 |  +- org.xerial.snappy:snappy-java:jar:1.1.4:compile
 |  \- org.tukaani:xz:jar:1.8:compile
 +-
 org.apache.beam:beam-sdks-java-io-google-cloud-platform:jar:2.23.0:compile
 |  +-
 org.apache.beam:beam-sdks-java-expansion-service:jar:2.23.0:compile
 |  |  \- org.apache.beam:beam-model-fn-execution:jar:2.23.0:compile
 |  +-
 org.apache.beam:beam-sdks-java-extensions-google-cloud-platform-core:jar:2.23.0:compile
 |  |  +- com.google.cloud.bigdataoss:gcsio:jar:2.1.3:compile
 |  |  \-
 com.google.apis:google-api-services-cloudresourcemanager:jar:v1-rev20200311-1.30.9:compile
 |  +- com.google.cloud.bigdataoss:util:jar:2.1.3:compile
 |  |  +-
 com.google.api-client:google-api-client-java6:jar:1.30.9:compile
 |  |  +-
 com.google.api-client:google-api-client-jackson2:jar:1.30.9:compile
 |  |  +-
 com.google.oauth-client:google-oauth-client-java6:jar:1.30.6:compile
 |  |  +- com.google.flogger:google-extensions:jar:0.5.1:compile
 |  |  |  \- com.google.flogger:flogger:jar:0.5.1:compile
 |  |  \- com.google.flogger:flogger-system-backend:jar:0.5.1:runtime
 |  |

Re: [ANNOUNCE] Beam Java 8 image rename starting from 2.26.0 (to apache/beam_java8_sdk)

2020-10-01 Thread Luke Cwik
Can we copy the beam_java_sdk image to beam_java8_sdk for a few prior
releases so people who are on an older release can migrate now and not have
to remember to do it with 2.26?

On Tue, Sep 29, 2020 at 5:37 PM Emily Ye  wrote:

> Starting with the release of 2.26.0, the Java 8 SDK container image
>  (currently
> apache/beam_java_sdk )
> will be released under a new name, *apache/beam_java8_sdk.* This is in
> anticipation of future plans to release a Java 11 image[1]
>
> This will apply to:
>
>- officially released image starting with v2.26.0
>- *immediately for development images* built from master (post PR
>#12505 ) - i.e. if you are
>using a script/command with an argument that includes beam_java_sdk, use
>beam_java8_sdk.
>
>
> [1] Related JIRA: https://issues.apache.org/jira/browse/BEAM-8106
>
>


Re: Setting AWS endpoints per transform

2020-09-25 Thread Luke Cwik
You can create a PipelineOptions interface with a unique endpoint
identifier for each service you want to use like:
public interface AwsEndpointOptions extends PipelineOptions {
  @Default.InstanceFactory(FallbackToAwsOptions.class)
  String getSnsEndpoint();
  void setSnsEndpoint(String value);

  @Default.InstanceFactory(FallbackToAwsOptions.class)
  String getS3Endpoint();
  void setS3Endpoint(String value);
  ...

  class FallbackToAwsOptions implements DefaultValueFactory<String> {
    @Override
    public String create(PipelineOptions options) {
      return options.as(AwsOptions.class).getAwsServiceEndpoint();
    }
  }
}

It looks like AWS IOs support passing in a provider[1] and/or configuring
the endpoint during pipeline construction. Then during pipeline
creation/execution you can configure each AWS IO instance with the specific
endpoint from this new PipelineOptions interface.

1:
https://github.com/apache/beam/blob/6fdde4f4eab72b49b10a8bb1cb3be263c5c416b5/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java#L273
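
For example, at pipeline-construction time (a sketch, assuming options is the PipelineOptions
you run the pipeline with; how each endpoint value is handed to its IO depends on that IO's
API, e.g. the SNS provider linked in [1]):

AwsEndpointOptions endpoints = options.as(AwsEndpointOptions.class);
String snsEndpoint = endpoints.getSnsEndpoint(); // falls back to --awsServiceEndpoint
String s3Endpoint = endpoints.getS3Endpoint();   // likewise
// pass each value to the matching IO / client provider while building the pipeline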

On Thu, Sep 24, 2020 at 4:24 PM  wrote:

> We're currently working on getting our Beam app working with localstack
> (and potentially other AWS regions).  We're using SqsIO and S3 as part of
> our pipeline (with other AWS components likely to come into the mix).
> While I could cast the PipelineOptions to AwsOptions and then call
> AwsOptions.setAwsServiceEndpoint() prior to pipeline construction, that
> won't work as different AWS services make use of different endpoints --
> e.g. the endpoint for SqsIO isn't going to work for S3.
>
> What I'd really like to do is provide a different set of AwsOptions per
> AWS service.  What's the best means of accomplishing this?
>
> Tim.
>
>


Re: disable ssl check/set verify cert false for elasticsearchIO

2020-09-25 Thread Luke Cwik
Yeah, the JvmInitializer would be necessary if you wanted to use a file
based keystore but I have seen people create in memory keystores[1].

Take a look at the contribution guide[2].

1: https://gist.github.com/mikeapr4/3b2b5d05bc57640e77d0
2: https://beam.apache.org/contribute/
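
For reference, a rough sketch of the ConnectionConfiguration settings being
discussed (the addresses, index/type and keystore path/password are placeholders;
withKeystorePath expects a file readable on the worker, which is where a
JvmInitializer or an in-memory keystore comes in):

ElasticsearchIO.ConnectionConfiguration connection =
    ElasticsearchIO.ConnectionConfiguration.create(
            new String[] {"https://es-host:9200"}, "my-index", "_doc")
        .withKeystorePath("/path/on/worker/keystore.jks") // placeholder path
        .withKeystorePassword("changeit")
        .withTrustSelfSignedCerts(true); // only takes effect when a keystore is set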

On Fri, Sep 25, 2020 at 9:17 AM Mohil Khare  wrote:

> Hi Luke,
> Yeah, I looked at withTrustSelfSignedCerts options, but after looking at
> elasticsearchIO code, it seems that flag comes into effect only when
> "withKeystorePath" is provided.
> I don't think that can be any gcs path. I believe that is a path in worker
> VM and for that I will have to use JvmInitliazer to add a custom keystore.
>
> Regarding: "Feel free to create a PR and add any options following the
> pattern that is already there in the code" . What are the guidelines for
> adding code and creating PR for beam codebase. Can you point me to the
> relevant document?
>
> Thanks and regards
> Mohil
>
>
> On Fri, Sep 25, 2020 at 8:45 AM Luke Cwik  wrote:
>
>> It doesn't look like it based upon the javadoc:
>>
>> https://beam.apache.org/releases/javadoc/2.23.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.ConnectionConfiguration.html
>>
>> You can allow self signed certs which might be enough for you though:
>>
>> https://beam.apache.org/releases/javadoc/2.23.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.ConnectionConfiguration.html#withTrustSelfSignedCerts-boolean-
>>
>> Feel free to create a PR and add any options following the pattern that
>> is already there in the code.
>>
>> On Thu, Sep 24, 2020 at 8:36 PM Mohil Khare  wrote:
>>
>>> Hello team and elasticSearchIO users,
>>>
>>> I am using beam java sdk 2.23.0 with elastic search as one of the sinks.
>>>
>>> Is there any way to turn off ssl check i.e. set cert verify false for
>>> https connection with elasticsearchIO ? I know using regular clients, you
>>> can do that. But can we achieve the same using elasticsearchIO ?
>>>
>>> Thanks and regards
>>> Mohil
>>>
>>


Re: disable ssl check/set verify cert false for elasticsearchIO

2020-09-25 Thread Luke Cwik
It doesn't look like it based upon the javadoc:
https://beam.apache.org/releases/javadoc/2.23.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.ConnectionConfiguration.html

You can allow self signed certs which might be enough for you though:
https://beam.apache.org/releases/javadoc/2.23.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.ConnectionConfiguration.html#withTrustSelfSignedCerts-boolean-

Feel free to create a PR and add any options following the pattern that is
already there in the code.

On Thu, Sep 24, 2020 at 8:36 PM Mohil Khare  wrote:

> Hello team and elasticSearchIO users,
>
> I am using beam java sdk 2.23.0 with elastic search as one of the sinks.
>
> Is there any way to turn off ssl check i.e. set cert verify false for
> https connection with elasticsearchIO ? I know using regular clients, you
> can do that. But can we achieve the same using elasticsearchIO ?
>
> Thanks and regards
> Mohil
>


Re: Output from Window not getting materialized

2020-09-18 Thread Luke Cwik
To answer your specific question, you should create and return the WallTime
estimator. You shouldn't need to interact with it from within
your @ProcessElement call since your elements are using the current time
for their timestamp.
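
As a concrete sketch of "create and return the WallTime estimator", the two SDF
methods involved would look roughly like this (using Instant as the estimator
state, which is the usual choice):

@GetInitialWatermarkEstimatorState
public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) {
  return currentElementTimestamp;
}

@NewWatermarkEstimator
public WatermarkEstimators.WallTime newWatermarkEstimator(
    @WatermarkEstimatorState Instant watermarkEstimatorState) {
  // Nothing to call from @ProcessElement; the runner reports wall time as the estimate.
  return new WatermarkEstimators.WallTime(watermarkEstimatorState);
}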

On Fri, Sep 18, 2020 at 10:04 AM Luke Cwik  wrote:

> Kafka is a complex example because it is adapting code from before there
> was an SDF implementation (namely the TimestampPolicy and the
> TimestampFn/TimestampFnS/WatermarkFn/WatermarkFn2 functions).
>
> There are three types of watermark estimators that are in the Beam Java
> SDK today:
> Manual: Can be invoked from within your @ProcessElement method within your
> SDF allowing you precise control over what the watermark is.
> WallTime: Doesn't need to be interacted with, will report the current time
> as the watermark time. Once it is instantiated and returned via the
> @NewWatermarkEstimator method you don't need to do anything with it. This
> is functionally equivalent to calling setWatermark(Instant.now()) right
> before returning from the @ProcessElement method in the SplittableDoFn on a
> Manual watermark.
> TimestampObserving: Is invoked using the output timestamp for every
> element that is output. This is functionally equivalent to calling
> setWatermark after each output within your @ProcessElement method in the
> SplittableDoFn. The MonotonicallyIncreasing implementation for
> the TimestampObserving estimator ensures that the largest timestamp seen so
> far will be reported for the watermark.
>
> The default is to not set any watermark estimate.
>
> For all watermark estimators you're allowed to set the watermark estimate
> to anything as the runner will recompute the output watermark as:
> new output watermark = max(previous output watermark, min(upstream
> watermark, watermark estimates))
> This effectively means that the watermark will never go backwards from the
> runners point of view but that does mean that setting the watermark
> estimate below the previous output watermark (which isn't observable) will
> not do anything beyond holding the watermark at the previous output
> watermark.
>
> Depending on the windowing strategy and allowed lateness, any records that
> are output with a timestamp that is too early can be considered droppably
> late, otherwise they will be late/ontime/early.
>
> So as an author for an SDF transform, you need to figure out:
> 1) What timestamp you're going to output your records at
> * use the upstream input element's timestamp: the guidance is to use the default
> implementation, which gets the upstream watermark by default
> * use data from within the record being output or external system state
> via an API call: use a watermark estimator
> 2) How you want to compute the watermark estimate (if at all)
> * the choice here depends on how the elements timestamps progress, are
> they in exactly sorted order, almost sorted order, completely unsorted, ...?
>
> For both of these it is up to you to choose how much flexibility in these
> decisions you want to give to your users and that should guide what you
> expose within the API (like how KafkaIO exposes a TimestampPolicy) or how
> many other sources don't expose anything.
>
>
> On Thu, Sep 17, 2020 at 8:43 AM Praveen K Viswanathan <
> harish.prav...@gmail.com> wrote:
>
>> Hi Luke,
>>
>> I am also looking at the `WatermarkEstimators.manual` option, in
>> parallel. Now we are getting data past our Fixed Window but the aggregation
>> is not as expected.  The doc says setWatermark will "set timestamp
>> before or at the timestamps of all future elements produced by the
>> associated DoFn". If I output with a timestamp as below then could you
>> please clarify on how we should set the watermark for this manual
>> watermark estimator?
>>
>> receiver.outputWithTimestamp(ossRecord, Instant.now());
>>
>> Thanks,
>> Praveen
>>
>> On Mon, Sep 14, 2020 at 9:10 AM Luke Cwik  wrote:
>>
>>> Is the watermark advancing[1, 2] for the SDF such that the windows can
>>> close allowing for the Count transform to produce output?
>>>
>>> 1: https://www.youtube.com/watch?v=TWxSLmkWPm4
>>> 2: https://beam.apache.org/documentation/programming-guide/#windowing
>>>
>>> On Thu, Sep 10, 2020 at 12:39 PM Gaurav Nakum 
>>> wrote:
>>>
>>>> Hi everyone!
>>>>
>>>> We are developing a new IO connector using the SDF API, and testing it
>>>> with the following simple counting pipeline:
>>>>
>>>>
>>>>
>>>> p.apply(MyIO.read()
>>>>
>>>> .withStream(inputStream)
>>>>
>>>>   

Re: Output from Window not getting materialized

2020-09-18 Thread Luke Cwik
Kafka is a complex example because it is adapting code from before there
was an SDF implementation (namely the TimestampPolicy and the
TimestampFn/TimestampFnS/WatermarkFn/WatermarkFn2 functions).

There are three types of watermark estimators that are in the Beam Java SDK
today:
Manual: Can be invoked from within your @ProcessElement method within your
SDF allowing you precise control over what the watermark is.
WallTime: Doesn't need to be interacted with, will report the current time
as the watermark time. Once it is instantiated and returned via the
@NewWatermarkEstimator method you don't need to do anything with it. This
is functionally equivalent to calling setWatermark(Instant.now()) right
before returning from the @ProcessElement method in the SplittableDoFn on a
Manual watermark.
TimestampObserving: Is invoked using the output timestamp for every element
that is output. This is functionally equivalent to calling setWatermark
after each output within your @ProcessElement method in the SplittableDoFn.
The MonotonicallyIncreasing implementation for the TimestampObserving
estimator ensures that the largest timestamp seen so far will be reported
for the watermark.

The default is to not set any watermark estimate.

For all watermark estimators you're allowed to set the watermark estimate
to anything as the runner will recompute the output watermark as:
new output watermark = max(previous output watermark, min(upstream
watermark, watermark estimates))
This effectively means that the watermark will never go backwards from the
runners point of view but that does mean that setting the watermark
estimate below the previous output watermark (which isn't observable) will
not do anything beyond holding the watermark at the previous output
watermark.

Depending on the windowing strategy and allowed lateness, any records that
are output with a timestamp that is too early can be considered droppably
late, otherwise they will be late/ontime/early.

So as an author for an SDF transform, you need to figure out:
1) What timestamp you're going to output your records at
* use the upstream input element's timestamp: the guidance is to use the default
implementation, which gets the upstream watermark by default
* use data from within the record being output or external system state via
an API call: use a watermark estimator
2) How you want to compute the watermark estimate (if at all)
* the choice here depends on how the elements timestamps progress, are they
in exactly sorted order, almost sorted order, completely unsorted, ...?

For both of these it is up to you to choose how much flexibility in these
decisions you want to give to your users and that should guide what you
expose within the API (like how KafkaIO exposes a TimestampPolicy) or how
many other sources don't expose anything.
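
For comparison, a sketch of the TimestampObserving flavor described above,
assuming the SDF's output timestamps are roughly non-decreasing:

@NewWatermarkEstimator
public WatermarkEstimators.MonotonicallyIncreasing newWatermarkEstimator(
    @WatermarkEstimatorState Instant watermarkEstimatorState) {
  // Observes every output timestamp and reports the largest one seen so far.
  return new WatermarkEstimators.MonotonicallyIncreasing(watermarkEstimatorState);
}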


On Thu, Sep 17, 2020 at 8:43 AM Praveen K Viswanathan <
harish.prav...@gmail.com> wrote:

> Hi Luke,
>
> I am also looking at the `WatermarkEstimators.manual` option, in parallel.
> Now we are getting data past our Fixed Window but the aggregation is not as
> expected.  The doc says setWatermark will "set timestamp before or at the
> timestamps of all future elements produced by the associated DoFn". If I
> output with a timestamp as below then could you please clarify on how we
> should set the watermark for this manual watermark estimator?
>
> receiver.outputWithTimestamp(ossRecord, Instant.now());
>
> Thanks,
> Praveen
>
> On Mon, Sep 14, 2020 at 9:10 AM Luke Cwik  wrote:
>
>> Is the watermark advancing[1, 2] for the SDF such that the windows can
>> close allowing for the Count transform to produce output?
>>
>> 1: https://www.youtube.com/watch?v=TWxSLmkWPm4
>> 2: https://beam.apache.org/documentation/programming-guide/#windowing
>>
>> On Thu, Sep 10, 2020 at 12:39 PM Gaurav Nakum 
>> wrote:
>>
>>> Hi everyone!
>>>
>>> We are developing a new IO connector using the SDF API, and testing it
>>> with the following simple counting pipeline:
>>>
>>>
>>>
>>> p.apply(MyIO.read()
>>>
>>> .withStream(inputStream)
>>>
>>> .withStreamPartitions(Arrays.asList(0))
>>>
>>> .withConsumerConfig(config)
>>>
>>> ) // gets a PCollection>
>>>
>>>
>>>
>>>
>>>
>>> .apply(Values.*create*()) // PCollection
>>>
>>>
>>>
>>> .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10)))
>>>
>>> .withAllowedLateness(Duration.standardDays(1))
>>>
>>> .accumulatingFiredPanes())
>>>
>>>
>>>
>>> .apply(Count.perElement())
>>>
>>>
>>>
>>>
>>>
>>> // write PCollection> to stream
>>>
>>> .apply(MyIO.write()
>>>
>>> .withStream(outputStream)
>>>
>>> .withConsumerConfig(config));
>>>
>>>
>>>
>>>
>>>
>>> Without the window transform, we can read from the stream and write to
>>> it, however, I don’t see output after the Window transform. Could you
>>> please help pin down the issue?
>>>
>>> Thank you,
>>>
>>> Gaurav
>>>
>>
>
> --
> Thanks,
> Praveen K Viswanathan
>


Re: Dataflow isn't parallelizing

2020-09-11 Thread Luke Cwik
Inserting the Reshuffle is the easiest answer to test that parallelization
starts happening.

If the performance is good but you're materializing too much data at the
shuffle boundary you'll want to convert your high fanout function (?Read
from Snowflake?) into a splittable DoFn.
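
A sketch of the Reshuffle suggestion (the element values and the transcription
DoFn are placeholders for the user's own transforms):

PCollection<String> uris =
    p.apply(Create.of("gs://bucket/a.mp3", "gs://bucket/b.mp3"));
uris
    // Break fusion so the expensive step can be distributed across workers.
    .apply("BreakFusion", Reshuffle.viaRandomKey())
    .apply("TranscribeMp3", ParDo.of(new TranscribeFn())); // TranscribeFn: placeholder DoFn<String, String>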

On Fri, Sep 11, 2020 at 9:56 AM Eugene Kirpichov 
wrote:

> Hi,
>
> Most likely this is because of fusion - see
> https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#fusion-optimization
> . You need to insert a Reshuffle.viaRandomKey(), most likely after the
> first step.
>
> On Fri, Sep 11, 2020 at 9:41 AM Alan Krumholz 
> wrote:
>
>> Hi DataFlow team,
>> I have a simple pipeline that I'm trying to speed up using DataFlow:
>>
>> [image: image.png]
>>
>> As you can see the bottleneck is the "transcribe mp3" step. I was hoping
>> DataFlow would be able to run many of these in parallel to speed up the
>> total execution time.
>>
>> However it seems it doesn't do that... and instead keeps executing all the
>> independent inputs sequentially
>> Even when I tried to force it to start with many workers it rapidly shuts
>> down most of them and only keeps one alive and doesn't ever seem to
>> parallelize this step :(
>>
>> Any advice on what else to try to make it do this?
>>
>> Thanks so much!
>>
>
>
> --
> Eugene Kirpichov
> http://www.linkedin.com/in/eugenekirpichov
>


Re: Installing ffmpeg on a python dataflow job

2020-09-10 Thread Luke Cwik
Did ffmpeg install change $PATH? (this may not be visible to the current
process)
Have you tried the full path to the executable?

On Wed, Sep 9, 2020 at 1:48 PM Alan Krumholz 
wrote:

> Hi DataFlow team,
>
> We are trying to use ffmpeg to process some video data using dataflow.
> In order to do this we need the worker nodes to have ffmpeg installed.
>
> After reading Beam docs I created a setup.py file for my job like this:
>
> #!/usr/bin/python
> import subprocess
> from distutils.command.build import build as _build
> import setuptools
>
> class build(_build):
> sub_commands = _build.sub_commands + [('CustomCommands', None)]
>
> class CustomCommands(setuptools.Command):
> def initialize_options(self):
> pass
>
> def finalize_options(self):
> pass
>
> def RunCustomCommand(self, command_list):
> p = subprocess.Popen(
> command_list,
> stdin=subprocess.PIPE,
> stdout=subprocess.PIPE,
> stderr=subprocess.STDOUT)
> stdout_data, _ = p.communicate()
> if p.returncode != 0:
> raise RuntimeError(
> 'Command %s failed: exit code: %s' % (
> command_list, p.returncode))
>
> def run(self):
> for command in CUSTOM_COMMANDS:
> self.RunCustomCommand(command)
>
> CUSTOM_COMMANDS = [
> ['apt-get', 'update'],
> ['apt-get', 'install', '-y', 'ffmpeg']]
> REQUIRED_PACKAGES = [
> 'boto3==1.11.17',
> 'ffmpeg-python==0.2.0',
> 'google-cloud-storage==1.31.0']
> setuptools.setup(
> name='DataflowJob',
> version='0.1',
> install_requires=REQUIRED_PACKAGES,
> packages=setuptools.find_packages(),
> mdclass={
> 'build': build,
> 'CustomCommands': CustomCommands})
>
> However, when I run the job I still get an error saying that ffmpeg is not
> installed: "No such file or directory: 'ffmpeg'"
>
> Any clue what am I doing wrong?
>
> Thanks so much!
>
>


Re: KafkaIO committing semantics

2020-09-10 Thread Luke Cwik
+Boyuan Zhang 

You can perform commit like side effects like this in two ways:
1) Output commits to a downstream PCollection
Read -> PCollection -> ... rest of pipeline ...
\-> PCollection -> Reshuffle -> ParDo(PerformCommitSideEffects)

This method is preferred if you can perform a commit from a different
worker and you're not bound to some inprocess state (e.g. JDBC connection)
since it is guaranteed to happen and isn't best effort. It also is using
the data path which is optimized to be as performant as possible.

2) Use the BundleFinalizer[1, 2] and register a callback after the bundle
is durably persisted. This is best effort and exists since there are some
APIs which have resident process state which can't be moved to another
worker so the callback always comes back to the same machine.

1: https://s.apache.org/beam-finalizing-bundles
2:
https://github.com/apache/beam/blob/1463ff08a4f782594dff64873d0cb70ca13d8f0d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L1367
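
A rough sketch of option 1 (ProcessRecordsFn and CommitOffsetsFn are placeholder
DoFns, not existing Beam classes):

PCollection<KafkaRecord<String, String>> records =
    p.apply(
        KafkaIO.<String, String>read()
            .withBootstrapServers("localhost:9092")
            .withTopic("mytopic")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class));

// Main path: the rest of the pipeline.
records.apply("Process", ParDo.of(new ProcessRecordsFn()));

// Commit path: extract partition/offset and commit after the reshuffle, as in the
// diagram above, so the side effect happens after the data is durably shuffled.
records
    .apply(
        "ToPartitionOffsets",
        MapElements.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.longs()))
            .via(r -> KV.of(r.getPartition(), r.getOffset())))
    .apply(Reshuffle.viaRandomKey())
    .apply("CommitSideEffect", ParDo.of(new CommitOffsetsFn()));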


On Wed, Sep 9, 2020 at 8:24 AM Alexey Romanenko 
wrote:

> Sorry, I can’ say much about SDF. Maybe Lukasz Cwik can provide more
> details on this.
>
> On 8 Sep 2020, at 09:01, Gaurav Nakum  wrote:
>
> Thank you very much for your explanation!
> commitOffsetsInFinalize() -> although checkpointing depends on the runner
> is it not configurable in a connector implementation?
> Basically, I want to understand how this can be done with a new IO
> connector implementation, esp. with the new *SDF* API. If I am right, in
> the traditional UnboundedSource API, checkpointing was configured using
> *UnboundedSource.CheckpointMark*, but I am not sure about the SDF API.
> Also, since KafkaIO SDF read does not provide *commitOffsetsInFinalize* 
> functionality
> could you point to some resources which discuss checkpointing using the new
> SDF API?
>
> Thank you,
> Gaurav
> On 9/7/20 10:54 AM, Alexey Romanenko wrote:
>
> From my understanding:
> - ENABLE_AUTO_COMMIT_CONFIG will say to Kafka consumer (used inside
> KafkaIO to read messages) to commit periodically offsets in the background;
> - on the other hand, if "commitOffsetsInFinalize()” is used, then Beam
> Checkpoint mechanism will be leveraged to restart from checkpoints in case
> of failures. It won’t need to wait for pipeline's finish, though it’s up to
> the runner to decide when and how often to save checkpoints.
>
> In KafkaIO, it’s possible to use* only one* option for the same transform
> - either ENABLE_AUTO_COMMIT_CONFIG or commitOffsetsInFinalize()
>
>
>
> On 6 Sep 2020, at 07:24, Apple  wrote:
>
> Hi everyone,
>
>
> I have a question on KafkaIO.
> What is the difference between setting *AUTO_COMMIT_CONFIG* and
> *commitOffsetsInFinalize()*? My understanding is that:
>
> 1.*AUTO_COMMIT_CONFIG* commits Kafka records as soon as
> KafkaIO.read() outputs messages, but I am not sure how would this be
> helpful, for e.g. if a consumer transform after KafkaIO.read() fails , the
> messages would be lost (which sounds like at-most once semantics)
>
> 2.*commitOffsetsFinalize()*  commits when the pipeline is
> finished. But when does the pipeline end? In other words, when is
> PipelineResult.State = Done in a streaming scenario?
>
> Thanks!
>
>
>


Re: Out-of-orderness of window results when testing stateful operators with TextIO

2020-08-26 Thread Luke Cwik
Splitting is part of the issue.

Other example issues are:
* "sources" that input data into the pipeline have no requirement to
produce records in a time ordered manner.
* timers can hold the output watermark and produce records out of order
with time.

All of this time ordering has a cost to performance and throughput so being
explicit that something needs time ordered input is useful.

On Mon, Aug 24, 2020 at 9:07 PM Dongwon Kim  wrote:

> Thanks Reuven for the input and Wang for CC'ing to Reuven.
>
> Generally you should not rely on PCollection being ordered
>
> Is it because Beam splits PCollection into multiple input splits and tries
> to process it as efficiently as possible without considering times?
> This one is very confusing as I've been using Flink for a long time;
> AFAIK, Flink DataStream API guarantees ordering for the same key between
> two different tasks.
>
> Best,
>
> Dongwon
>
> On Tue, Aug 25, 2020 at 12:56 AM Reuven Lax  wrote:
>
>> Generally you should not rely on PCollection being ordered, though there
>> have been discussions about adding some time-ordering semantics.
>>
>>
>>
>> On Sun, Aug 23, 2020 at 9:06 PM Rui Wang  wrote:
>>
>>> Current Beam model does not guarantee an ordering after a GBK (i.e.
>>> Combine.perKey() in your). So you cannot expect that the C step sees
>>> elements in a specific order.
>>>
>>> As I recall on Dataflow runner, there is very limited ordering support.
>>> Hi +Reuven Lax  can share your insights about it?
>>>
>>>
>>> -Rui
>>>
>>>
>>>
>>> On Sun, Aug 23, 2020 at 8:32 PM Dongwon Kim 
>>> wrote:
>>>
 Hi,

 My Beam pipeline is designed to work with an unbounded source KafkaIO.
 It roughly looks like below:
 p.apply(KafkaIO.read() ...)   // (A-1)
   .apply(WithKeys.of(...).withKeyType(...))
   .apply(Window.into(FixedWindows.of(...)))
   .apply(Combine.perKey(...))  // (B)
   .apply(Window.into(new GlobalWindows())) // to have per-key stats
 in (C)
   .apply(ParDo.of(new MyStatefulDoFn()))  // (C)
 Note that (C) has its own state which is expected to be fetched and
 updated by window results (B) in order of event-time.

 Now I'm writing an integration test where (A-1) is replaced by (A-2):

> p.apply(TextIO.read().from("test.txt"))  // (A-2)

 "text.txt" contains samples having a single key.

 I get a wrong result and it turns out that window results didn't feed
 into (C) in order.
 Is it because (A-2) makes the pipeline a bounded one?

 Q1. How to prevent this from happening?
 Q2. How do you guys usually write an integration test for an unbounded
 one with stateful function?

 Best,

 Dongwon

>>>


Re: tensorflow-data-validation(with direct runner) failed to process large data because of grpc timeout on workers

2020-08-24 Thread Luke Cwik
Another person reported something similar for Dataflow and it seemed as
though in their scenario they were using locks and either got into a
deadlock or starved processing for long enough that the watchdog also
failed. Are you using locks and/or having really long single element
processing times?

On Mon, Aug 24, 2020 at 1:50 AM Junjian Xu  wrote:

> Hi,
>
> I’m running into a problem of tensorflow-data-validation with direct
> runner to generate statistics from some large datasets over 400GB.
>
> It seems that all workers stopped working after an error message of
> “Keepalive watchdog fired. Closing transport.” It seems to be a grpc
> keepalive timeout.
>
> ```
> E0804 17:49:07.419950276   44806 chttp2_transport.cc:2881]
> ipv6:[::1]:40823: Keepalive watchdog fired. Closing transport.
> 2020-08-04 17:49:07  local_job_service.py : INFO  Worker: severity: ERROR
> timestamp {   seconds: 1596563347   nanos: 420487403 } message: "Python sdk
> harness failed: \nTraceback (most recent call last):\n  File
> \"/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py\",
> line 158, in main\n
>  sdk_pipeline_options.view_as(ProfilingOptions))).run()\n  File
> \"/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py\",
> line 213, in run\nfor work_request in
> self._control_stub.Control(get_responses()):\n  File
> \"/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py\", line
> 416, in __next__\nreturn self._next()\n  File
> \"/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py\", line
> 706, in _next\nraise self\ngrpc._channel._MultiThreadedRendezvous:
> <_MultiThreadedRendezvous of RPC that terminated with:\n\tstatus =
> StatusCode.UNAVAILABLE\n\tdetails = \"keepalive watchdog
> timeout\"\n\tdebug_error_string =
> \"{\"created\":\"@1596563347.420024732\",\"description\":\"Error received
> from peer
> ipv6:[::1]:40823\",\"file\":\"src/core/lib/surface/call.cc\",\"file_line\":1055,\"grpc_message\":\"keepalive
> watchdog timeout\",\"grpc_status\":14}\"\n>" trace: "Traceback (most recent
> call last):\n  File
> \"/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py\",
> line 158, in main\n
>  sdk_pipeline_options.view_as(ProfilingOptions))).run()\n  File
> \"/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py\",
> line 213, in run\nfor work_request in
> self._control_stub.Control(get_responses()):\n  File
> \"/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py\", line
> 416, in __next__\nreturn self._next()\n  File
> \"/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py\", line
> 706, in _next\nraise self\ngrpc._channel._MultiThreadedRendezvous:
> <_MultiThreadedRendezvous of RPC that terminated with:\n\tstatus =
> StatusCode.UNAVAILABLE\n\tdetails = \"keepalive watchdog
> timeout\"\n\tdebug_error_string =
> \"{\"created\":\"@1596563347.420024732\",\"description\":\"Error received
> from peer
> ipv6:[::1]:40823\",\"file\":\"src/core/lib/surface/call.cc\",\"file_line\":1055,\"grpc_message\":\"keepalive
> watchdog timeout\",\"grpc_status\":14}\"\n>\n" log_location:
> "/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py:161"
> thread: "MainThread"
> Traceback (most recent call last):
>   File "/usr/lib64/python3.7/runpy.py", line 193, in _run_module_as_main
> "__main__", mod_spec)
>   File "/usr/lib64/python3.7/runpy.py", line 85, in _run_code
> exec(code, run_globals)
>   File
> "/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py",
> line 248, in 
> main(sys.argv)
>   File
> "/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py",
> line 158, in main
> sdk_pipeline_options.view_as(ProfilingOptions))).run()
>   File
> "/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 213, in run
> for work_request in self._control_stub.Control(get_responses()):
>   File "/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py",
> line 416, in __next__
> return self._next()
>   File "/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py",
> line 706, in _next
> raise self
> grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC
> that terminated with:
> status = StatusCode.UNAVAILABLE
> details = "keepalive watchdog timeout"
> debug_error_string =
> "{"created":"@1596563347.420024732","description":"Error received from peer
> ipv6:[::1]:40823","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"keepalive
> watchdog timeout","grpc_status":14}"
> ```
>
> I originally raised the issue in tensorflow-data-validation community but
> we couldn't come up with any solution.
> https://github.com/tensorflow/data-validation/issues/133
>
> The beam version is 2.22.0. Please let me know if I 

Re: Resource Consumption increase With TupleTag

2020-08-21 Thread Luke Cwik
On Thu, Aug 20, 2020 at 12:54 PM Talat Uyarer 
wrote:

> Hi Lucas,
>
>> Not really. It is more about pipeline complexity, logging, debugging,
>> monitoring which become more complex.
>
> Should I use a different consumer group or should I use the same consumer
> group ?
>
I don't know what you're asking.


> And also How Autoscaling will decide worker count ?
>
> There is some analysis that is done based upon certain metrics to optimize
CPU utilization and throughput.



> What do you mean by it's not working properly?
>
> Actually i should correct my statement. Both jobs are using tuple tags but
> when I add one more branch after MessageExtractor things are changing.
>
How are they changing?


> What does the timing information for the transforms tell you on the
>> Dataflow Job UI?
>
> Based on Wall Time on DAG. KafkaIO is the slowest step on my pipeline. Its
> Walltime shows 28 days. I put all wall time for each step.
>
>
>   |--->Filter1 (1 day) --> WriteGCS(1day)
> KafkaIO(28 days)->MessageExtractor(7 hrs) -> |
>
>   |--->Filter2 (13 days) --> WriteGCS(2days)
>
> How do these wall times compare when they are run as two separate
pipelines.


> Thanks
>
> On Thu, Aug 20, 2020 at 10:58 AM Luke Cwik  wrote:
>
>> Do you mean I can put my simple pipeline multiple times for all topics in
>> one dataflow job ?
>> Yes
>>
>> Is there any side effect having multiple independent DAG on one DF job ?
>> Not really. It is more about pipeline complexity, logging, debugging,
>> monitoring which become more complex.
>>
>> And also why the TupleTag model is not working properly?
>> What do you mean by it's not working properly?
>>
>> Why is it using more resources than what it should be?
>> What does the timing information for the transforms tell you on the
>> Dataflow Job UI? (Even if MessageExtractor seems simple it isn't free, You
>> have to now write to two GCS locations instead of one for each work item
>> that you process so you're doing more network calls)
>>
>>
>> On Wed, Aug 19, 2020 at 8:36 PM Talat Uyarer <
>> tuya...@paloaltonetworks.com> wrote:
>>
>>> Filter step is an independent step. We can think it is an etl step or
>>> something else. MessageExtractor step writes messages on TupleTags based on
>>> the kafka header. Yes, MessageExtractor is literally a multi-output DoFn
>>> already. MessageExtractor is processing 48kps but branches are processing
>>> their logs. Each Filter only consumes its log type. There is no any  So
>>> That's why I assume it should consume the same amount of workers. But it
>>> consumes more workers.
>>>
>>>
>>>
>>>  |--->Filter1(20kps)-->WriteGCS
>>> KafkaIO->MessageExtractor(48kps)-> |
>>>
>>>  |--->Filter2(28kps)-->WriteGCS
>>>
>>> Do you mean I can put my simple pipeline multiple times for all topics
>>> in one dataflow job ? Is there any side effect having multiple
>>> independent DAG on one DF job ? And also why the TupleTag model is not
>>> working properly? Why is it using more resources than what it should be?
>>>
>>> Thanks
>>>
>>>
>>>
>>> On Wed, Aug 19, 2020 at 5:16 PM Robert Bradshaw 
>>> wrote:
>>>
>>>> Just to clarify, previously you had.
>>>>
>>>> KafkaIO(topic1) --20kps--> Filter1 -> WriteGCS
>>>> KafkaIO(topic2) --28kps--> Filter2 -> WriteGCS
>>>>
>>>> And now you have
>>>>
>>>>
>>>>   ---48kps--> Filter1
>>>> -> WriteGCS
>>>>   /
>>>> KafkaIO(topic1, topic2) + MessageExtractor
>>>>\
>>>>  ---48kps--> Filter2 ->
>>>> WriteGCS
>>>>
>>>> Each filter is now actually consuming (and throwing away) more data
>>>> than before.
>>>>
>>>> Or is MessageExtractor literally a multi-output DoFn already (which is
>>>> why you're talking about TupleTags). This could possibly be more
>>>> expensive if reading Kafak with headers is more expensive than reading
>>>> it without.
>>>>
>>>> If topic1 and topic2 are truly independent, I would keep their reads
>>>> separate. This will simplify your pipeline (and sounds like it'll
>>>> improve performa

Re: Resource Consumption increase With TupleTag

2020-08-20 Thread Luke Cwik
Do you mean I can put my simple pipeline multiple times for all topics in
one dataflow job ?
Yes

Is there any side effect having multiple independent DAG on one DF job ?
Not really. It is more about pipeline complexity, logging, debugging,
monitoring which become more complex.

And also why the TupleTag model is not working properly?
What do you mean by it's not working properly?

Why is it using more resources than what it should be?
What does the timing information for the transforms tell you on the
Dataflow Job UI? (Even if MessageExtractor seems simple it isn't free, You
have to now write to two GCS locations instead of one for each work item
that you process so you're doing more network calls)
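
For reference, a bare-bones sketch of the TupleTag shape being discussed
(routeByHeader is a placeholder for whatever Kafka-header inspection the real
MessageExtractor does):

final TupleTag<String> type1Tag = new TupleTag<String>() {};
final TupleTag<String> type2Tag = new TupleTag<String>() {};

PCollectionTuple extracted =
    messages.apply(
        "MessageExtractor",
        ParDo.of(
                new DoFn<KafkaRecord<String, String>, String>() {
                  @ProcessElement
                  public void process(ProcessContext c) {
                    KafkaRecord<String, String> record = c.element();
                    if ("type1".equals(routeByHeader(record))) { // placeholder routing helper
                      c.output(record.getKV().getValue()); // main output -> type1Tag
                    } else {
                      c.output(type2Tag, record.getKV().getValue()); // additional output
                    }
                  }
                })
            .withOutputTags(type1Tag, TupleTagList.of(type2Tag)));

extracted.get(type1Tag); // -> Filter1 -> WriteGCS
extracted.get(type2Tag); // -> Filter2 -> WriteGCS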


On Wed, Aug 19, 2020 at 8:36 PM Talat Uyarer 
wrote:

> Filter step is an independent step. We can think it is an etl step or
> something else. MessageExtractor step writes messages on TupleTags based on
> the kafka header. Yes, MessageExtractor is literally a multi-output DoFn
> already. MessageExtractor is processing 48kps but branches are processing
> their logs. Each Filter only consumes its log type. There is no any  So
> That's why I assume it should consume the same amount of workers. But it
> consumes more workers.
>
>
>
>  |--->Filter1(20kps)-->WriteGCS
> KafkaIO->MessageExtractor(48kps)-> |
>
>  |--->Filter2(28kps)-->WriteGCS
>
> Do you mean I can put my simple pipeline multiple times for all topics in
> one dataflow job ? Is there any side effect having multiple independent DAG
> on one DF job ? And also why the TupleTag model is not working properly?
> Why is it using more resources than what it should be?
>
> Thanks
>
>
>
> On Wed, Aug 19, 2020 at 5:16 PM Robert Bradshaw 
> wrote:
>
>> Just to clarify, previously you had.
>>
>> KafkaIO(topic1) --20kps--> Filter1 -> WriteGCS
>> KafkaIO(topic2) --28kps--> Filter2 -> WriteGCS
>>
>> And now you have
>>
>>
>>   ---48kps--> Filter1
>> -> WriteGCS
>>   /
>> KafkaIO(topic1, topic2) + MessageExtractor
>>\
>>  ---48kps--> Filter2 ->
>> WriteGCS
>>
>> Each filter is now actually consuming (and throwing away) more data than
>> before.
>>
>> Or is MessageExtractor literally a multi-output DoFn already (which is
>> why you're talking about TupleTags). This could possibly be more
>> expensive if reading Kafak with headers is more expensive than reading
>> it without.
>>
>> If topic1 and topic2 are truly independent, I would keep their reads
>> separate. This will simplify your pipeline (and sounds like it'll
>> improve performance). Note that you don't have to have a separate
>> Dataflow job for each read, you can have a single Pipeline and do as
>> many reads as you want and the'll all get executed in the same job.
>>
>> On Wed, Aug 19, 2020 at 4:14 PM Talat Uyarer
>>  wrote:
>> >
>> > Hi Robert,
>> >
>> > I calculated process speed based on worker count. When I have separate
>> jobs. topic1 job used 5 workers, topic2 job used 7 workers. Based on
>> KafkaIO message count. they had 4kps processing speed per worker. After I
>> combine them in one df job. That DF job started using ~18 workers, not 12
>> workers.
>> >
>> > How can I understand if they are poorly fused or not? I cannot write the
>> Filter that way because it is Beam SQL. I just wanted to simplify my DAG, which
>> is why I did not mention it.
>> >
>> > Thanks
>> >
>> > On Wed, Aug 19, 2020 at 3:54 PM Robert Bradshaw 
>> wrote:
>> >>
>> >> Is this 2kps coming out of Filter1 + 2kps coming out of Filter2 (which
>> >> would be 4kps total), or only 2kps coming out of KafkaIO and
>> >> MessageExtractor?
>> >>
>> >> Though it /shouldn't/ matter, due to sibling fusion, there's a chance
>> >> things are getting fused poorly and you could write Filter1 and
>> >> Filter2 instead as a DoFn with multiple outputs (see
>> >>
>> https://urldefense.proofpoint.com/v2/url?u=https-3A__beam.apache.org_documentation_programming-2Dguide_-23additional-2Doutputs=DwIFaQ=V9IgWpI5PvzTw83UyHGVSoW3Uc1MFWe5J8PTfkrzVSo=BkW1L6EF7ergAVYDXCo-3Vwkpy6qjsWAz7_GD7pAR8g=Erfg03JLKLNG3lT2ejqq7_fbvfL95-wSxZ5hFKqzyKU=JsWPJxBXopYYenfBAp6nkwfB0Q1Dhs1d4Yi41fBY3a8=
>> ).
>> >>
>> >> - Robert
>> >>
>> >> On Wed, Aug 19, 2020 at 3:37 PM Talat Uyarer
>> >>  wrote:
>> >> >
>> >> > Hi,
>> >> >
>> >> > I have a very simple DAG on my dataflow job.
>> (KafkaIO->Filter->WriteGCS). When I submit this Dataflow job per topic it
>> has 4kps per instance processing speed. However I want to consume two
>> different topics in one DF job. I used TupleTag. I created TupleTags per
>> message type. Each topic has different message types and also needs
>> different filters. So my pipeline turned to below DAG. Message Extractor is
>> a very simple step checking header of kafka messages and writing the
>> correct TupleTag. However after starting to use this new DAG, dataflow
>> can process 2kps per 

Re: Is there an equivalent for --numberOfWorkerHarnessThreads in Python SDK?

2020-08-20 Thread Luke Cwik
+user 

On Thu, Aug 20, 2020 at 9:47 AM Luke Cwik  wrote:

> Are you using Dataflow runner v2[1]?
>
> If so, then you can use:
> --number_of_worker_harness_threads=X
>
> Do you know where/why the OOM is occurring?
>
> 1:
> https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2
> 2:
> https://github.com/apache/beam/blob/017936f637b119f0b0c0279a226c9f92a2cf4f15/sdks/python/apache_beam/options/pipeline_options.py#L834
>
> On Thu, Aug 20, 2020 at 7:33 AM Kamil Wasilewski <
> kamil.wasilew...@polidea.com> wrote:
>
>> Hi all,
>>
>> As I stated in the title, is there an equivalent for
>> --numberOfWorkerHarnessThreads in Python SDK? I've got a streaming pipeline
>> in Python which suffers from OutOfMemory exceptions (I'm using Dataflow).
>> Switching to highmem workers solved the issue, but I wonder if I can set a
>> limit of threads that will be used in a single worker to decrease memory
>> usage.
>>
>> Regards,
>> Kamil
>>
>>


Re: Registering Protobuf schema

2020-08-19 Thread Luke Cwik
With providers there is also an ordering issue since multiple providers
could work for a given type so we want to apply them using some stable
ordering.

On Wed, Aug 19, 2020 at 10:08 AM Brian Hulette  wrote:

> Ah yes, the SchemaRegistry and SchemaProvider follow the same model, but
> none of the SchemaProviders are registered by default. Users can register
> the proto schema provider with
> registerSchemaProvider(Class) [1]:
>
>   p.getSchemaRegistry().registerSchemaProvider(ProtoMessageSchema.class);
>
> Then SchemaCoder should be used for all proto classes.
> We could use ServiceLoader to register all schema providers, then users
> wouldn't need to do this. I assume the reason we don't already is because
> schemas are still experimental and we want it to be opt-in.
>
> [1]
> https://beam.apache.org/releases/javadoc/2.23.0/org/apache/beam/sdk/schemas/SchemaRegistry.html#registerSchemaProvider-org.apache.beam.sdk.schemas.SchemaProvider-
> <https://beam.apache.org/releases/javadoc/2.23.0/org/apache/beam/sdk/schemas/SchemaRegistry.html#registerSchemaProvider-org.apache.beam.sdk.schemas.SchemaProvider->
>
> On Wed, Aug 19, 2020 at 8:44 AM Luke Cwik  wrote:
>
>> Brian, Coders have a provider model where the provider can be queried to
>> resolve for a given type and the providers are resolved in a specific
>> order. This gave the flexibility to handle situations like the one you
>> described.
>>
>> On Wed, Aug 19, 2020 at 12:30 AM 
>> wrote:
>>
>>> Hi Brian,
>>>
>>>
>>>
>>> Many thanks for your mail.
>>>
>>>
>>>
>>> Yes I figured that one out in the end from the docs, but many thanks for
>>> confirming.
>>>
>>>
>>>
>>> I did subsequently discover some other issues with protoBuf-derived
>>> schemas (essentially they don’t seem to be properly supported by
>>> BigQueryIO.Write or allow for optional fields) but I posted a separate
>>> message on the dev channel covering this.
>>>
>>>
>>>
>>> Kind regards,
>>>
>>>
>>>
>>> Rob
>>>
>>>
>>>
>>> *From:* Brian Hulette [mailto:bhule...@google.com]
>>> *Sent:* 18 August 2020 20:50
>>> *To:* user
>>> *Subject:* Re: Registering Protobuf schema
>>>
>>>
>>>
>>>
>>> *
>>> "This is an external email. Do you know who has sent it? Can you be sure
>>> that any links and attachments contained within it are safe? If in any
>>> doubt, use the Phishing Reporter Button in your Outlook client or forward
>>> the email as an attachment to ~ I've Been Phished"
>>> *
>>>
>>> Hi Robert,
>>> Sorry for the late reply on this. I think you should be able to do this
>>> by registering it in your pipeline's SchemaRegistry manually, like so:
>>>
>>>
>>>
>>>   Pipeline p;
>>>
>>>   p.getSchemaRegistry().registerSchemaProvider(Fx.class,
>>> ProtoMessageSchema.class);
>>>
>>> Of course this isn't quite as nice as just adding the DefualtSchema
>>> annotation to a class you control. Maybe we should consider some global
>>> config that would always use schemas for proto-generated classes.
>>>
>>>
>>> Brian
>>>
>>>
>>>
>>> On Sun, Jul 12, 2020 at 9:56 AM Kaymak, Tobias 
>>> wrote:
>>>
>>> This sounds like it is related to the problem I'm trying to solve. (In
>>> my case having a Java POJO containing a protobuf backed-class and trying to
>>> generate a Beam Schema from it.)
>>>
>>> I would be very interested to a solution to this as well :)
>>>
>>>
>>>
>>> On Tue, Jul 7, 2020 at 2:22 PM 
>>> wrote:
>>>
>>> Hi All,
>>>
>>>
>>>
>>> I have a BEAM pipeline where I am reading data from some parquet files
>>> and converting them into a different format based on protobuf generated
>>> classes.
>>>
>>>
>>>
>>> I wish to associate a schema (derived from the protobuf classes) for my
>>> PCollections.  What is the appropriate way to do this with
>>> protobuf-generated classes?
>>>
>>>
>>>
>>> Code excerpt:
>>>
>>>
>>>
>>> PCollection result = input.apply("FXFilePattern", FileIO.match()
>>> .filepattern(fxDataFilePa

Re: Registering Protobuf schema

2020-08-19 Thread Luke Cwik
Brian, Coders have a provider model where the provider can be queried to
resolve for a given type and the providers are resolved in a specific
order. This gave the flexibility to handle situations like the one you
described.

On Wed, Aug 19, 2020 at 12:30 AM  wrote:

> Hi Brian,
>
>
>
> Many thanks for your mail.
>
>
>
> Yes I figured that one out in the end from the docs, but many thanks for
> confirming.
>
>
>
> I did subsequently discover some other issues with protoBuf-derived
> schemas (essentially they don’t seem to be properly supported by
> BigQueryIO.Write or allow for optional fields) but I posted a separate
> message on the dev channel covering this.
>
>
>
> Kind regards,
>
>
>
> Rob
>
>
>
> *From:* Brian Hulette [mailto:bhule...@google.com]
> *Sent:* 18 August 2020 20:50
> *To:* user
> *Subject:* Re: Registering Protobuf schema
>
>
>
>
> *
> "This is an external email. Do you know who has sent it? Can you be sure
> that any links and attachments contained within it are safe? If in any
> doubt, use the Phishing Reporter Button in your Outlook client or forward
> the email as an attachment to ~ I've Been Phished"
> *
>
> Hi Robert,
> Sorry for the late reply on this. I think you should be able to do this by
> registering it in your pipeline's SchemaRegistry manually, like so:
>
>
>
>   Pipeline p;
>
>   p.getSchemaRegistry().registerSchemaProvider(Fx.class,
> ProtoMessageSchema.class);
>
> Of course this isn't quite as nice as just adding the DefualtSchema
> annotation to a class you control. Maybe we should consider some global
> config that would always use schemas for proto-generated classes.
>
>
> Brian
>
>
>
> On Sun, Jul 12, 2020 at 9:56 AM Kaymak, Tobias 
> wrote:
>
> This sounds like it is related to the problem I'm trying to solve. (In my
> case having a Java POJO containing a protobuf backed-class and trying to
> generate a Beam Schema from it.)
>
> I would be very interested to a solution to this as well :)
>
>
>
> On Tue, Jul 7, 2020 at 2:22 PM  wrote:
>
> Hi All,
>
>
>
> I have a BEAM pipeline where I am reading data from some parquet files and
> converting them into a different format based on protobuf generated classes.
>
>
>
> I wish to associate a schema (derived from the protobuf classes) for my
> PCollections.  What is the appropriate way to do this with
> protobuf-generated classes?
>
>
>
> Code excerpt:
>
>
>
> PCollection result = input.apply("FXFilePattern", FileIO.match()
> .filepattern(fxDataFilePattern))
> .apply("FXReadMatches", FileIO.readMatches())
> .apply("FXReadParquetFile", ParquetIO.readFiles(fxAvroSchema))
> .apply("MapFXToProto", ParDo.of(MapFXToProto.of()));
> boolean hasSchema = result.hasSchema();  // returns false
>
>
>
> With thanks in advance.
>
>
>
> Kind regards,
>
>
>
> Rob
>
>
>
> *Robert Butcher*
>
> *Technical Architect | Foundry/SRS | NatWest Markets*
>
> WeWork, 10 Devonshire Square, London, EC2M 4AE
>
> Mobile +44 (0) 7414 730866 <+44%207414%20730866>
>
>
>
> This email is classified as *CONFIDENTIAL* unless otherwise stated.
>
>
>
>
>
> This communication and any attachments are confidential and intended
> solely for the addressee. If you are not the intended recipient please
> advise us immediately and delete it. Unless specifically stated in the
> message or otherwise indicated, you may not duplicate, redistribute or
> forward this message and any attachments are not intended for distribution
> to, or use by any person or entity in any jurisdiction or country where
> such distribution or use would be contrary to local law or regulation.
> NatWest Markets Plc  or any affiliated entity ("NatWest Markets") accepts
> no responsibility for any changes made to this message after it was sent.
>
> Unless otherwise specifically indicated, the contents of this
> communication and its attachments are for information purposes only and
> should not be regarded as an offer or solicitation to buy or sell a product
> or service, confirmation of any transaction, a valuation, indicative price
> or an official statement. Trading desks may have a position or interest
> that is inconsistent with any views expressed in this message. In
> evaluating the information contained in this message, you should know that
> it could have been previously provided to other clients and/or internal
> NatWest Markets personnel, who could have already acted on it.
>
> NatWest Markets cannot provide absolute assurances that all electronic
> communications (sent or received) are secure, error free, not corrupted,
> incomplete or virus free and/or that they will not be lost, mis-delivered,
> destroyed, delayed or intercepted/decrypted by others. Therefore NatWest
> Markets disclaims all liability with regards to electronic communications
> (and the contents therein) if they are corrupted, lost destroyed, delayed,
> incomplete, mis-delivered, intercepted, decrypted 

Re: Exceptions: Attempt to deliver a timer to a DoFn, but timers are not supported in Dataflow.

2020-08-04 Thread Luke Cwik
BEAM-6855 is still open and I updated it linking to this thread that a user
is still being impacted.

On Tue, Aug 4, 2020 at 10:20 AM Mohil Khare  wrote:

> yeah .. looks like a bug still exists.
>
> So how does this work ? Shall I open a new Jira ?
>
> Thanks and regards
> Mohil
>
> On Thu, Jul 30, 2020 at 10:39 PM Reuven Lax  wrote:
>
>> I believe that the person trying to fix BEAM-6855 was unable to reproduce
>> it in test, and therefore assumed that the bug was fixed. However it
>> appears that the bug still exists.
>>
>> On Wed, Jul 29, 2020 at 10:36 AM Kenneth Knowles  wrote:
>>
>>> Hi Mohil,
>>>
>>> It helps also to tell us what version of Beam you are using and some
>>> more details. This looks related to
>>> https://issues.apache.org/jira/browse/BEAM-6855 which claims to be
>>> resolved in 2.17.0
>>>
>>> Kenn
>>>
>>> On Mon, Jul 27, 2020 at 11:47 PM Mohil Khare  wrote:
>>>
 Hello all,

 I think I found the reason for the issue.  Since the exception was
 thrown by StreamingSideInputDoFnRunner.java, I realize that I recently
 added side input to one of my ParDo that does stateful transformations.
 It looks like there is some issue when you add side input (My side
 input was coming via Global window to ParDo in a Fixed Window) to stateful
 DoFn.

 As a work around, instead of adding side input to stateful ParDo, I
 introduced another ParDo  that enriches streaming data with side input
 before flowing into stateful DoFn. That seems to have fixed the problem.


 Thanks and regards
 Mohil



 On Mon, Jul 27, 2020 at 10:50 AM Mohil Khare  wrote:

> Hello All,
>
> Any idea how to debug this and find out which stage, which DoFn or
> which side input is causing the problem?
> Do I need to override OnTimer with every DoFn to avoid this problem?
> I thought that some uncaught exceptions were causing this and added
> various checks and exception handling in all DoFn and still seeing this
> issue.
> It has been driving me nuts. And now forget DRAIN, it happens during
> normal functioning as well. Any help would be appreciated.
>
> java.lang.UnsupportedOperationException: Attempt to deliver a timer to
> a DoFn, but timers are not supported in Dataflow.
>
> 1. at org.apache.beam.runners.dataflow.worker.StreamingSideInputDoFnRunner.onTimer(StreamingSideInputDoFnRunner.java:86)
> 2. at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processUserTimer(SimpleParDoFn.java:360)
> 3. at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$600(SimpleParDoFn.java:73)
> 4. at org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$1.processTimer(SimpleParDoFn.java:444)
> 5. at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:473)
> 6. at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:353)
> 7. at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
> 8. at org.apache.beam.runners.dataflow.worker.util.common.worker.

Re: ReadFromKafka returns error - RuntimeError: cannot encode a null byte[]

2020-07-17 Thread Luke Cwik
+dev 

On Fri, Jul 17, 2020 at 8:03 PM Luke Cwik  wrote:

> +Heejong Lee  +Chamikara Jayalath
> 
>
> Do you know if your trial record has an empty key or value?
> If so, then you hit a bug and it seems as though there was a miss
> supporting this usecase.
>
> Heejong and Cham,
> It looks like the Javadoc for ByteArrayDeserializer and other
> Deserializers can return null[1, 2] and we aren't using
> NullableCoder.of(ByteArrayCoder.of()) in the expansion[3]. Note that the
> non XLang KafkaIO does this correctly in its regular coder inference
> logic[4]. I flied BEAM-10529[5]
>
> 1:
> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
> 2:
> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
> 3:
> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
> 4:
> https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
> 5: https://issues.apache.org/jira/browse/BEAM-10529
>
>
> On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <1705ay...@gmail.com> wrote:
>
>> Hi,
>>
>> I am trying to build a streaming beam pipeline in python which should
>> capture messages from kafka and then execute further stages of data
>> fetching from other sources and aggregation. The step-by-step process of
>> what I have built till now is:
>>
>>1.
>>
>>Running Kafka instance on localhost:9092
>>
>>./bin/kafka-server-start.sh ./config/server.properties
>>2.
>>
>>Run beam-flink job server using docker
>>
>>docker run --net=host apache/beam_flink1.10_job_server:latest
>>3.
>>
>>Run beam-kafka pipeline
>>
>> import apache_beam as beamfrom apache_beam.io.external.kafka import 
>> ReadFromKafka, WriteToKafkafrom apache_beam.options.pipeline_options import 
>> PipelineOptions, StandardOptions
>>
>> if __name__ == '__main__':
>> options = PipelineOptions([
>> "--job_endpoint=localhost:8099",
>> "--environment_type=LOOPBACK",
>> "--streaming",
>> "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
>> ])
>>
>> options = options.view_as(StandardOptions)
>> options.streaming = True
>>
>> pipeline = beam.Pipeline(options=options)
>>
>> result = (
>> pipeline
>>
>> | "Read from kafka" >> ReadFromKafka(
>> consumer_config={
>> "bootstrap.servers": 'localhost:9092',
>> },
>> topics=['mytopic'],
>> expansion_service='localhost:8097',
>> )
>>
>> | beam.Map(print)
>> )
>>
>> pipeline.run()
>>
>>
>>1. Publish new message using kafka-producer.sh
>>
>> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 
>> mytopic>tryme
>>
>> After publishing this trial message, the beam pipeline perceives the
>> message but crashes giving this error:
>>
>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: 
>> org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
>> at 
>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>> at 
>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
>> at 
>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
>> at 
>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
>> at 
>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
>> at 
>> org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
>> at 
>> org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown
>>  Source)
>> at 
>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
>> at 
>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
>> at 
>> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
>> at org.apache.beam
>>
>> Regards,
>>
>> Ayush Sharma.
>>
>>


Re: ReadFromKafka returns error - RuntimeError: cannot encode a null byte[]

2020-07-17 Thread Luke Cwik
+Heejong Lee  +Chamikara Jayalath 


Do you know if your trial record has an empty key or value?
If so, then you hit a bug and it seems as though there was a miss
supporting this usecase.

Heejong and Cham,
It looks like the Javadoc for ByteArrayDeserializer and other Deserializers
can return null[1, 2] and we aren't using
NullableCoder.of(ByteArrayCoder.of()) in the expansion[3]. Note that the
non XLang KafkaIO does this correctly in its regular coder inference
logic[4]. I flied BEAM-10529[5]

1:
https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-
2:
https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-
3:
https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478
4:
https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85
5: https://issues.apache.org/jira/browse/BEAM-10529
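
For reference, the coder shape the expansion would need so that null Kafka
keys/values can be encoded (a sketch mirroring the inference the non
cross-language KafkaIO already does):

Coder<KV<byte[], byte[]>> recordCoder =
    KvCoder.of(
        NullableCoder.of(ByteArrayCoder.of()),
        NullableCoder.of(ByteArrayCoder.of()));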


On Fri, Jul 17, 2020 at 8:51 AM ayush sharma <1705ay...@gmail.com> wrote:

> Hi,
>
> I am trying to build a streaming beam pipeline in python which should
> capture messages from kafka and then execute further stages of data
> fetching from other sources and aggregation. The step-by-step process of
> what I have built till now is:
>
>1.
>
>Running Kafka instance on localhost:9092
>
>./bin/kafka-server-start.sh ./config/server.properties
>2.
>
>Run beam-flink job server using docker
>
>docker run --net=host apache/beam_flink1.10_job_server:latest
>3.
>
>Run beam-kafka pipeline
>
> import apache_beam as beamfrom apache_beam.io.external.kafka import 
> ReadFromKafka, WriteToKafkafrom apache_beam.options.pipeline_options import 
> PipelineOptions, StandardOptions
>
> if __name__ == '__main__':
> options = PipelineOptions([
> "--job_endpoint=localhost:8099",
> "--environment_type=LOOPBACK",
> "--streaming",
> "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
> ])
>
> options = options.view_as(StandardOptions)
> options.streaming = True
>
> pipeline = beam.Pipeline(options=options)
>
> result = (
> pipeline
>
> | "Read from kafka" >> ReadFromKafka(
> consumer_config={
> "bootstrap.servers": 'localhost:9092',
> },
> topics=['mytopic'],
> expansion_service='localhost:8097',
> )
>
> | beam.Map(print)
> )
>
> pipeline.run()
>
>
>1. Publish new message using kafka-producer.sh
>
> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 
> mytopic>tryme
>
> After publishing this trial message, the beam pipeline perceives the
> message but crashes giving this error:
>
> RuntimeError: org.apache.beam.sdk.util.UserCodeException: 
> org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
> at 
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
> at 
> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
> at 
> org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
> at 
> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
> at 
> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
> at 
> org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
> at 
> org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
> at 
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
> at 
> org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
> at 
> org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
> at org.apache.beam
>
> Regards,
>
> Ayush Sharma.
>
>


Re: Connecting To MSSQL Server With Apache Beam, Python SDK

2020-07-17 Thread Luke Cwik
There is typically no trivial way to have SQL queries partitioned so that
the reading is parallelized automatically, since you generally can't have
multiple "readers" connect and use the same query results. There are a
couple of options that you could use:
1) Use a single query, use the database's ability to dump the contents to a
file, and then perform a parallel read over the file's contents
2) Partition your query over a primary key and submit N queries where N is
the number of parallel reads you want (alternatively dump to a temp table
and partition your query); see the rough sketch after this list
3) Write your own splittable DoFn that can partition a query arbitrarily
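
To make option 2 concrete, here is a rough Java sketch using JdbcIO.readAll.
I know you're on the Python SDK (the cross-language JdbcIO work mentioned
below is what would bring this there), but the same partitioning idea
applies with pyodbc. The driver class, URL, table and id ranges are made up
for the example:

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

Pipeline p = Pipeline.create();

// N primary key ranges -> N parallel reads.
List<KV<Long, Long>> ranges =
    Arrays.asList(KV.of(0L, 1000000L), KV.of(1000000L, 2000000L));

PCollection<String> rows =
    p.apply(Create.of(ranges)
            .withCoder(KvCoder.of(VarLongCoder.of(), VarLongCoder.of())))
        .apply(JdbcIO.<KV<Long, Long>, String>readAll()
            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                "com.microsoft.sqlserver.jdbc.SQLServerDriver",
                "jdbc:sqlserver://myhost;databaseName=mydb"))
            .withQuery("SELECT name FROM my_table WHERE id >= ? AND id < ?")
            .withParameterSetter((range, stmt) -> {
              stmt.setLong(1, range.getKey());
              stmt.setLong(2, range.getValue());
            })
            .withRowMapper(rs -> rs.getString(1))
            .withCoder(StringUtf8Coder.of()));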

Do you see any meaningful errors in the logs when running your pipeline on
Dataflow? If it's an issue of pyodbc not being installed on the remote
worker, you should check out how to manage pipeline dependencies[1].

Option 3 has the best parallelism potential followed by 1, while 2 is the
easiest to get working followed by 1 (assuming that the dump file format is
already supported by Beam). On the writing side, you can use
GroupIntoBatches[2] to get meaningfully large transactions, or try the
JdbcIO write transform when it becomes available in a cross-language way.

There is ongoing work[3, 4, 5] to use the existing JdbcIO in Beam Java
connector as a cross language transform available to Python.

I don't know of another way to get on the mailing list than sending an
e-mail to user-subscr...@beam.apache.org

1: https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/
2:
https://github.com/apache/beam/blob/62118fa66788ad45032a60abc30799cd6f0e4658/sdks/python/apache_beam/transforms/util.py#L744
3: https://github.com/apache/beam/pull/12145
4: https://issues.apache.org/jira/browse/BEAM-10135
5: https://issues.apache.org/jira/browse/BEAM-10136

On Fri, Jul 17, 2020 at 9:21 AM Dennis  wrote:

> Hello,
>
>
>
> I'm writing in order to inquire about developing a pipeline (using the
> Python SDK) with multiple PTransforms that can read from, write to, and
> alter data from an MSSQL server.
>
>
>
> I've been using beam-nuggets (https://pypi.org/project/beam-nuggets/), a
> community I/O Connector for dealing with these kinds of PTransforms for a
> MySQL server, and was looking to see if there's an option to do this for
> MSSQL.
>
>
>
> So far, I've been able to run a pipeline with DirectRunner that reads data
> from MSSQL using pyodbc. While this is a good starting point, it's not
> running with DataflowRunner (even after configuring Private IP), and it's
> not parallelized.
>
>
>
> I tried to look into SQLAlchemy, but it seems that there isn't as much
> support as there is for MySQL, especially for the insertion method. It is
> expected that the default insertion method is upsert. For MySQL, this was
> implemented using:
>
>
>
> from sqlalchemy.dialects.mysql import insert as mysql_insert
>
>
>
> There is not such a package available for MSSQL...
>
>
>
> How would one go about doing this? I've looked at several stack overflow
> articles, but there wasn't any solution there that had any similar
> functionality to that of beam-nuggets. Perhaps I missed a solution?
>
>
>
> I realize that this is a loaded question, so I greatly appreciate any help
> in advance.
>
>
>
> Thanks,
>
> Dennis
>
>
>
> P.S. I had trouble adding my work email address, dzvigel...@questrade.com
> to the mailing list (even though I went through the same steps to subscribe
> as with this one), could you please add it? Thanks.
>


Re: Terminating a streaming integration test

2020-07-16 Thread Luke Cwik
For unit testing, I would recommend using TestStream + PAssert where
TestStream replaces the Pubsub source. You can see some examples in
LeaderBoardTest[1]

For integration testing, I would recommend using a metric and cancelling
the pipeline from another thread once the condition is met. There is some
code in the TestDataflowRunner[2] that does this (this code checks for
PAssert success/failure metrics but you can look for any metric that you
want).

1:
https://github.com/apache/beam/blob/master/examples/java/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
2:
https://github.com/apache/beam/blob/de1c14777d3c6a1231361db12f3a0b9fd3b84b3e/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java#L145
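
A rough sketch of the metric-and-cancel approach for the integration test
case; the namespace/counter name and the 2 minute budget are arbitrary, and
it assumes some DoFn in the pipeline does
Metrics.counter("test", "stopMessagesSeen").inc() when it sees your stop
message:

import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

// In a test method that declares "throws Exception".
PipelineResult result = pipeline.run();
MetricsFilter filter = MetricsFilter.builder()
    .addNameFilter(MetricNameFilter.named("test", "stopMessagesSeen"))
    .build();

boolean done = false;
long deadline = System.currentTimeMillis() + 120_000;
while (!done && System.currentTimeMillis() < deadline) {
  MetricQueryResults metrics = result.metrics().queryMetrics(filter);
  for (MetricResult<Long> counter : metrics.getCounters()) {
    // Attempted values are enough here; committed values can lag.
    if (counter.getAttempted() != null && counter.getAttempted() > 0) {
      done = true;
    }
  }
  Thread.sleep(1000);
}
result.cancel();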

On Thu, Jul 16, 2020 at 11:16 AM Niels Basjes  wrote:

> What is the cleanest way to detect the pipeline can be cancelled?
> If the pipeline runs as intended I think I should stop on a "stop"
> message. And in case of problems I should stop on a timeout.
>
> Do you know of an example that does this?
>
> Niels
>
> On Thu, 16 Jul 2020, 18:34 Luke Cwik,  wrote:
>
>> Have you tried cancelling[1] the pipeline once your condition is met?
>>
>> 1:
>> https://github.com/apache/beam/blob/d957ce480604be623b0df77e62948c3e0d2c2453/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java#L46
>>
>> On Thu, Jul 16, 2020 at 7:22 AM Alexey Romanenko <
>> aromanenko@gmail.com> wrote:
>>
>>> Hi Niels,
>>>
>>> AFAICT, for that reason in KafkaIOIT [1] we use
>>> “.withMaxNumRecords(numRecords)” where “numRecords” is actually a
>>> cardinality of input dataset. It’s not 100% faire imo, since in this case
>>> UnboundedRead becomes BoundedReadFromUnboundedSource, and we still use
>>> “waitUntilFinish(timeout)". Though, I’m not sure PubSubIO supports this.
>>>
>>> Also, I believe that “p.run()” should be asynchronous by default [2] and
>>> I guess it’s blocked only for DirectRunner (perhaps in a sense of testing
>>> reasons).
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
>>> [2]
>>> https://beam.apache.org/documentation/pipelines/create-your-pipeline/#running-your-pipeline
>>>
>>> On 16 Jul 2020, at 15:02, Niels Basjes  wrote:
>>>
>>> Hi,
>>>
>>> I found a way that seems to work.
>>>
>>> I have
>>>
>>> @Rule
>>>
>>> public final transient TestPipeline pipeline = TestPipeline.create();
>>>
>>> in the test I configure PubSub to connect to the emulator
>>>
>>> PubsubOptions options = pipeline.getOptions().as(PubsubOptions.class);
>>>
>>> options.setProject(PROJECT_NAME);
>>>
>>> options.setPubsubRootUrl(getPubsubRootUrl());
>>>
>>> options.setCredentialFactoryClass(EmulatorCredentialsFactory.class);
>>>
>>> options.setStreaming(true);
>>>
>>> I then hook my processing to this pipeline and afterwards I do this:
>>>
>>> *pipeline
>>> *.getOptions().as(DirectOptions.class)
>>> .setBlockOnRun(false);
>>>
>>> PipelineResult job = pipeline.run();
>>>
>>> long waitTime = 5000;
>>>
>>> LOG.info("Waiting ... {} seconds", waitTime/1000);
>>>
>>> job.waitUntilFinish(Duration.millis(waitTime));
>>>
>>>
>>> Although this works it will fail my build on a slow machine.
>>>
>>> Is this the best option? Or can I inject a "stop" message in my stream and 
>>> let the pipeline shutdown once it sees that?
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> Can you please indicate if this is a valid way of doing this?
>>> Thanks.
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Jul 15, 2020 at 8:48 PM Niels Basjes  wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm testing to see how I can build an integration test for a Beam
>>>> application that uses PubSub.
>>>>
>>>> So I start the Google provided PubSub emulator, create things like
>>>> topic and subscription, put in some validation messages and then run the
>>>> job against that and verify the data that comes out.
>>>> I'm logging the events to the screen and there I see the data coming in
>>>> and being processed.
>>>>
>>>> The problem I have is that I have not been able to figure out how to
>>>> cleanly terminate this stream after it has processed all my messages.
>>>>
>>>> I have also inserted some 'stop' messages to enable triggering a "we're
>>>> done, you can stop now".
>>>>
>>>> I've been digging through documentation and the apis and found nothing
>>>> that works.
>>>>
>>>> This does not work: pipeline.run().waitUntilFinish(Duration.millis(2000
>>>> ))
>>>>
>>>> I have tried setting the timestamp of those to MAX_LONG and
>>>> TIMESTAMP_MAX_VALUE but that yielded exceptions.
>>>>
>>>> So far I have not been able to figure out how to tell the TestPipeline:
>>>> Finish what you have and shutdown.
>>>>
>>>> How do I do that?
>>>>
>>>> --
>>>> Best regards / Met vriendelijke groeten,
>>>>
>>>> Niels Basjes
>>>>
>>>
>>>
>>> --
>>> Best regards / Met vriendelijke groeten,
>>>
>>> Niels Basjes
>>>
>>>
>>>


Re: Terminating a streaming integration test

2020-07-16 Thread Luke Cwik
Have you tried cancelling[1] the pipeline once your condition is met?

1:
https://github.com/apache/beam/blob/d957ce480604be623b0df77e62948c3e0d2c2453/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java#L46

On Thu, Jul 16, 2020 at 7:22 AM Alexey Romanenko 
wrote:

> Hi Niels,
>
> AFAICT, for that reason in KafkaIOIT [1] we use
> “.withMaxNumRecords(numRecords)” where “numRecords” is actually a
> cardinality of input dataset. It’s not 100% faire imo, since in this case
> UnboundedRead becomes BoundedReadFromUnboundedSource, and we still use
> “waitUntilFinish(timeout)". Though, I’m not sure PubSubIO supports this.
>
> Also, I believe that “p.run()” should be asynchronous by default [2] and I
> guess it’s blocked only for DirectRunner (perhaps in a sense of testing
> reasons).
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
> [2]
> https://beam.apache.org/documentation/pipelines/create-your-pipeline/#running-your-pipeline
>
> On 16 Jul 2020, at 15:02, Niels Basjes  wrote:
>
> Hi,
>
> I found a way that seems to work.
>
> I have
>
> @Rule
>
> public final transient TestPipeline pipeline = TestPipeline.create();
>
> in the test I configure PubSub to connect to the emulator
>
> PubsubOptions options = pipeline.getOptions().as(PubsubOptions.class);
>
> options.setProject(PROJECT_NAME);
>
> options.setPubsubRootUrl(getPubsubRootUrl());
>
> options.setCredentialFactoryClass(EmulatorCredentialsFactory.class);
>
> options.setStreaming(true);
>
> I then hook my processing to this pipeline and afterwards I do this:
>
> *pipeline
> *.getOptions().as(DirectOptions.class)
> .setBlockOnRun(false);
>
> PipelineResult job = pipeline.run();
>
> long waitTime = 5000;
>
> LOG.info("Waiting ... {} seconds", waitTime/1000);
>
> job.waitUntilFinish(Duration.millis(waitTime));
>
>
> Although this works it will fail my build on a slow machine.
>
> Is this the best option? Or can I inject a "stop" message in my stream and 
> let the pipeline shutdown once it sees that?
>
>
>
>
>
>
>
> Can you please indicate if this is a valid way of doing this?
> Thanks.
>
>
>
>
>
> On Wed, Jul 15, 2020 at 8:48 PM Niels Basjes  wrote:
>
>> Hi,
>>
>> I'm testing to see how I can build an integration test for a Beam
>> application that uses PubSub.
>>
>> So I start the Google provided PubSub emulator, create things like topic
>> and subscription, put in some validation messages and then run the job
>> against that and verify the data that comes out.
>> I'm logging the events to the screen and there I see the data coming in
>> and being processed.
>>
>> The problem I have is that I have not been able to figure out how to
>> cleanly terminate this stream after it has processed all my messages.
>>
>> I have also inserted some 'stop' messages to enable triggering a "we're
>> done, you can stop now".
>>
>> I've been digging through documentation and the apis and found nothing
>> that works.
>>
>> This does not work: pipeline.run().waitUntilFinish(Duration.millis(2000))
>>
>> I have tried setting the timestamp of those to MAX_LONG and
>> TIMESTAMP_MAX_VALUE but that yielded exceptions.
>>
>> So far I have not been able to figure out how to tell the TestPipeline:
>> Finish what you have and shutdown.
>>
>> How do I do that?
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>
>
>


Re: Beam supports Flink Async IO operator

2020-07-15 Thread Luke Cwik
That is correct.

On Mon, Jul 13, 2020 at 4:33 PM Eleanore Jin  wrote:

> Hi Kaymak,
>
> Sorry for the late reply and thanks for sharing the blog, I went through
> it.
>
> here is my understanding:
>
> timely processing could `buffer` data and send them to the external
> system in a batch fashion, but in order for it to work `similar` flink
> async IO operator it also requires the external system to be able to accept
> input data in bulk and return back the response synchronously. Otherwise it
> would still like making multiple sync calls to the external system and get
> back responses one by one.
>
> Thanks a lot for sharing!
>
> Best,
> Eleanore
>
> On Thu, Jul 9, 2020 at 1:56 AM Kaymak, Tobias 
> wrote:
>
>> Hi Eleanore,
>>
>> Maybe batched RPC is what you are looking for?
>> https://beam.apache.org/blog/timely-processing/
>>
>> On Wed, Jul 8, 2020 at 6:20 PM Eleanore Jin 
>> wrote:
>>
>>> Thanks Luke and Max for the information.
>>>
>>> We have the use case that inside a DoFn, we will need to call external
>>> services to trigger some other flows. The calls to other services are REST
>>> based sync calls, and it will take 150 milliseconds plus to return. We are
>>> using Flink as the runner and I came across this Async I/O operator from
>>> flink, trying to figure out if this is the right approach and if Beam
>>> provides any similar concept for it.
>>>
>>> Thanks!
>>> Eleanore
>>>
>>> On Wed, Jul 8, 2020 at 2:55 AM Maximilian Michels 
>>> wrote:
>>>
>>>> Just to clarify: We could make the AsnycIO operator also available in
>>>> Beam but the operator has to be represented by a concept in Beam.
>>>> Otherwise, there is no way to know when to produce it as part of the
>>>> translation.
>>>>
>>>> On 08.07.20 11:53, Maximilian Michels wrote:
>>>> > Flink's AsycIO operator is useful for processing io-bound operations,
>>>> > e.g. sending network requests. Like Luke mentioned, it is not
>>>> available
>>>> > in Beam.
>>>> >
>>>> > -Max
>>>> >
>>>> > On 07.07.20 22:11, Luke Cwik wrote:
>>>> >> Beam is a layer that sits on top of execution engines like Flink and
>>>> >> provides its own programming model thus native operators like
>>>> Flink's
>>>> >> async IO operator are not exposed.
>>>> >>
>>>> >> Most people use a DoFn to do all their IO and sometimes will compose
>>>> >> it with another transform such as GroupIntoBatches[1] to simplify
>>>> >> their implementation.
>>>> >>
>>>> >> Why do you need async?
>>>> >>
>>>> >> 1:
>>>> >>
>>>> https://beam.apache.org/documentation/transforms/java/aggregation/groupintobatches/
>>>> >>
>>>> >>
>>>> >>
>>>> >> On Tue, Jul 7, 2020 at 11:03 AM Eleanore Jin >>> >> <mailto:eleanore@gmail.com>> wrote:
>>>> >>
>>>> >> Hi community,
>>>> >>
>>>> >> I cannot find any documentation for Beam supporting Flink async
>>>> IO
>>>> >> operator
>>>> >>
>>>> >> (
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html),
>>>>
>>>> >>
>>>> >> just wonder is this not supported right now?
>>>> >>
>>>> >> Thanks a lot!
>>>> >> Eleanore
>>>> >>
>>>>
>>>


Re: KinesisIO checkpointing

2020-07-09 Thread Luke Cwik
The BoundedReadFromUnboundedSource does checkpoint the underlying
UnboundedSource; is that checkpoint logic not working?
Do you have KinesisIO configured to always read from a specific point?

On Thu, Jul 9, 2020 at 9:56 AM Sunny, Mani Kolbe  wrote:

> We did the same and started using maxReadTime and put the application to
> run on a recurring schedule of 5 minutes. It works fine end to end without
> any error.
>
>
>
> But the problem is that it always starts reading from the beginning of the
> Kinesis stream when it stop-starts.
>
>
>
> When I did some investigation on that, I found that when you set
> maxReadTime, it will run using BoundedReadFromUnboundedSource mode. That
> essentially converts source in to a bounded one. This means checkpointing
> or watermark no longer supported. Reader just reads for x number of time
> and exists.
>
>
>
> Is there anyway recommended way to resume reading from the position it
> finished? Either using maxReadTime or in unboundedSource mode?
>
>
>
> Could some point me to a sample pipeline code that uses Kinesis as source?
>
>
>
> Regards,
>
> Mani
>
>
>
> *From:* Lars Almgren Schwartz 
> *Sent:* Thursday, June 25, 2020 7:53 AM
> *To:* user@beam.apache.org
> *Subject:* Re: KinesisIO checkpointing
>
>
>
> *CAUTION:* This email originated from outside of D Please do not click
> links or open attachments unless you recognize the sender and know the
> content is safe.
>
>
>
> We had the exact same problem, but have not spent any time trying to solve
> it, we just skipped checkpointing for now.
>
> When we saw this problem we ran Spark 2.4.5 (local mode) and Kinesis 2.18
> and 2.19.
>
>
>
> On Wed, Jun 24, 2020 at 6:22 PM Sunny, Mani Kolbe  wrote:
>
> We are on spark 2.4 and Beam 2.22.0
>
>
>
> *From:* Alexey Romanenko 
> *Sent:* Wednesday, June 24, 2020 5:15 PM
> *To:* user@beam.apache.org
> *Subject:* Re: KinesisIO checkpointing
>
>
>
> *CAUTION:* This email originated from outside of D Please do not click
> links or open attachments unless you recognize the sender and know the
> content is safe.
>
>
>
> Yes, KinesisIO supports restart from checkpoints and it’s based on runner
> checkpoints support [1].
>
>
>
> Could you specify which version of Spark and Beam you use?
>
>
>
> [1]
> https://stackoverflow.com/questions/62259364/how-apache-beam-manage-kinesis-checkpointing/62349838#62349838
> 
>
>
>
> On 24 Jun 2020, at 14:13, Sunny, Mani Kolbe  wrote:
>
>
>
> Hello,
>
>
>
> We are developing a beam pipeline which runs on SparkRunner on streaming
> mode. This pipeline read from Kinesis, do some translations, filtering and
> finally output to S3 using AvroIO writer. We are using Fixed windows with
> triggers based on element count and processing time intervals. Outputs path
> is partitioned by window start timestamp. allowedLateness=0sec
>
>
>
> This is working fine, but I have noticed that whenever we restarts
> streaming, application is starting to read from Kinesis TRIM_HORIZON. That
> is, it is not resuming from last checkpoint position. Then I found that the
> checkpoint directory is based on --jobName and --checkpointDir properties.
> So I tried running as below:
>
>
>
> *spark-submit --master yarn --deploy-mode cluster --conf
> spark.dynamicAllocation.enabled=false \*
>
> *--driver-memory 1g --executor-memory 1g --num-executors 1
> --executor-cores 1 \*
>
> *--class com.dnb.optimus.prime.processor.PrimeStreamProcessor \*
>
> *--conf spark.executor.extraClassPath=/etc/hbase/conf \*
>
> */tmp/stream-processor-0.0.0.8-spark.jar \*
>
> *--runner=SparkRunner \*
>
> *--jobName=PrimeStreamProcessor \*
>
> *--checkpointDir=hdfs:///tmp/PrimeStreamProcessor checkpoint \*
>
> *--useWindow=true \*
>
> *--windowDuration=60s --windowLateness=0s --windowElementCount=1 \*
>
> *--maxReadTime=-1 \*
>
> *--streaming=true*
>
>
>
> I can see that it is able to fetch checkpoint data from *checkpointDir* path
> provided. But When the driver tries to broadcast this information to
> executors, it is failing with below exception.
>
>
>
>
>
>
>
>
> *20/06/12 15:35:28 ERROR yarn.Client: Application diagnostics message:
> User class threw exception:
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.UnsupportedOperationException: Accumulator must be registered
> before send to executor at
> org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:71)
> at
> org.apache.beam.runners.spark.SparkPipelineResult.access$000(SparkPipelineResult.java:44)
>  at
> 

Re: Error when using @DefaultCoder(AvroCoder.class)

2020-07-08 Thread Luke Cwik
lWriteEntries(HashMap.java:1790)
> 23:11:11.342 [ERROR] [system.err] at
> java.util.HashMap.writeObject(HashMap.java:1363)
> 23:11:11.342 [ERROR] [system.err] at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 23:11:11.342 [ERROR] [system.err] at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 23:11:11.342 [ERROR] [system.err] at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 23:11:11.342 [ERROR] [system.err] at
> java.lang.reflect.Method.invoke(Method.java:498)
> 23:11:11.342 [ERROR] [system.err] at
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
> 23:11:11.343 [ERROR] [system.err] at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
> 23:11:11.343 [ERROR] [system.err] at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 23:11:11.343 [ERROR] [system.err] at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 23:11:11.343 [ERROR] [system.err] at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 23:11:11.343 [ERROR] [system.err] at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 23:11:11.343 [ERROR] [system.err] at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 23:11:11.343 [ERROR] [system.err] at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 23:11:11.343 [ERROR] [system.err] at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 23:11:11.343 [ERROR] [system.err] at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 23:11:11.343 [ERROR] [system.err] at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 23:11:11.343 [ERROR] [system.err] at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 23:11:11.344 [ERROR] [system.err] at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> 23:11:11.344 [ERROR] [system.err] at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> 23:11:11.344 [ERROR] [system.err] at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> 23:11:11.344 [ERROR] [system.err] at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> 23:11:11.344 [ERROR] [system.err] at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> 23:11:11.344 [ERROR] [system.err] at
> org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:51)
> 23:11:11.344 [ERROR] [system.err] ... 22 more
> 23:11:11.393 [ERROR]
> [org.gradle.internal.buildevents.BuildExceptionReporter]
> 23:11:11.393 [ERROR]
> [org.gradle.internal.buildevents.BuildExceptionReporter] FAILURE: Build
> failed with an exception.
>
> On Wed, 8 Jul 2020 at 22:58, Luke Cwik  wrote:
>
>> Can you provide the full stacktrace?
>>
>> On Wed, Jul 8, 2020 at 12:33 PM Rui Wang  wrote:
>>
>>> Tried some code search in Beam repo but I didn't find the exact line
>>> of code that throws your exception.
>>>
>>> However, I believe for Java Classes you used in primitives (ParDo,
>>> CombineFn) and coders, it's very likely you need to make them
>>> serializable (i.e. implements Serializable).
>>>
>>>
>>> -Rui
>>>
>>> On Wed, Jul 8, 2020 at 6:23 AM Kirill Zhdanovich 
>>> wrote:
>>> >
>>> > Hi!
>>> > I'm using Apache Beam Java(2.19.0) with Dataflow. I created class and
>>> annotated it with DefaultCoder
>>> >
>>> > @DefaultCoder(AvroCoder.class)
>>> > public class ProductCatalog {
>>> >
>>> > When I trying to submit it to cluster I get an error:
>>> >
>>> > Caused by: java.io.NotSerializableException: ...common.ProductCatalog
>>> >
>>> > If I add `implements Serializable` to the class definition everything
>>> works fine. In the Apache Beam guide, I don't see anything about using
>>> implements Serializable. What I'm doing wrong? Thank you in advance for
>>> your help
>>>
>>
>
> --
> Best Regards,
> Kirill
>


Re: Error when using @DefaultCoder(AvroCoder.class)

2020-07-08 Thread Luke Cwik
Can you provide the full stacktrace?

On Wed, Jul 8, 2020 at 12:33 PM Rui Wang  wrote:

> Tried some code search in Beam repo but I didn't find the exact line
> of code that throws your exception.
>
> However, I believe for Java Classes you used in primitives (ParDo,
> CombineFn) and coders, it's very likely you need to make them
> serializable (i.e. implements Serializable).
>
>
> -Rui
>
> On Wed, Jul 8, 2020 at 6:23 AM Kirill Zhdanovich 
> wrote:
> >
> > Hi!
> > I'm using Apache Beam Java(2.19.0) with Dataflow. I created class and
> annotated it with DefaultCoder
> >
> > @DefaultCoder(AvroCoder.class)
> > public class ProductCatalog {
> >
> > When I trying to submit it to cluster I get an error:
> >
> > Caused by: java.io.NotSerializableException: ...common.ProductCatalog
> >
> > If I add `implements Serializable` to the class definition everything
> works fine. In the Apache Beam guide, I don't see anything about using
> implements Serializable. What I'm doing wrong? Thank you in advance for
> your help
>


Re: Recommended way to generate a TableRow from Json without using TableRowJsonCoder context OUTER?

2020-07-08 Thread Luke Cwik
The deprecated method is not going to be removed anytime soon so I wouldn't
worry about it being removed.

If you really want to use non-deprecated methods, then the
TableRowJsonCoder uses the StringUtf8Coder to parse strings so it is
looking for a nested encoding using the StringUtf8Coder encoding. So
something like this:
ByteArrayOutputStream baos = new ByteArrayOutputStream();
StringUtf8Coder.of().encode(jsonString, baos);
TableRow row = TableRowJsonCoder.of().decode(
    new ByteArrayInputStream(baos.toByteArray()));

But why use a coder at all? TableRowJsonCoder is a thin wrapper around
using Jackson's ObjectMapper to perform the conversion. So you could do
something like:
ObjectMapper mapper =
    new ObjectMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
TableRow row = mapper.readValue(strValue, TableRow.class);


On Wed, Jul 8, 2020 at 7:57 AM Lars Almgren Schwartz 
wrote:

> Hey,
>
> Don't know if it's the official way but we have written our own proto to
> BigQuery converter which works pretty well.
>
> public static TableRow convertEventToTableRow(TableRow tableRow, Message 
> event) {
> Map fields = event.getAllFields();
> for (Descriptors.FieldDescriptor field : fields.keySet()) {
> tableRow = mapToBigQueryField(tableRow, field, fields.get(field));
> }
>
> return tableRow;
> }
>
> private static TableRow mapToBigQueryField(
> TableRow tableRow, Descriptors.FieldDescriptor field, Object value) {
> Descriptors.FieldDescriptor.JavaType fieldType = field.getJavaType();
> switch (fieldType) {
> case INT:
> case LONG:
> case FLOAT:
> case DOUBLE:
> case BOOLEAN:
> return tableRow.set(field.getName(), value);
> case BYTE_STRING:
> if (field.isRepeated()) {
> return tableRow.set(
> field.getName(),
> processRepeatedField(
> value,
> x ->
> Base64.getEncoder()
> .encodeToString(
> ((ByteString) 
> x).toByteArray(;
> } else {
> return tableRow.set(
> field.getName(),
> Base64.getEncoder().encodeToString(((ByteString) 
> value).toByteArray()));
> }
> case ENUM:
> if (field.isRepeated()) {
> return tableRow.set(
> field.getName(), processRepeatedField(value, x -> 
> x.toString()));
> } else {
> return tableRow.set(field.getName(), value.toString());
> }
> case STRING:
> if (isUUIDField(field.getName())) {
> if (field.isRepeated()) {
> return tableRow.set(
> field.getName(),
> processRepeatedField(
> value, x -> 
> UUIDUtil.getBase64FromUUID((String) x)));
> } else {
> return tableRow.set(
> field.getName(), 
> UUIDUtil.getBase64FromUUID((String) value));
> }
> } else {
> return tableRow.set(field.getName(), value);
> }
> case MESSAGE:
> switch (field.getMessageType().getFullName()) {
> // Map well known message types that we have a specific 
> mapping for.
> case "google.protobuf.Timestamp":
> if (field.isRepeated()) {
> return tableRow.set(
> field.getName(),
> processRepeatedField(
> value,
> x ->
> 
> com.google.cloud.Timestamp.fromProto(
> (Timestamp) x)
> .toString()));
> } else {
> return tableRow.set(
> field.getName(),
> 
> com.google.cloud.Timestamp.fromProto((Timestamp) value)
> .toString());
> }
> case "xxx.xxx.ExactNumber":
> if (field.isRepeated()) {
> return tableRow.set(
> field.getName(),
> processRepeatedField(
> value, x -> 
> NumberUtils.toString((ExactNumber) x)));
> } else {
> return tableRow.set(
> field.getName(), 
> NumberUtils.toString((ExactNumber) value));
>

Re: Conditional branching during pipeline execution time

2020-07-07 Thread Luke Cwik
Have both DoFns A and B in the graph at the same time and instead use
another decider DoFn that outputs to either the PCollection that goes to
DoFn A or the one that goes to DoFn B, based upon the contents of the side
input. The graph would look something like:

PCollectionView -----------------\
PCollection -> ParDo(Decider) --outA--> PCollection -> ParDo(DoFnA)
                               \-outB--> PCollection -> ParDo(DoFnB)

See[1] for how to create a DoFn with multiple outputs.

1:
https://beam.apache.org/documentation/pipelines/design-your-pipeline/#a-single-transform-that-produces-multiple-outputs
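
A minimal sketch of the decider (String elements, the tag names, flagView
and the DoFnA/DoFnB classes are placeholders for your own types, side input
and DoFns):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

final TupleTag<String> outA = new TupleTag<String>() {};
final TupleTag<String> outB = new TupleTag<String>() {};

PCollectionTuple routed = input.apply("Decider",
    ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // flagView is your PCollectionView<Boolean> side input.
            if (Boolean.TRUE.equals(c.sideInput(flagView))) {
              c.output(c.element());        // main output -> goes to DoFnA
            } else {
              c.output(outB, c.element());  // additional output -> DoFnB
            }
          }
        })
        .withSideInputs(flagView)
        .withOutputTags(outA, TupleTagList.of(outB)));

routed.get(outA).apply("A", ParDo.of(new DoFnA()));
routed.get(outB).apply("B", ParDo.of(new DoFnB()));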

On Tue, Jul 7, 2020 at 7:31 PM Praveen K Viswanathan <
harish.prav...@gmail.com> wrote:

> Hello Everyone,
>
> Apache Beam allows conditional branching during pipeline construction
> time, but I have to decide whether to execute DoFn A or DoFn B during run
> time (based upon a PCollection flag).
>
> My DoFns A and B are inside a custom transformation class and I am passing
> my flag as PCollectionView to the transformation class. However, Beam does
> not wait for the actual value of the PCollectionView and decides which DoFn
> to call during DAG preparation itself (always goes to else part)
>
> class CustomTx {
>public CustomTx(flag) {
> this.flag = flag;
>}
>
>  public expand {
>   if (flag)
>  DoFn A
>   else
>  DoFn B
>   }
> }
>
> class DoFn A {
> }
>
> class DoFn B {
> }
>
> If I have a DoFn inside my transformation's expand method and pass the
> flag as side input it gives the correct value but then, I cannot call a
> DoFn inside a DoFn. Appreciate any pointers on the best way to approach
> this branching case.
>
> --
> Thanks,
> Praveen K Viswanathan
>


Re: Beam supports Flink Async IO operator

2020-07-07 Thread Luke Cwik
Beam is a layer that sits on top of execution engines like Flink and
provides its own programming model thus native operators like Flink's async
IO operator are not exposed.

Most people use a DoFn to do all their IO and sometimes will compose it
with another transform such as GroupIntoBatches[1] to simplify their
implementation.

Why do you need async?

1:
https://beam.apache.org/documentation/transforms/java/aggregation/groupintobatches/
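
For example, something along these lines (Event, Response and MyRestClient
are placeholders for your types and client, and the batch size of 100 is
arbitrary). Each call is still synchronous, but one call now covers a whole
batch instead of a single element, which amortizes the per-call latency:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// keyedEvents is a PCollection<KV<String, Event>> built upstream.
PCollection<KV<String, Iterable<Event>>> batches =
    keyedEvents.apply(GroupIntoBatches.ofSize(100));

PCollection<Response> responses = batches.apply(ParDo.of(
    new DoFn<KV<String, Iterable<Event>>, Response>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // One blocking call per batch of up to 100 events.
        for (Response r : MyRestClient.callBulk(c.element().getValue())) {
          c.output(r);
        }
      }
    }));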


On Tue, Jul 7, 2020 at 11:03 AM Eleanore Jin  wrote:

> Hi community,
>
> I cannot find any documentation for Beam supporting Flink async IO
> operator (
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html),
> just wonder is this not supported right now?
>
> Thanks a lot!
> Eleanore
>


Re: ReadFromKafka: UnsupportedOperationException: The ActiveBundle does not have a registered bundle checkpoint handler

2020-07-06 Thread Luke Cwik
The KafkaIO implementation relies on checkpointing to be able to update the
last committed offset. This is currently unsupported in the portable Flink
runner. BEAM-6868[1] is the associated JIRA. Please vote on it and/or offer
to provide an implementation.

1: https://issues.apache.org/jira/browse/BEAM-6868

On Mon, Jul 6, 2020 at 1:42 PM Piotr Filipiuk 
wrote:

> Hi,
>
> I am trying to run a simple example that uses Python API to ReadFromKafka,
> however I am getting the following error when using Flink Runner:
>
> java.lang.UnsupportedOperationException: The ActiveBundle does not have a
> registered bundle checkpoint handler.
>
> See full log in read_from_kafka_flink.log
>
> I am using:
> Kafka 2.5.0
> Beam 2.22.0
> Flink 1.10
>
> When using Direct runner, the pipeline does not fail but does not seem to
> be consuming any data (see read_from_kafka.log) even though the updated
> offsets are being logged:
>
> [2020-07-06 13:36:01,342] {worker_handlers.py:398} INFO - severity: INFO
> timestamp {
>   seconds: 1594067761
>   nanos: 34000
> }
> message: "Reader-0: reading from test-topic-0 starting at offset 165"
> log_location: "org.apache.beam.sdk.io.kafka.KafkaUnboundedSource"
> thread: "23"
>
> I am running both Kafka and Flink locally. I would appreciate your help
> understanding and fixing the issue.
>
> --
> Best regards,
> Piotr
>


Re: Understanding combiner's distribution logic

2020-06-30 Thread Luke Cwik
Your reasoning is correct around the withHotKeyFanout hint: it is there to
help runners know that there are likely one or more keys with significantly
more data than the others, but how the data is broken up is runner
dependent, and whether runners rely on the hint at all is also runner
dependent. If a runner were smart enough, it wouldn't need the hint and
could automatically detect hot keys and do the right thing. I would take a
look at this doc[1] to learn how the optimization can work from a runner's
perspective. Some runners never perform the PreCombine, while others may
have multiple rounds of it, but the most common case is a single PreCombine
(assuming it is allowed).

1: https://s.apache.org/beam-runner-api-combine-model
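
For completeness, the hint itself looks something like this (Sum and the
fanout numbers are just examples):

import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// input is a PCollection<KV<String, Long>>.
PCollection<KV<String, Long>> sums = input.apply(
    Combine.<String, Long, Long>perKey(Sum.ofLongs())
        // Spread each key's PreCombine across up to 10 intermediate keys.
        .withHotKeyFanout(10));

// Or make the fanout data dependent if only some keys are hot:
//     .withHotKeyFanout(key -> "known-hot-key".equals(key) ? 100 : 1)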

On Tue, Jun 30, 2020 at 10:56 AM Julien Phalip  wrote:

> Hi,
>
> I had a question about how combiners work, particularly on how the
> combined PCollection's subsets are initially formed.
>
> I understand that, according to the documentation
> , a
> combiner allows parallelizing the computation to multiple workers by
> breaking up the PCollection into subsets. I like the database analogy given
> in this post
> ,
> which says that it is similar to pushing down a predicate.
>
> I also understand that it is possible to use withFanout or
> withHotkeyFanout to provide some explicit logic as a hint on how to
> manage the distribution.
>
> What is unclear to me, however, is whether by default the runner already
> plans the distribution of the computation, even when no explicit hints are
> provided. I'm guessing perhaps it always breaks up the PCollection into
> bundles
> 
> (similar to DoFns), then the combiner runs the combination on each bundle,
> saves the result into intermediary accumulators, and those results then
> bubble up recursively to the top? If that's the case, then I assume that
> the purpose of withFanout and withHotKeyFanout is to further break up
> those initially pre-created bundles into even smaller subsets? Or am I
> guessing this wrong? :)
>
> I couldn't find a clear description in the documentation on how the
> PCollection subsets are initially formed. Please let me know if you have
> some details on that, or if it is already documented somewhere.
>
> Thank you!
>
> Julien
>


Re: Re: Can SpannerIO read data from different GCP project?

2020-06-30 Thread Luke Cwik
w-staging-us-central1-661544897337/temp/staging/classes-KrjSD-Y0s4i28kG-XmiBiw.jar
>
>
> There logs printed in gcp servers
>
>
> 2020-06-30 09:44:57.483 HKT
> Finished processing stage F0 with 0 errors in 0.28 seconds
> 2020-06-30 09:44:59.600 HKT
> Starting MapTask stage s01
> 2020-06-30 09:45:00.916 HKT
> in mapfn - get value:myname
> 2020-06-30 09:45:00.934 HKT
> Finished processing stage s01 with 0 errors in 1.333 seconds
> 2020-06-30 09:45:03.025 HKT
> Starting MapTask stage s01
> 2020-06-30 09:45:03.046 HKT
> in mapfn - get value:4
> 2020-06-30 09:45:03.047 HKT
> Finished processing stage s01 with 0 errors in 0.022 seconds
> 2020-06-30 09:45:05.148 HKT
> Starting MapTask stage s01
> 2020-06-30 09:45:05.166 HKT
> in mapfn - get value:2
> 2020-06-30 09:45:05.176 HKT
> Finished processing stage s01 with 0 errors in 0.028 seconds
>
>
> Why Spanner JDBC call happens (in block 1) in my local machine during
> compile phase? while MapFn (in block 2) happens in server side, I expect
> all of them happen in server side.
>
>
>
> At 2020-06-30 00:17:51, "Luke Cwik"  wrote:
>
> The intent is that you grant permissions to the account that is running
> the Dataflow job to the resources you want it to access in project B before
> you start the pipeline. This allows for much finer grain access control and
> the ability to revoke permissions without having to disable an entire
> account.
>
> I would take a look at the general IAM and security documentation within
> GCP[1] or open up a support case with GCP requesting guidance.
>
> 1: https://cloud.google.com/iam
>
> On Sun, Jun 28, 2020 at 8:56 AM Austin Bennett <
> whatwouldausti...@gmail.com> wrote:
>
>> I havent tried yet, but looks like the connection string asks for the
>> project to be specified.  Based on that (and cross project working for
>> other circumstances), I would imagine it will work, but...?  Give it a try!
>>
>> One tricky place might be ensuring proper permissions, in both projects
>> (and without being too open).
>>
>> On Sat, Jun 27, 2020, 5:46 AM Sheng Yang  wrote:
>>
>>> Hi,
>>>
>>> I am working on Beam using Dataflow engine. Recently I am working on
>>> reading spanner data from different project. Say I run my Beam dataflow job
>>> in GCP project A, but the Spanner is in GCP project B. I searched all the
>>> documents, but can't find any documentation about SpannerIO reading data
>>> with the custom credential key files. Right now I am considering JdbcIO
>>> because it accepts custom credential as parameters and spanner also have
>>> jdbc api[1].
>>> Do I have something wrong in my description? Or am I considering the
>>> correct approach?
>>>
>>> String url = "jdbc:cloudspanner:/projects/my_project_id/"
>>>
>>>+ "instances/my_instance_id/"
>>>+ "databases/my_database_name"
>>>+ "?credentials=/home/cloudspanner-keys/my-key.json"
>>>+ ";autocommit=false";try (Connection connection = 
>>> DriverManager.getConnection(url)) {
>>>   try(ResultSet rs = connection.createStatement()
>>>.executeQuery("SELECT SingerId, AlbumId, MarketingBudget FROM 
>>> Albums")) {
>>> while(rs.next()) {
>>>   Long singerId = rs.getLong(1);
>>> }
>>>   }
>>> }
>>>
>>>
>>> [1]: https://github.com/googleapis/java-spanner-jdbc
>>>
>>> Thanks,
>>> Sheng
>>>
>>>
>>>
>>>
>>
>
>
>


Re: Can SpannerIO read data from different GCP project?

2020-06-29 Thread Luke Cwik
The intent is that you grant permissions to the account that is running the
Dataflow job to the resources you want it to access in project B before you
start the pipeline. This allows for much finer-grained access control and the
ability to revoke permissions without having to disable an entire account.

I would take a look at the general IAM and security documentation within
GCP[1] or open up a support case with GCP requesting guidance.

1: https://cloud.google.com/iam
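
As far as the read itself goes, SpannerIO can be pointed at another project
directly; what matters is that the account running the job has the Spanner
roles granted in project B. Something like this (the ids are placeholders):

import com.google.cloud.spanner.Struct;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.values.PCollection;

// Job runs in project A, reads from a Spanner database in project B.
PCollection<Struct> rows = p.apply(
    SpannerIO.read()
        .withProjectId("project-b")
        .withInstanceId("my_instance_id")
        .withDatabaseId("my_database_name")
        .withQuery("SELECT SingerId, AlbumId, MarketingBudget FROM Albums"));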

On Sun, Jun 28, 2020 at 8:56 AM Austin Bennett 
wrote:

> I havent tried yet, but looks like the connection string asks for the
> project to be specified.  Based on that (and cross project working for
> other circumstances), I would imagine it will work, but...?  Give it a try!
>
> One tricky place might be ensuring proper permissions, in both projects
> (and without being too open).
>
> On Sat, Jun 27, 2020, 5:46 AM Sheng Yang  wrote:
>
>> Hi,
>>
>> I am working on Beam using Dataflow engine. Recently I am working on
>> reading spanner data from different project. Say I run my Beam dataflow job
>> in GCP project A, but the Spanner is in GCP project B. I searched all the
>> documents, but can't find any documentation about SpannerIO reading data
>> with the custom credential key files. Right now I am considering JdbcIO
>> because it accepts custom credential as parameters and spanner also have
>> jdbc api[1].
>> Do I have something wrong in my description? Or am I considering the
>> correct approach?
>>
>> String url = "jdbc:cloudspanner:/projects/my_project_id/"
>>
>>+ "instances/my_instance_id/"
>>+ "databases/my_database_name"
>>+ "?credentials=/home/cloudspanner-keys/my-key.json"
>>+ ";autocommit=false";try (Connection connection = 
>> DriverManager.getConnection(url)) {
>>   try(ResultSet rs = connection.createStatement()
>>.executeQuery("SELECT SingerId, AlbumId, MarketingBudget FROM 
>> Albums")) {
>> while(rs.next()) {
>>   Long singerId = rs.getLong(1);
>> }
>>   }
>> }
>>
>>
>> [1]: https://github.com/googleapis/java-spanner-jdbc
>>
>> Thanks,
>> Sheng
>>
>>
>>
>>
>


Re: DoFn with SideInput

2020-06-29 Thread Luke Cwik
The UpdateFn won't be invoked until the side input is ready, which requires
either the watermark passing the end of the global window plus allowed
lateness (to show that the side input is empty) or at least one trigger
firing to populate it with data. See the general section on side inputs[1]
and some useful patterns[2] (there are examples of how to get globally
windowed side inputs to work).

1: https://beam.apache.org/documentation/programming-guide/#side-inputs
2: https://beam.apache.org/documentation/patterns/side-inputs/

On Sun, Jun 28, 2020 at 6:24 PM Praveen K Viswanathan <
harish.prav...@gmail.com> wrote:

>
> Hi All - I am facing an issue while using *side-input*.
>
> *What am I doing:*
> From my main program, I am calling a custom PTransform with a
> PCollectionView as parameter. Inside custom PTransform, I am passing the
> PCollectionView as a side-input to a DoFn.
>
> *Issue:*
> When I run the pipeline, I am expecting the log statement inside my DoFn's
> processElement to get executed but it is not getting logged. If I remove
> the side-input to my DoFn then the log is getting printed. I am suspecting
> whether it could be related to windowing/execution order or my side-input
> somehow being empty. Appreciate if you can clarify on what is going wrong
> here.
>
> *Code Structure:*
>
>
> *Main Program:* PCollectionTuple tuple = input.apply(new FirstTx());
>
>  // Get two tuple tags from first transformation
>  PCollection1 = tuple.get(tag1).setCoder(...);
>  PCollection2 = tuple.get(tag2).setCoder(...);
>
>  // Converting PCollection1 to PCollectionView to use as a side-input
>  // Note: I need to introduce a global window here as my source is
> unbounded and when we use View.asList() it does GroupByKey internally
>   which inturn demands a window
>  PView = PCollection1.apply(Window.>into(new
> GlobalWindows()) // Everything into global window.
>
>  .triggering(Repeatedly.forever(DefaultTrigger.of()))
>
>  .discardingFiredPanes()).apply(Values.create()).apply(View.asList());
>
> // Pass PCollectionView to SecondTx as a param
> PCollection3 = PCollection2.apply(new SecondTx(PView));
>
> *SecondTx:*
> Inside my SecondTx, I am getting the PView from constructor (this.PView =
> PView) and calling a DoFn
>
> public PCollection expand(PCollection  >> input) {
> input.apply(ParDo.of(new UpdateFn()).withSideInput("SideInput", PView));
> ...
> }
>
> // DoFn
> class UpdateFn extends DoFn>>,
> CustomObject> {
> @ProcessElement
> public void processElement(@Element Map Map>> input, OutputReceiver out) {
>* Log.of("UpdateFn " + input);*
> out.output(new CustomObject());
> }
> }
>
> --
> Thanks,
> Praveen K Viswanathan
>


Re: Apache Beam a Complete Guide - Review?

2020-06-29 Thread Luke Cwik
The author of Apache Beam A Complete Guide does not have good reviews on
Amazon for their other books and, as you mentioned, there are no reviews
for this one.

I would second the Streaming Systems book as the authors directly worked on
Apache Beam.

On Sun, Jun 28, 2020 at 6:46 PM Wesley Peng  wrote:

> Hi Rion
>
> Rion Williams wrote:
> > I considered that one as well but was in the same boat in terms of not
> > pulling the trigger (lack of reviews, price point, etc.). I eventually
> > landed on Streaming Systems, which I highly, highly recommend if you
> > want to learn more about the Beam model:
> >
> > - http://streamingsystems.net/
> >
> > I don’t think there’s a better book that I’ve come across that focuses
> > on it more (and if there is one, I’d love to know about it). I wrote a
> > blog post that includes a short-review of it if you want a slightly
> > longer summary (http://rion.io/2020/05/09/an-education-in-streaming/),
> > but again - I highly recommend checking it out if you hadn’t already.
>
> Thanks for the answer. I will check the resource you gave above.
>
> Regards.
>


Re: Getting PC: @ 0x7f7490e08602 (unknown) raise and Check failure stack trace: ***

2020-06-26 Thread Luke Cwik
Sorry, I didn't open a support case. You should open the support case.

On Fri, Jun 26, 2020 at 10:41 AM Mohil Khare  wrote:

> Thanks a lot Luke for following up on this and opening a dataflow
> support.  It would be good to know how streamingEngine solved the problem.
> I will really appreciate it if you can share a link for a support case
> once you open it (if it is possible).
>
> Thanks and Regards
> Mohil
>
>
>
> On Fri, Jun 26, 2020 at 8:30 AM Luke Cwik  wrote:
>
It seems as though we have seen this failure before on Dataflow; it was
caused by side input tags not being unique in a streaming pipeline.
>>
>> It looked like this used to be a common occurrence in the Python SDK[1,
>> 2] because it generated tags that weren't unique enough.
>>
>> I would open up a case with Dataflow support with all the information you
>> have provided here.
>>
>> 1: https://issues.apache.org/jira/browse/BEAM-4549
>> 2: https://issues.apache.org/jira/browse/BEAM-4534
>>
>> On Thu, Jun 25, 2020 at 9:30 PM Mohil Khare  wrote:
>>
>>> Hi Luke and all,
>>>
>>> UPDATE: So when I started my job by *enabling the streaming engine and
>>> keeping the machine type default for the streaming engine (n1-standard-2)*,
>>> the pipeline started successfully.
>>> https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#streaming-engine
>>>
>>> Still evaluating to make sure that there is no performance degradation
>>> by doing so.
>>>
>>> Thanks and regards
>>> Mohil
>>>
>>>
>>> On Thu, Jun 25, 2020 at 11:44 AM Mohil Khare  wrote:
>>>
>>>> Hi Luke,
>>>>
>>>> Let me give you some more details about the code.
>>>> As I mentioned before, I am using java sdk 2.19.0 on dataflow.
>>>>
>>>> Default machine type which n1-standard-4.
>>>> Didn't set any numWorkerHarnessThreads (I believe beam/dataflow picks
>>>> it up based on number of cores available)
>>>>
>>>> 1: Code listens for some trigger on pubsub topic:
>>>> /**
>>>>
>>>>  * Read From PubSub for topic ANALYTICS_UPDATE and create 
>>>> PCollection indicating main pipeline to reload * relevant 
>>>> DataAnalyticsData from BQ table */static class 
>>>> MonitorPubSubForDailyAnalyticsDataStatus extends PTransform>>> PCollection> {private final 
>>>> String subscriptionName;private final String jobProject;
>>>> MonitorPubSubForDailyAnalyticsDataStatus(String subscriptionName, String 
>>>> jobProject) {this.subscriptionName = subscriptionName; 
>>>>this.jobProject = jobProject;}@Overridepublic 
>>>> PCollection expand(PBegin input) {
>>>> return input.getPipeline()
>>>> .apply("Read_PubSub_Messages", 
>>>> PubsubIO.readMessagesWithAttributesAndMessageId().fromSubscription(subscriptionName))
>>>> .apply("Applying_Windowing", 
>>>> Window.into(new GlobalWindows())
>>>> .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))  
>>>>   .discardingFiredPanes())
>>>> .apply("Read_Update_Status", ParDo.of(new DoFn>>> POJORepresentingJobCompleteInfo>() {@ProcessElement
>>>> public void processElement(@Element PubsubMessage input, 
>>>> OutputReceiver out) { 
>>>>/*** Read and CReate ***/
>>>>
>>>> out.output(POJORepresentingJobCompleteInfo);   
>>>> }}));}}
>>>>
>>>> 2: Get Latest Updated and Reload new Updates from various BQ tables using 
>>>> google cloud bigquery library 
>>>> (https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries)
>>>>
>>>> PCollection 
>>>> analyticsDataStatusUpdates = 
>>>> p.apply("Get_Analytics_Data_Status_Updates_pubsub",
>>>>
>>>> new MonitorPubSubForDailyAnalyticsDataStatus(subscriptionName, 
>>>> jobProject));
>>>>
>>>>
>>>> 3. Create various PCollectionViews to be used as side input for decorating 
>>>> stream of logs coming from Ka

Re: Getting PC: @ 0x7f7490e08602 (unknown) raise and Check failure stack trace: ***

2020-06-26 Thread Luke Cwik
already had 3-4 
>> such transforms and it was working fine. Yesterday I added a few more and 
>> started seeing crashes.
>>
>> If I enable just one of the newly added PCollectionView transforms (keeping 
>> old 3-4 intact), then everything works fine. Moment I enable another new 
>> transform, a crash happens.
>>
>>
>> Hope this provides some more insight. Let me know if I need to open a ticket 
>> or I am doing something wrong or some extra configuration or different 
>> worker machine type need to be provided.
>>
>>
>> Thanks and Regards
>>
>> Mohil
>>
>>
>> On Thu, Jun 25, 2020 at 11:01 AM Mohil Khare  wrote:
>>
>>> Hi Luke,
>>>
>>> I can send you a code snippet with more details if it helps.
>>>
>>> BTW found similar issue here:
>>> http://mail-archives.apache.org/mod_mbox/beam-user/201801.mbox/%3ccaf9t7_74pkr7fj51-6_tbsycz9aiz_xsm7rcali5kmkd1ng...@mail.gmail.com%3E
>>>
>>> Thanks and Regards
>>> Mohil
>>>
>>> On Thu, Jun 25, 2020 at 10:58 AM Mohil Khare  wrote:
>>>
>>>> Hi Luke,
>>>> Thanks for your response, I tried looking at worker logs using the
>>>> logging service of GCP and unable to get a clear picture. Not sure if its
>>>> due to memory pressure or low number of harness threads.
>>>> Attaching a few more screenshots of crash logs that I found as wells
>>>> json dump of logs.
>>>>
>>>> Let me know if you still think opening a ticket is a right way to go.
>>>>
>>>> Thanks and regards
>>>> Mohil
>>>>
>>>> On Thu, Jun 25, 2020 at 10:00 AM Luke Cwik  wrote:
>>>>
>>>>> Try looking at the worker logs to get a full stack trace. Take a look
>>>>> at this page for some debugging guidance[1] or consider opening a support
>>>>> case with GCP.
>>>>>
>>>>> 1:
>>>>> https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline
>>>>>
>>>>> On Thu, Jun 25, 2020 at 1:42 AM Mohil Khare  wrote:
>>>>>
>>>>>> BTW, just to make sure that there is no issue with any individual
>>>>>> PTransform, I enabled each one of them one by one and the pipeline 
>>>>>> started
>>>>>> successfully. Issue happens as soon as I enable more than one new
>>>>>> aforementioned PTransform.
>>>>>>
>>>>>> Thanks and regards
>>>>>> Mohil
>>>>>>
>>>>>> On Thu, Jun 25, 2020 at 1:26 AM Mohil Khare  wrote:
>>>>>>
>>>>>>> Hello All,
>>>>>>>
>>>>>>> I am using the BEAM java 2.19.0 version on google dataflow.
>>>>>>>
>>>>>>> Need urgent help in debugging one issue.
>>>>>>>
>>>>>>> I recently added 3-4 new PTransformations. to an existing pipeline
>>>>>>> where I read data from BQ for a certain timestamp and create
>>>>>>> PCollectionView> to be used as side input in other
>>>>>>> PTransforms.
>>>>>>>
>>>>>>> i.e. something like this:
>>>>>>>
>>>>>>> /**
>>>>>>>  * Get PCollectionView Stats1
>>>>>>>  */
>>>>>>> PCollectionView> stats1View =
>>>>>>> jobCompleteStatus
>>>>>>> .apply("Reload_MonthlyS2Stats_FromBQ", new ReadStatsS1())
>>>>>>> .apply("View_S1STATS", View.asSingleton());
>>>>>>>
>>>>>>> /**
>>>>>>>  * Get PCollectionView of Stats2
>>>>>>>  */
>>>>>>> PCollectionView> stats2View =
>>>>>>> jobCompleteStatus
>>>>>>> .apply("Reload_OptimalAppCharsInfo_FromBQ", new ReadStatsS2())
>>>>>>> .apply("View_S2STATS", View.asSingleton());
>>>>>>>
>>>>>>>
>>>>>>> and a couple more like these PTransforms. Here jobCompleteStatus is a 
>>>>>>> message
>>>>>>>
>>>>>>> received from PubSub that act as a trigger to reload these views.
>>>>>>>
>>>>>>> The moment I deployed the above pipeline, it didn't start and
>>>>>>>
>>>>>>> error reporting gave weird exceptions(see attached screenshot1 and 
>>>>>>> screenshot) which I don't know how to debug.
>>>>>>>
>>>>>>>
>>>>>>> Then as an experiment I made a change where I enabled only one new 
>>>>>>> transformation
>>>>>>>
>>>>>>> and disabled others. This time I didn't see any issue.
>>>>>>>
>>>>>>> So it looks like some memory issue.
>>>>>>>
>>>>>>> I also compared worker logs between working case and non working case
>>>>>>>
>>>>>>> and it looks resources were not granted in non working case.
>>>>>>>
>>>>>>> (See attached working-workerlogs and nonworking-workerlogs)
>>>>>>>
>>>>>>> I could't find any other log.
>>>>>>>
>>>>>>>
>>>>>>> I would really appreciate quick help here.
>>>>>>>
>>>>>>>
>>>>>>> Thanks and Regards
>>>>>>>
>>>>>>> Mohil
>>>>>>>
>>>>>>>
>>>>>>>


Re: Caching data inside DoFn

2020-06-26 Thread Luke Cwik
Use a stateful DoFn and buffer the elements in a bag state. You'll want to
use a key that contains enough data to satisfy the join condition you are
trying to match. For example, if you're trying to match on a customerId
then you would do something like:
element 1 -> ParDo(extract customer id) -> KV<customerId, element 1> ->
stateful ParDo(buffer element 1 in bag state)
...
element 5 -> ParDo(extract customer id) -> KV<customerId, element 5> ->
stateful ParDo(output all elements in bag)

If you are matching on customerId and eventId then you would use a
composite key (customerId, eventId).

You can always use a single global key but you will lose all parallelism
during processing (for small pipelines this likely won't matter).
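
A minimal sketch of the buffering DoFn (String elements and the
completesTheMatch check are placeholders for your element type and match
logic); upstream of this you would attach the key, e.g. with WithKeys, as
in the diagram above:

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class BufferUntilMatch extends DoFn<KV<String, String>, String> {

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec =
      StateSpecs.bag(StringUtf8Coder.of());

  @ProcessElement
  public void processElement(
      ProcessContext c, @StateId("buffer") BagState<String> buffer) {
    String element = c.element().getValue();
    if (completesTheMatch(element)) {
      // Flush everything buffered for this key plus the element itself.
      for (String buffered : buffer.read()) {
        c.output(buffered);
      }
      c.output(element);
      buffer.clear();
    } else {
      buffer.add(element);
    }
  }

  private boolean completesTheMatch(String element) {
    return element.contains("element 5"); // stand-in for real match logic
  }
}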

On Fri, Jun 26, 2020 at 7:29 AM Praveen K Viswanathan <
harish.prav...@gmail.com> wrote:

> Hi All - I have a DoFn which generates data (KV pair) for each element
> that it is processing. It also has to read from that KV for other elements
> based on a key which means, the KV has to retain all the data that's
> getting added to it while processing every element. I was thinking
> about the "slow-caching side input pattern" but it is more of caching
> outside the DoFn and then using it inside. It doesn't update the cache
> inside a DoFn. Please share if anyone has thoughts on how to approach this
> case.
>
> Element 1 > Add a record to a KV > . Element 5 > Used the value from
> KV if there is a match in the key
>
> --
> Thanks,
> Praveen K Viswanathan
>


Re: Getting PC: @ 0x7f7490e08602 (unknown) raise and Check failure stack trace: ***

2020-06-25 Thread Luke Cwik
Try looking at the worker logs to get a full stack trace. Take a look at
this page for some debugging guidance[1] or consider opening a support case
with GCP.

1:
https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline

On Thu, Jun 25, 2020 at 1:42 AM Mohil Khare  wrote:

> BTW, just to make sure that there is no issue with any individual
> PTransform, I enabled each one of them one by one and the pipeline started
> successfully. Issue happens as soon as I enable more than one new
> aforementioned PTransform.
>
> Thanks and regards
> Mohil
>
> On Thu, Jun 25, 2020 at 1:26 AM Mohil Khare  wrote:
>
>> Hello All,
>>
>> I am using the BEAM java 2.19.0 version on google dataflow.
>>
>> Need urgent help in debugging one issue.
>>
>> I recently added 3-4 new PTransformations. to an existing pipeline where
>> I read data from BQ for a certain timestamp and create
>> PCollectionView> to be used as side input in other
>> PTransforms.
>>
>> i.e. something like this:
>>
>> /**
>>  * Get PCollectionView Stats1
>>  */
>> PCollectionView> stats1View =
>> jobCompleteStatus
>> .apply("Reload_MonthlyS2Stats_FromBQ", new ReadStatsS1())
>> .apply("View_S1STATS", View.asSingleton());
>>
>> /**
>>  * Get PCollectionView of Stats2
>>  */
>> PCollectionView> stats2View =
>> jobCompleteStatus
>> .apply("Reload_OptimalAppCharsInfo_FromBQ", new ReadStatsS2())
>> .apply("View_S2STATS", View.asSingleton());
>>
>>
>> and a couple more like these PTransforms. Here jobCompleteStatus is a message
>>
>> received from PubSub that act as a trigger to reload these views.
>>
>> The moment I deployed the above pipeline, it didn't start and
>>
>> error reporting gave weird exceptions (see attached screenshot1 and
>> screenshot) which I don't know how to debug.
>>
>>
>> Then as an experiment I made a change where I enabled only one new 
>> transformation
>>
>> and disabled others. This time I didn't see any issue.
>>
>> So it looks like some memory issue.
>>
>> I also compared worker logs between working case and non working case
>>
>> and it looks resources were not granted in non working case.
>>
>> (See attached working-workerlogs and nonworking-workerlogs)
>>
>> I couldn't find any other log.
>>
>>
>> I would really appreciate quick help here.
>>
>>
>> Thanks and Regards
>>
>> Mohil
>>
>>
>>


Re: Sorting JSON output by key

2020-06-25 Thread Luke Cwik
Have you taken a look at the sorter extension[1]?

1: https://beam.apache.org/documentation/sdks/java-extensions/#sorter
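
For reference, a rough sketch of how the sorter extension is typically wired in
(it needs the beam-sdks-java-extensions-sorter dependency; the String key/value
types here are illustrative assumptions):

import org.apache.beam.sdk.extensions.sorter.BufferedExternalSorter;
import org.apache.beam.sdk.extensions.sorter.SortValues;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Elements are (groupKey, (sortKey, jsonString)); after the GroupByKey, SortValues
// orders each group's values by the secondary key.
static PCollection<KV<String, Iterable<KV<String, String>>>> sortGroups(
    PCollection<KV<String, KV<String, String>>> keyed) {
  return keyed
      .apply(GroupByKey.create())
      .apply(SortValues.<String, String, String>create(BufferedExternalSorter.options()));
}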

On Thu, Jun 25, 2020 at 6:18 AM OrielResearch Eila Arich-Landkof <
e...@orielresearch.org> wrote:

> Hi all,
>
> I have a pipeline that outputs JSON elements.
> Next, I want to read the JSON elements in order by one of the keys.
>
> Any idea how to do it within the pipeline?
>
> Thanks,
> Eila
>
>
>
> —
> Eila
> www.orielesearch.com
> https://www.meetup.com/Deep-Learning-In-Production
> 
>
> Sent from my iPhone
>


Re: How to create a marker file when each window completes?

2020-06-24 Thread Luke Cwik
You can use @OnWindowExpiration with a stateful DoFn that consumes the
PCollection (you'll need to convert the PCollection to a keyed
PCollection, which you can do with WithKeys.of(null)). The
window expiration will only be invoked once all upstream processing for
that window has been completed.
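
A rough sketch of that setup, assuming the written file names were converted to a
PCollection<KV<Void, String>> via WithKeys.of((Void) null); the marker string emitted
here would still need a small follow-up step to become an actual .COMPLETE file, and
the exact parameters supported on the expiration callback may vary by Beam version:

import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;

class EmitCompletionMarkerFn extends DoFn<KV<Void, String>, String> {

  // Some per-key, per-window state so the DoFn is stateful; here we just count files.
  @StateId("count")
  private final StateSpec<ValueState<Long>> countSpec = StateSpecs.value(VarLongCoder.of());

  @ProcessElement
  public void process(@StateId("count") ValueState<Long> count) {
    Long current = count.read();
    count.write(current == null ? 1L : current + 1);
  }

  @OnWindowExpiration
  public void onWindowExpiration(
      @StateId("count") ValueState<Long> count,
      BoundedWindow window,
      OutputReceiver<String> out) {
    // Fires once the watermark passes the end of the window plus allowed lateness,
    // i.e. after all upstream processing for the window has completed.
    out.output("COMPLETE " + window.maxTimestamp() + " files=" + count.read());
  }
}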

On Wed, Jun 24, 2020 at 9:18 AM Sunny, Mani Kolbe  wrote:

> Let's say I set *FixedWindows.of*(Duration.standardMinutes(10))
>
> Since my event time is determined by WithTimestamps.of(Instant.now()), I
> can safely assume the window is closed once that 10 min period has passed. But
> how do I ensure that all records belonging to that window are already
> flushed to disk, so that I can safely create a .COMPLETE flag?
>
>
>
> *From:* Luke Cwik 
> *Sent:* Wednesday, June 24, 2020 5:10 PM
> *To:* user 
> *Subject:* Re: How to create a marker file when each window completes?
>
>
>
>
>
>
> Knowing when a window is "closed" is based upon having the watermark
> advance, which is based upon event time.
>
>
>
> On Wed, Jun 24, 2020 at 9:05 AM Sunny, Mani Kolbe  wrote:
>
> Hi Luke,
>
>
>
> Sorry forgot to mention, we override the event timestamp to current using
> WithTimestamps.of(Instant.now()) as we don’t really care about the actual event
> time. So the FixedWindow closes when the current time passes the window end time.
>
>
>
> It is a standard practice in the Oozie world to trigger downstream jobs based
> on marker files, so I thought someone might have encountered this problem
> before me.
>
>
>
> Regards,
>
> Mani
>
>
>
>
>
>
>
> *From:* Luke Cwik 
> *Sent:* Wednesday, June 24, 2020 4:23 PM
> *To:* user 
> *Subject:* Re: How to create a marker file when each window completes?
>
>
>
>
>
>
> What do you consider complete? (I ask this since you are using element
> count and processing time triggers)
>
>
>
> Generally the idea is that you can feed the output
> PCollection to a stateful DoFn with
> an @OnWindowExpiration setup but this works only if your completeness is
> controlled by watermark advancement. Note that @OnWindowExpiration is new
> to Beam so it may not yet work with Spark.
>
>
>
> On Wed, Jun 24, 2020 at 4:22 AM Sunny, Mani Kolbe  wrote:
>
> Hello,
>
>
>
> We are developing a beam pipeline which runs on SparkRunner on streaming
> mode. This pipeline read from Kinesis, do some translations, filtering and
> finally output to S3 using AvroIO writer. We are using Fixed windows with
> triggers based on element count and processing time intervals. The output path
> is partitioned by window start timestamp. allowedLateness=0sec
>
>
>
> This is all working fine. But to support some legacy downstream services,
> we need to ensure that each output partition has a marker file to indicate
> data completeness and readiness for downstream consumption. Something like
> Hadoop's .SUCCESS file or a .COMPLETE. Is there a way to create such a marker
> file in Beam on a window closing event?
>
>
>
> Regards,
>
> Mani
>
>


Re: How to create a marker file when each window completes?

2020-06-24 Thread Luke Cwik
Knowing when a window is "closed" is based upon having the watermark
advance, which is based upon event time.

On Wed, Jun 24, 2020 at 9:05 AM Sunny, Mani Kolbe  wrote:

> Hi Luke,
>
>
>
> Sorry forgot to mention, we override the event timestamp to current using
> WithTimestamps.of(Instant.now()) as we don’t really care about the actual event
> time. So the FixedWindow closes when the current time passes the window end time.
>
>
>
> It is a standard practice in the Oozie world to trigger downstream jobs based
> on marker files, so I thought someone might have encountered this problem
> before me.
>
>
>
> Regards,
>
> Mani
>
>
>
>
>
>
>
> *From:* Luke Cwik 
> *Sent:* Wednesday, June 24, 2020 4:23 PM
> *To:* user 
> *Subject:* Re: How to create a marker file when each window completes?
>
>
>
>
>
>
> What do you consider complete? (I ask this since you are using element
> count and processing time triggers)
>
>
>
> Generally the idea is that you can feed the output
> PCollection to a stateful DoFn with
> an @OnWindowExpiration setup but this works only if your completeness is
> controlled by watermark advancement. Note that @OnWindowExpiration is new
> to Beam so it may not yet work with Spark.
>
>
>
> On Wed, Jun 24, 2020 at 4:22 AM Sunny, Mani Kolbe  wrote:
>
> Hello,
>
>
>
> We are developing a beam pipeline which runs on SparkRunner on streaming
> mode. This pipeline read from Kinesis, do some translations, filtering and
> finally output to S3 using AvroIO writer. We are using Fixed windows with
> triggers based on element count and processing time intervals. The output path
> is partitioned by window start timestamp. allowedLateness=0sec
>
>
>
> This is all working fine. But to support some legacy downstream services,
> we need to ensure that each output partition has a marker file to indicate
> data completeness and readiness for downstream consumption. Something like
> Hadoop's .SUCCESS file or a .COMPLETE. Is there a way to create such a marker
> file in Beam on a window closing event?
>
>
>
> Regards,
>
> Mani
>
>


Re: How to create a marker file when each window completes?

2020-06-24 Thread Luke Cwik
What do you consider complete? (I ask this since you are using element
count and processing time triggers)

Generally the idea is that you can feed the output
PCollection to a stateful DoFn with
an @OnWindowExpiration setup but this works only if your completeness is
controlled by watermark advancement. Note that @OnWindowExpiration is new
to Beam so it may not yet work with Spark.

On Wed, Jun 24, 2020 at 4:22 AM Sunny, Mani Kolbe  wrote:

> Hello,
>
>
>
> We are developing a beam pipeline which runs on SparkRunner on streaming
> mode. This pipeline read from Kinesis, do some translations, filtering and
> finally output to S3 using AvroIO writer. We are using Fixed windows with
> triggers based on element count and processing time intervals. The output path
> is partitioned by window start timestamp. allowedLateness=0sec
>
>
>
> This is all working fine. But to support some legacy downstream services,
> we need to ensure that each output partition has a marker file to indicate
> data completeness and readiness for downstream consumption. Something like
> Hadoop's .SUCCESS file or a .COMPLETE. Is there a way to create such a marker
> file in Beam on a window closing event?
>
>
>
> Regards,
>
> Mani
>


Re: Designing an existing pipeline in Beam

2020-06-23 Thread Luke Cwik
You can apply the same DoFn / Transform instance multiple times in the
graph or you can follow regular development practices where the common code
is factored into a method that two different DoFns invoke.
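
A tiny illustration of that second option (all class and method names here are made
up for the example):

import org.apache.beam.sdk.transforms.DoFn;

// Both DoFns call the same static helper; since the helper is a plain static method
// with no state, the DoFns serialize to workers without any extra work.
class LogParsing {
  static String normalize(String raw) {
    return raw.trim().toLowerCase();
  }
}

class ParseRequestsFn extends DoFn<String, String> {
  @ProcessElement
  public void process(@Element String line, OutputReceiver<String> out) {
    out.output(LogParsing.normalize(line));
  }
}

class ParseResponsesFn extends DoFn<String, String> {
  @ProcessElement
  public void process(@Element String line, OutputReceiver<String> out) {
    out.output("response:" + LogParsing.normalize(line));
  }
}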

On Tue, Jun 23, 2020 at 4:28 PM Praveen K Viswanathan <
harish.prav...@gmail.com> wrote:

> Hi Luke - Thanks for the explanation. The limitation due to directed graph
> processing and the option of external storage clears most of the questions
> I had with respect to designing this pipeline. I do have one more scenario
> to clarify on this thread.
>
> If I had a certain piece of logic that I had to use in more than one DoFn,
> how do we do that? In a normal Java application, we can put it as a
> separate method and call it wherever we want. Is it possible to replicate
> something like that in Beam's DoFn?
>
> On Tue, Jun 23, 2020 at 3:47 PM Luke Cwik  wrote:
>
>> Beam is really about parallelizing the processing. Using a single DoFn
>> that does everything is fine as long as the DoFn can process elements in
>> parallel (e.g. upstream source produces lots of elements). Composing
>> multiple DoFns is great for re-use and testing but it isn't strictly
>> necessary. Also, Beam doesn't support back edges in the processing graph so
>> all data flows in one direction and you can't have a cycle. This only
>> allows for map 1 to produce map 2 which then produces map 3 which is then
>> used to update map 1 if all of that logic is within a single DoFn/Transform
>> or you create a cycle using an external system such as write to Kafka topic
>> X and read from Kafka topic X within the same pipeline or update a database
>> downstream from where it is read. There is a lot of ordering complexity and
>> stale data issues whenever using an external store to create a cycle though.
>>
>> On Mon, Jun 22, 2020 at 6:02 PM Praveen K Viswanathan <
>> harish.prav...@gmail.com> wrote:
>>
>>> Another way to put this question is, how do we write a beam pipeline for
>>> an existing pipeline (in Java) that has a dozen custom objects and you
>>> have to work with multiple HashMaps of those custom objects in order to
>>> transform it. Currently, I am writing a beam pipeline by using the same
>>> Custom objects, getters and setters and HashMap *but
>>> inside a DoFn*. Is this the optimal way or does Beam offer something
>>> else?
>>>
>>> On Mon, Jun 22, 2020 at 3:47 PM Praveen K Viswanathan <
>>> harish.prav...@gmail.com> wrote:
>>>
>>>> Hi Luke,
>>>>
>>>> We can think of Map 2 as a kind of template using which you want to enrich
>>>> data in Map 1. As I mentioned in my previous post, this is a high level
>>>> scenario.
>>>>
>>>> All these logic are spread across several classes (with ~4K lines of
>>>> code in total). As in any Java application,
>>>>
>>>> 1. The code has been modularized with multiple method calls
>>>> 2. Passing around HashMaps as argument to each method
>>>> 3. Accessing the attributes of the custom object using getters and
>>>> setters.
>>>>
>>>> This is a common pattern in a normal Java application but I have not
>>>> seen such an example of code in Beam.
>>>>
>>>>
>>>> On Mon, Jun 22, 2020 at 8:23 AM Luke Cwik  wrote:
>>>>
>>>>> Who reads map 1?
>>>>> Can it be stale?
>>>>>
>>>>> It is unclear what you are trying to do in parallel and why you
>>>>> wouldn't stick all this logic into a single DoFn / stateful DoFn.
>>>>>
>>>>> On Sat, Jun 20, 2020 at 7:14 PM Praveen K Viswanathan <
>>>>> harish.prav...@gmail.com> wrote:
>>>>>
>>>>>> Hello Everyone,
>>>>>>
>>>>>> I am in the process of implementing an existing pipeline (written
>>>>>> using Java and Kafka) in Apache Beam. The data from the source stream is
>>>>>> contrived and had to go through several steps of enrichment using REST 
>>>>>> API
>>>>>> calls and parsing of JSON data. The key
>>>>>> transformation in the existing pipeline is in shown below (a super
>>>>>> high level flow)
>>>>>>
>>>>>> *Method A*
>>>>>> Calls *Method B*
>>>>>>   Creates *Map 1, Map 2*
>>>>>> Calls *Method C*
>>>>>>  Read *Map 2*
>>>>>>  C

Re: Designing an existing pipeline in Beam

2020-06-23 Thread Luke Cwik
Beam is really about parallelizing the processing. Using a single DoFn that
does everything is fine as long as the DoFn can process elements in
parallel (e.g. upstream source produces lots of elements). Composing
multiple DoFns is great for re-use and testing but it isn't strictly
necessary. Also, Beam doesn't support back edges in the processing graph so
all data flows in one direction and you can't have a cycle. This only
allows for map 1 to produce map 2 which then produces map 3 which is then
used to update map 1 if all of that logic is within a single DoFn/Transform
or you create a cycle using an external system such as write to Kafka topic
X and read from Kafka topic X within the same pipeline or update a database
downstream from where it is read. There is a lot of ordering complexity and
stale data issues whenever using an external store to create a cycle though.

On Mon, Jun 22, 2020 at 6:02 PM Praveen K Viswanathan <
harish.prav...@gmail.com> wrote:

> Another way to put this question is, how do we write a beam pipeline for
> an existing pipeline (in Java) that has a dozen custom objects and you
> have to work with multiple HashMaps of those custom objects in order to
> transform it. Currently, I am writing a beam pipeline by using the same
> Custom objects, getters and setters and HashMap *but
> inside a DoFn*. Is this the optimal way or does Beam offer something else?
>
> On Mon, Jun 22, 2020 at 3:47 PM Praveen K Viswanathan <
> harish.prav...@gmail.com> wrote:
>
>> Hi Luke,
>>
>> We can think of Map 2 as a kind of template using which you want to enrich
>> data in Map 1. As I mentioned in my previous post, this is a high level
>> scenario.
>>
>> All these logic are spread across several classes (with ~4K lines of code
>> in total). As in any Java application,
>>
>> 1. The code has been modularized with multiple method calls
>> 2. Passing around HashMaps as argument to each method
>> 3. Accessing the attributes of the custom object using getters and
>> setters.
>>
>> This is a common pattern in a normal Java application but I have not seen
>> such an example of code in Beam.
>>
>>
>> On Mon, Jun 22, 2020 at 8:23 AM Luke Cwik  wrote:
>>
>>> Who reads map 1?
>>> Can it be stale?
>>>
>>> It is unclear what you are trying to do in parallel and why you wouldn't
>>> stick all this logic into a single DoFn / stateful DoFn.
>>>
>>> On Sat, Jun 20, 2020 at 7:14 PM Praveen K Viswanathan <
>>> harish.prav...@gmail.com> wrote:
>>>
>>>> Hello Everyone,
>>>>
>>>> I am in the process of implementing an existing pipeline (written using
>>>> Java and Kafka) in Apache Beam. The data from the source stream is
>>>> contrived and has to go through several steps of enrichment using REST API
>>>> calls and parsing of JSON data. The key
>>>> transformation in the existing pipeline is shown below (a super high
>>>> level flow)
>>>>
>>>> *Method A*
>>>> Calls *Method B*
>>>>   Creates *Map 1, Map 2*
>>>> Calls *Method C*
>>>>  Read *Map 2*
>>>>  Create *Map 3*
>>>> *Method C*
>>>>  Read *Map 3* and
>>>>  update *Map 1*
>>>>
>>>> The Map we use are multi-level maps and I am thinking of having
>>>> PCollections for each Maps and pass them as side inputs in a DoFn wherever
>>>> I have transformations that need two or more Maps. But there are certain
>>>> tasks where I want to make sure that I am following the right approach, for
>>>> instance updating one of the side input maps inside a DoFn.
>>>>
>>>> These are my initial thoughts/questions and I would like to get some
>>>> expert advice on how we typically design such an interleaved transformation
>>>> in Apache Beam. Appreciate your valuable insights on this.
>>>>
>>>> --
>>>> Thanks,
>>>> Praveen K Viswanathan
>>>>
>>>
>>
>> --
>> Thanks,
>> Praveen K Viswanathan
>>
>
>
> --
> Thanks,
> Praveen K Viswanathan
>


Re: Designing an existing pipeline in Beam

2020-06-22 Thread Luke Cwik
Who reads map 1?
Can it be stale?

It is unclear what you are trying to do in parallel and why you wouldn't
stick all this logic into a single DoFn / stateful DoFn.

On Sat, Jun 20, 2020 at 7:14 PM Praveen K Viswanathan <
harish.prav...@gmail.com> wrote:

> Hello Everyone,
>
> I am in the process of implementing an existing pipeline (written using
> Java and Kafka) in Apache Beam. The data from the source stream is
> contrived and has to go through several steps of enrichment using REST API
> calls and parsing of JSON data. The key
> transformation in the existing pipeline is shown below (a super high
> level flow)
>
> *Method A*
> Calls *Method B*
>   Creates *Map 1, Map 2*
> Calls *Method C*
>  Read *Map 2*
>  Create *Map 3*
> *Method C*
>  Read *Map 3* and
>  update *Map 1*
>
> The Map we use are multi-level maps and I am thinking of having
> PCollections for each Maps and pass them as side inputs in a DoFn wherever
> I have transformations that need two or more Maps. But there are certain
> tasks where I want to make sure that I am following the right approach, for
> instance updating one of the side input maps inside a DoFn.
>
> These are my initial thoughts/questions and I would like to get some
> expert advice on how we typically design such an interleaved transformation
> in Apache Beam. Appreciate your valuable insights on this.
>
> --
> Thanks,
> Praveen K Viswanathan
>


Re: Strange graph generated by beam

2020-06-22 Thread Luke Cwik
It would be helpful if the names of the transforms were visible on the
graph; otherwise it is really hard to understand what each stage and step do.

On Mon, Jun 22, 2020 at 3:53 AM 吴亭  wrote:

> Hi,
>
> We are using Beam to read the stream from Kafka with Spark as the
> runner, and I found our Spark graph looks very strange.
>
> For each batch stream, it will generate 3 stages, one of them being our
> actual work, which I can understand.
>
> The other two are kind of duplicated; you can take a look at the attached
> graphs:
> [image: image.png]
>
>
> [image: image.png]
> You will see the second graph actually includes the first one; I have no
> idea why it displays two.
>
> Is this correct?
>
> Br,
> Tim
>


Re: Question about beam streaming pipelines.

2020-06-19 Thread Luke Cwik
Based on what you describe that makes sense to me. Window merging could be
used with session-like windows limiting merging based upon # bytes, but this
will not be obvious to users reading your code afterwards so I wouldn't
recommend it.

If you don't have to be so strict in the number of bytes or require that
there is a maximal amount of packing of messages, you could take a look at
GroupIntoBatches[1].

1:
https://beam.apache.org/documentation/transforms/java/aggregation/groupintobatches/
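
If GroupIntoBatches fits, here is a rough sketch; the key choice and batch size are
arbitrary assumptions, and using a single key gives up parallelism just like
Combine.globally would:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Gives every message the same key, batches up to 500 messages within each window,
// and joins each batch with "\n" before pushing it downstream.
static PCollection<String> packMessages(PCollection<String> windowedMessages) {
  return windowedMessages
      .apply(WithKeys.<String, String>of("all"))
      .apply(GroupIntoBatches.<String, String>ofSize(500))
      .apply(ParDo.of(
          new DoFn<KV<String, Iterable<String>>, String>() {
            @ProcessElement
            public void process(
                @Element KV<String, Iterable<String>> batch, OutputReceiver<String> out) {
              out.output(String.join("\n", batch.getValue()));
            }
          }));
}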



On Fri, Jun 19, 2020 at 1:07 PM Jaya Johnson 
wrote:

>
>
> On Fri, Jun 19, 2020 at 9:51 AM Jaya Johnson 
> wrote:
>
>> I am trying to set up a pipeline that does the following.
>> For a window of n seconds I want to pack messages received in that window
>> (messages are strings) into a single string with "\n" as delimiter and push.
>> I am trying to use the following for this:
>> apply a fixed window to the input PCollections.
>> to the windowed PCollections - apply Combine.globally and write a custom
>> function to concat the iterable strings into a single string.
>>
>> Does this seem like the right approach? Are there built in custom
>> transformations I could use for this?
>>
>> Also if I want to do custom sizing of the packed messages to say a zipped
>> format or limit by # of bytes can I add this to the window itself or would
>> I need a custom transformation for this.
>>
>> Thank you!
>>
>>


Re: Making RPCs in Beam

2020-06-19 Thread Luke Cwik
I would suggest looking at the GroupIntoBatches[1] transform if you want to
support batching of elements instead of writing it yourself. Single RPC
calls should be invoked from the DoFn like you would from a regular program.

1:
https://beam.apache.org/documentation/transforms/java/aggregation/groupintobatches/
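
For the single-RPC case, a minimal sketch using the JDK's HttpClient; the endpoint
URL is a made-up placeholder, and any real client library would slot in the same
way. For the batched case, a GroupIntoBatches step upstream can hand a similar DoFn
an Iterable of ids so one request covers the whole batch.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import org.apache.beam.sdk.transforms.DoFn;

// Build the client once per DoFn instance in @Setup and call it from
// @ProcessElement like any ordinary Java code.
class EnrichViaHttpFn extends DoFn<String, String> {
  private transient HttpClient client;

  @Setup
  public void setup() {
    client = HttpClient.newHttpClient();
  }

  @ProcessElement
  public void process(@Element String id, OutputReceiver<String> out) throws Exception {
    // Real code would URL-encode the id.
    HttpRequest request =
        HttpRequest.newBuilder(URI.create("https://example.com/enrich?id=" + id)).build();
    HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
    out.output(response.body());
  }
}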

On Fri, Jun 19, 2020 at 8:20 AM Brian Hulette  wrote:

> Kenn wrote a blog post showing how to do batched RPCs with the state and
> timer APIs: https://beam.apache.org/blog/timely-processing/
>
> Is that helpful?
>
> Brian
>
> On Thu, Jun 18, 2020 at 5:29 PM Praveen K Viswanathan <
> harish.prav...@gmail.com> wrote:
>
>> Hello Everyone,
>>
>> In my pipeline I have to make a *single RPC call* as well as a *Batched
>> RPC call* to fetch data for enrichment. I could not find any reference
>> on how to make these calls within a pipeline. I am still covering my
>> ground in Apache Beam and would appreciate it if anyone has done this and
>> could share sample code or details on how to do this.
>> --
>> Thanks,
>> Praveen K Viswanathan
>>
>


Re: unable to read kerberized HDFS using dataflow

2020-06-16 Thread Luke Cwik
Posted comments on your SO question.

On Tue, Jun 16, 2020 at 4:32 AM Vince Gonzalez 
wrote:

> Is there specific configuration required to ensure that workers get access
> to UserGroupInformation when using TextIO? I am using Beam 2.22.0 on the
> dataflow runner.
>
> My main method looks like this below. My HdfsTextIOOptions extends
> HadoopFileSystemOptions, and I set the HdfsConfiguration on the options
> instance. I am using a keytab to authenticate. I'm not sure whether
> using UserGroupInformation.setConfiguration() is sufficient to ensure the
> UGI makes it to all the workers. My pipeline fails with this exception:
>
> Error message from worker: org.apache.hadoop.security.AccessControlException: 
> SIMPLE authentication is not enabled. Available:[TOKEN, KERBEROS]
>
>
>   public static void main(String[] args) throws IOException {
> System.setProperty("java.security.krb5.realm", "MY_REALM");
> System.setProperty("java.security.krb5.kdc", "my.kdc.hostname");
>
> HdfsTextIOOptions options =
> PipelineOptionsFactory.fromArgs(args).withValidation().as(
> HdfsTextIOOptions.class);
>
> Storage storage = StorageOptions.getDefaultInstance().getService();
> URI uri = URI.create(options.getGcsKeytabPath());
> System.err.println(String
> .format("URI: %s, filesystem: %s, bucket: %s, filename: %s", 
> uri.toString(),
> uri.getScheme(), uri.getAuthority(),
> uri.getPath()));
> Blob keytabBlob = storage.get(BlobId.of(uri.getAuthority(),
> uri.getPath().startsWith("/") ? uri.getPath().substring(1) : 
> uri.getPath()));
> Path localKeytabPath = Paths.get("/tmp", uri.getPath());
> System.err.println(localKeytabPath);
>
> keytabBlob.downloadTo(localKeytabPath);
>
> Configuration conf = new Configuration();
> conf.set("fs.defaultFS", "hdfs://namenode:8020");
> conf.set("hadoop.security.authentication", "kerberos");
>
> UserGroupInformation
> .loginUserFromKeytab(options.getUserPrincipal(), 
> localKeytabPath.toString());
> UserGroupInformation.setConfiguration(conf);
>
> options.setHdfsConfiguration(ImmutableList.of(conf));
>
> Pipeline p = Pipeline.create(options);
>
> p.apply(TextIO.read().from(options.getInputFile()))
> ...
>
> I also posted to stackoverflow:
> https://stackoverflow.com/questions/62397379/google-cloud-dataflow-textio-and-kerberized-hdfs
>
> Thanks for any leads!
>
> --vince
>
>


Re: Building Dataflow Worker

2020-06-15 Thread Luke Cwik
I noticed that you are not using the Gradle wrapper but your own installed
version. Apache Beam 2.19 is using Gradle 5.2.1; is the installed version
compatible with that?

Try
./gradlew :runners:google-cloud-dataflow-java:worker:shadowJar
in a clean workspace.

On Fri, Jun 12, 2020 at 4:30 PM Talat Uyarer 
wrote:

> Hi,
>
> I want to build the Dataflow worker on Apache Beam 2.19. However I faced
> a gRPC issue. I did not change anything, just checked out the release-2.19.0 branch
> and ran the build command. Could you help me understand why it does not build?
> [1]
>
> Additional information: based on my limited knowledge, it looks like it is
> looking for a class which comes from gRPC 1.26. But Beam 2.19
> is using gRPC 1.21.
>
> You can find build output below.
>
> Thanks
> [1] http://dpaste.com/15X7429
>


Re: Beam supports Flink RichAsyncFunction

2020-06-15 Thread Luke Cwik
The intent is that users shouldn't have to use async I/O since the idea is
that the runner should increase the number of workers/threads being used
automatically so that you never need to special-case this.
Unfortunately Dataflow is the only one who does this today so you are
forced to use something like GroupIntoBatches[1] to gather input elements
that you convert into requests you want to send and manage your own threads
/ completion.

1:
https://beam.apache.org/documentation/transforms/java/aggregation/groupintobatches/

On Sun, Jun 14, 2020 at 7:21 PM Eleanore Jin  wrote:

> Hi Community,
>
> I am trying to convert an existing Flink job into Beam pipeline. In the
> current Flink job, we have async I/O operator (
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html)
> which extends RichAsyncFunction
> 
> .
>
> I did not find any document online for Beam to support this, if it is
> documented somewhere, can you please point to me?
>
> In case Beam does not support it, is there any suggested 'work around' for
> it?
>
> Thanks a lot!
> Eleanore
>


Re: How to safely update jobs in-flight using Apache Beam on AWS EMR?

2020-06-10 Thread Luke Cwik
The runner needs to support it and I'm not aware of an EMR runner for
Apache Beam let alone one that supports pipeline update. Have you tried
reaching out to AWS?

On Wed, Jun 10, 2020 at 11:14 AM Dan Hill  wrote:

> Hi!  I found great docs about Apache Beam on Dataflow (which makes
> sense).  I was not able to find this about AWS EMR.
>
> https://cloud.google.com/dataflow/docs/guides/updating-a-pipeline
>
>
> https://medium.com/google-cloud/restarting-cloud-dataflow-in-flight-9c688c49adfd
>


Re: Running apache_beam python sdk without c/c++ libs

2020-06-10 Thread Luke Cwik
I'm not sure. It depends on whether the Spark -> Beam Python integration
will interfere with the magic built into AWS Glue.

On Wed, Jun 10, 2020 at 8:57 AM Noah Goodrich  wrote:

> I was hoping to use the Spark runner since Glue is just Spark with some
> magic on top. And in our specific use case, we'd be looking at working with
> S3, Kinesis, and MySQL RDS.
>
> Sounds like this is a non-starter?
>
> On Wed, Jun 10, 2020 at 9:33 AM Luke Cwik  wrote:
>
>> Most runners are written in Java while others are cloud offerings which
>> wouldn't work for your use case which limits you to use the direct runner
>> (not meant for production/high performance applications). Beam Python SDK
>> uses cython for performance reasons but I don't believe it strictly
>> requires it as many unit tests run with and without cython enabled.
>> Integrations between Beam and third party libraries may require it though
>> so it likely depends on what you plan to do.
>>
>> On Wed, Jun 10, 2020 at 8:17 AM Noah Goodrich 
>> wrote:
>>
>>> I am looking at using the Beam Python SDK in AWS Glue but it doesn't
>>> support non-native python libraries (anything that is c/c++ based).
>>>
>>> Is the Beam Python SDK / runners able to be used without any c/c++
>>> library dependencies?
>>>
>>


Re: Running apache_beam python sdk without c/c++ libs

2020-06-10 Thread Luke Cwik
Most runners are written in Java while others are cloud offerings which
wouldn't work for your use case which limits you to use the direct runner
(not meant for production/high performance applications). Beam Python SDK
uses cython for performance reasons but I don't believe it strictly
requires it as many unit tests run with and without cython enabled.
Integrations between Beam and third party libraries may require it though
so it likely depends on what you plan to do.

On Wed, Jun 10, 2020 at 8:17 AM Noah Goodrich  wrote:

> I am looking at using the Beam Python SDK in AWS Glue but it doesn't
> support non-native python libraries (anything that is c/c++ based).
>
> Is the Beam Python SDK / runners able to be used without any c/c++ library
> dependencies?
>


Re: Writing pipeline output to google sheet in google drive

2020-06-08 Thread Luke Cwik
It doesn't look like BigQuery supports exporting to Google Sheets[1]; maybe
you can invoke this BQ connector directly by adding a transform that
follows the BQ sink.

1: https://cloud.google.com/bigquery/docs/exporting-data#export_limitations

On Sat, Jun 6, 2020 at 8:31 PM OrielResearch Eila Arich-Landkof <
e...@orielresearch.org> wrote:

> Hello,
>
> Is it possible to have the pipeline sink to a Google Sheet within a
> specific Google Drive directory?
> Something like that:
>
> p =  beam.Pipeline(options=options)
> (p | 'Step 1: read file ' >> beam.io.ReadFromText(path/to/file)
>| 'Step 2:  process data  ' >> beam.ParDo(get_daata(l]))
>| 'step 3: write data to gsheet  ' >> beam.io.WriteToXXX(GSHEET PATH))
>
>
> I know that BQ has a connector to Google sheet. Is it possible to use this
> connector from the BQ sink? Other way?
>
> Thanks,
> Eila
>
>


Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata

2020-06-05 Thread Luke Cwik
+dev  +Chamikara Jayalath  +Heejong
Lee 

On Fri, Jun 5, 2020 at 8:29 AM Piotr Filipiuk 
wrote:

> I am unable to read from Kafka and getting the following warnings & errors
> when calling kafka.ReadFromKafka() (Python SDK):
>
> WARNING:root:severity: WARN
> timestamp {
>   seconds: 1591370012
>   nanos: 52300
> }
> message: "[Consumer clientId=consumer-2, groupId=] Connection to node -1
> could not be established. Broker may not be available."
> log_location: "org.apache.kafka.clients.NetworkClient"
> thread: "18"
>
> Finally the pipeline fails with:
>
> RuntimeError: org.apache.beam.sdk.util.UserCodeException:
> java.lang.RuntimeException:
> org.apache.kafka.common.errors.TimeoutException: Timeout expired while
> fetching topic metadata
>
> See more complete log attached.
>
> The relevant code snippet:
>
> consumer_conf = {"bootstrap.servers": 'localhost:9092'}
> ...
> kafka.ReadFromKafka(
> consumer_config=consumer_conf,
> topics=[args.topic],
> )
> ...
>
> Also see full python script attached.
>
> I am using Beam Version: 2.21.0 and DirectRunner. For Flink Runner I am
> also not able to read from topic.
>
> I am using kafka 2.5.0 and started the broker by following
> https://kafka.apache.org/quickstart - using default
> config/server.properties.
>
> Everything runs locally, and I verified that I can publish from
> that topic using confluent_kafka library.
>
> --
> Best regards,
> Piotr
>


Re: Need suggestion/help for use case (usage of the side input pattern and sliding window)

2020-06-02 Thread Luke Cwik
Using side inputs is fine and is a common pattern. You should take a look
at "slowly changing side inputs"[1] as there is some example code there.

1:
https://beam.apache.org/documentation/patterns/side-inputs/#slowly-updating-side-input-using-windowing
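
For reference, a condensed sketch of that pattern; loadScores() is a placeholder
for the real BigQuery/API read, and the refresh interval is an arbitrary choice:

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

// Emits one tick every 10 minutes, reloads the scores on each tick, and exposes the
// latest map as a singleton view that the main pipeline reads as a side input.
static PCollectionView<Map<String, Long>> slowlyUpdatingScores(Pipeline p) {
  return p
      .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(10)))
      .apply(
          Window.<Long>into(new GlobalWindows())
              .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
              .discardingFiredPanes())
      .apply(
          ParDo.of(
              new DoFn<Long, Map<String, Long>>() {
                @ProcessElement
                public void process(OutputReceiver<Map<String, Long>> out) {
                  out.output(loadScores());
                }
              }))
      .apply(View.asSingleton());
}

static Map<String, Long> loadScores() {
  return new HashMap<>(); // placeholder for the real lookup
}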

On Mon, Jun 1, 2020 at 8:27 PM Mohil Khare  wrote:

> Thanks Luke for your reply.
> I see. I am trying to recall why I added allowedLateness as 360 days.
> Anyways I will try without that.
>
> But do you think the approach I am using to keep getting a running score
> in a sliding window and then using it as a side input to decorate the main
> log is correct? Or can I achieve the same thing in a much better and
> more optimized way?
>
> Thanks again
> Mohil
>
> On Mon, Jun 1, 2020 at 3:38 PM Luke Cwik  wrote:
>
>> Your allowed lateness is 360 days and since the trigger you have doesn't
>> emit speculative results, you'll have to wait till the watermark advances
>> to the end-of-window timestamp + 360 days before something is output from
>> the grouping aggregation/available at the side input.
>>
>>
>> On Sat, May 30, 2020 at 12:17 PM Mohil Khare  wrote:
>>
>>> Hello all,
>>>
>>> Any suggestions? Where am I going wrong or is there any better way of
>>> achieving this so that I can do replay as well ?
>>>
>>> Thanks
>>> Mohil
>>>
>>> On Wed, May 27, 2020 at 11:40 AM Mohil Khare  wrote:
>>>
>>>> Hi everyone,
>>>> I need a suggestion regarding usage of the side input pattern and
>>>> sliding window, especially while replaying old kafka logs/offsets.
>>>>
>>>> FYI: I am running beam 2.19 on google dataflow.
>>>>
>>>> I have a use case where I read a continuous stream of data from Kafka
>>>> and need to calculate one score (apart from other calculations) per key
>>>> which is based on the number of such requests that are received per key in
>>>> the last one hour.
>>>>
>>>> Roughly my code looks like following:
>>>>
>>>> PCollection = p
>>>> .apply("Read__Logs_From_Kafka", KafkaIO.read()
>>>> .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
>>>> .withTopic("app_access_stats")
>>>> .withKeyDeserializer(StringDeserializer.class)
>>>> .withValueDeserializer(ByteArrayDeserializer.class)
>>>> .withConsumerConfigUpdates(kafkaConsumerProperties)
>>>> .withConsumerFactoryFn(consumerFactoryObj)
>>>> .commitOffsetsInFinalize())
>>>> .apply("Applying_Fixed_Window_Logs", Window.>>> byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
>>>> 
>>>> .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1
>>>> .withAllowedLateness(Duration.standardDays(380))
>>>> .discardingFiredPanes())
>>>> .apply("Convert_KafkaRecord_To_PCollection",
>>>> ParDo.of(new ParseKafkaLogs()));
>>>>
>>>>
>>>> /*** Class that handles incoming PCollection and calculate score ***/
>>>>
>>>> /**. Assumeinput = incoming PCollection as created above
>>>>
>>>> PCollectionView> slidingWindowHourlyUserRequestsPerKeyView
>>>>
>>>>= input.apply("Calculate_Total_UserRequests_Past_1Hr", new 
>>>> WindowedNumUserRequestsPerKey()).apply(View.asMap());
>>>>
>>>> /**Calculate Running sum of num of reqs in sliding window
>>>>
>>>> Starting sliding window of duration 1 hr every 1 sec so that we can 
>>>> get accurate result of past 1 hr
>>>>
>>>> **/
>>>>
>>>>
>>>> private static class WindowedNumUserRequestsPerKey extends 
>>>> PTransform, PCollection>> {
>>>>
>>>> @Override
>>>> public PCollection> expand(PCollection input) {
>>>>
>>>> return input
>>>> .apply("Applying_Sliding_Window_1Hr_Every1sec", 
>>>> Window.into(SlidingWindows.of(Duration.standardHours(1)).every(Duration.standardSeconds(1)))
>>>> 
>>>> .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())).withAllowedLateness(Duration.standardDays(360)).discardingFiredPanes())
>>>> .apply("Grouping_per_Key&

Re: Need suggestion/help for use case (usage of the side input pattern and sliding window)

2020-06-01 Thread Luke Cwik
Your allowed lateness is 360 days and since the trigger you have doesn't
emit speculative results, you'll have to wait till the watermark advances
to the end-of-window timestamp + 360 days before something is output from
the grouping aggregation/available at the side input.
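
For comparison, a rough sketch of a sliding-window configuration that emits
speculative early panes so the side input gets populated without waiting out the
allowed lateness; the durations and the zero allowed lateness are illustrative
choices, not a recommendation from this thread:

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// One-hour sliding windows with early firings roughly once a minute; panes are
// accumulated so each firing reflects the full window contents seen so far.
static <T> PCollection<T> windowWithEarlyFirings(PCollection<T> input) {
  return input.apply(
      Window.<T>into(
              SlidingWindows.of(Duration.standardHours(1)).every(Duration.standardMinutes(1)))
          .triggering(
              AfterWatermark.pastEndOfWindow()
                  .withEarlyFirings(
                      AfterProcessingTime.pastFirstElementInPane()
                          .plusDelayedBy(Duration.standardMinutes(1))))
          .withAllowedLateness(Duration.ZERO)
          .accumulatingFiredPanes());
}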


On Sat, May 30, 2020 at 12:17 PM Mohil Khare  wrote:

> Hello all,
>
> Any suggestions? Where am I going wrong or is there any better way of
> achieving this so that I can do replay as well ?
>
> Thanks
> Mohil
>
> On Wed, May 27, 2020 at 11:40 AM Mohil Khare  wrote:
>
>> Hi everyone,
>> I need a suggestion regarding usage of the side input pattern and sliding
>> window, especially while replaying old kafka logs/offsets.
>>
>> FYI: I am running beam 2.19 on google dataflow.
>>
>> I have a use case where I read a continuous stream of data from Kafka and
>> need to calculate one score (apart from other calculations) per key  which
>> is based on the number of such requests that are received per key in the
>> last one hour.
>>
>> Roughly my code looks like following:
>>
>> PCollection = p
>> .apply("Read__Logs_From_Kafka", KafkaIO.read()
>> .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
>> .withTopic("app_access_stats")
>> .withKeyDeserializer(StringDeserializer.class)
>> .withValueDeserializer(ByteArrayDeserializer.class)
>> .withConsumerConfigUpdates(kafkaConsumerProperties)
>> .withConsumerFactoryFn(consumerFactoryObj)
>> .commitOffsetsInFinalize())
>> .apply("Applying_Fixed_Window_Logs", Window.> byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
>> 
>> .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1
>> .withAllowedLateness(Duration.standardDays(380))
>> .discardingFiredPanes())
>> .apply("Convert_KafkaRecord_To_PCollection",
>> ParDo.of(new ParseKafkaLogs()));
>>
>>
>> /*** Class that handles incoming PCollection and calculate score ***/
>>
>> /**. Assumeinput = incoming PCollection as created above
>>
>> PCollectionView> slidingWindowHourlyUserRequestsPerKeyView
>>
>>= input.apply("Calculate_Total_UserRequests_Past_1Hr", new 
>> WindowedNumUserRequestsPerKey()).apply(View.asMap());
>>
>> /**Calculate Running sum of num of reqs in sliding window
>>
>> Starting sliding window of duration 1 hr every 1 sec so that we can get 
>> accurate result of past 1 hr
>>
>> **/
>>
>>
>> private static class WindowedNumUserRequestsPerKey extends 
>> PTransform, PCollection>> {
>>
>> @Override
>> public PCollection> expand(PCollection input) {
>>
>> return input
>> .apply("Applying_Sliding_Window_1Hr_Every1sec", 
>> Window.into(SlidingWindows.of(Duration.standardHours(1)).every(Duration.standardSeconds(1)))
>> 
>> .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())).withAllowedLateness(Duration.standardDays(360)).discardingFiredPanes())
>> .apply("Grouping_per_Key", ParDo.of(new GroupByAggregationKey()))
>> .apply("Total_Requests_Per_Key", Combine.perKey(new 
>> CalculateTotalUserRequestsPerKey()));
>> }
>>
>> private static class GroupByAggregationKey extends DoFn> POJO>> {
>> @ProcessElement
>> public void processElement(@Element POJO input, 
>> OutputReceiver> out) {
>> /** code that emits required KV /
>>
>> }
>> }
>>
>> private static class CalculateTotalUserRequestsPerKey extends 
>> Combine.CombineFn> CalculateTotalUserRequestsPerKey.TotalRequestsAccumulator, Long> {
>> private static class TotalRequestsAccumulator implements 
>> Serializable {
>> private long num_requests_running_sum = 0;
>>
>> TotalRequestsAccumulator(long num_requests_running_sum) {
>> this.num_requests_running_sum = num_requests_running_sum;
>> }
>>
>> @Override
>> public boolean equals(Object o) {
>> if (this == o) return true;
>> if (!(o instanceof TotalRequestsAccumulator)) return false;
>> TotalRequestsAccumulator that = (TotalRequestsAccumulator) o;
>> return num_requests_running_sum == 
>> that.num_requests_running_sum;
>> }
>>
>> @Override
>> public int hashCode() {
>> return Objects.hash(num_requests_running_sum);
>> }
>> }
>>
>> @Override
>> public TotalRequestsAccumulator createAccumulator() {
>> return new TotalRequestsAccumulator(0);
>> }
>>
>> @Override
>> public TotalRequestsAccumulator addInput(TotalRequestsAccumulator 
>> mutableAccumulator, POJO input) {
>> mutableAccumulator.num_requests_running_sum++;
>> return mutableAccumulator;
>> }
>>
>> @Override
>> public TotalRequestsAccumulator 
>> 

Re: Pipeline Processing Time

2020-06-01 Thread Luke Cwik
You can configure KafkaIO to use some data from the record as the element's
timestamp. See the KafkaIO javadoc around TimestampPolicy[1]; the
default is current processing time.
You can access the timestamp of the element by adding
"org.joda.time.Instant timestamp" as a parameter to your @ProcessElement,
see this javadoc for additional details[2]. You could then compute now() -
timestamp to calculate processing time.

1:
https://beam.apache.org/releases/javadoc/2.21.0/org/apache/beam/sdk/io/kafka/TimestampPolicy.html
2:
https://beam.apache.org/releases/javadoc/2.21.0/org/apache/beam/sdk/transforms/DoFn.ProcessElement.html
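
A small sketch of the measurement side; MeasureLatencyFn is a made-up name and
assumes the element timestamp was already set the way you want by KafkaIO's
timestamp policy (by default it is the processing time at read):

import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Instant;

// Computes "now minus element timestamp" just before the sink and records it as a
// distribution metric, then passes the element through unchanged.
class MeasureLatencyFn extends DoFn<String, String> {
  private final Distribution latencyMs =
      Metrics.distribution(MeasureLatencyFn.class, "element_latency_ms");

  @ProcessElement
  public void process(
      @Element String element, @Timestamp Instant timestamp, OutputReceiver<String> out) {
    latencyMs.update(Instant.now().getMillis() - timestamp.getMillis());
    out.output(element);
  }
}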

On Mon, Jun 1, 2020 at 2:00 PM Talat Uyarer 
wrote:

> Sorry for the late response. Where does Beam set that timestamp field
> on the element? Is it set whenever KafkaIO reads that element?
>
> Also, I have a windowing function in my pipeline. Does the timestamp
> field change for any kind of operation? In the pipeline I have the
> following steps: KafkaIO -> Format Conversion ParDo -> SQL Filter ->
> Windowing Step -> Custom Sink. If the timestamp is set in KafkaIO, can I see
> processing time by now() - timestamp in the Custom Sink?
>
>
> Thanks
>
> On Thu, May 28, 2020 at 2:07 PM Luke Cwik  wrote:
>
>> Dataflow provides msec counters for each transform that executes. You
>> should be able to get them from stackdriver and see them from the Dataflow
>> UI.
>>
>> You need to keep track of the timestamp of the element as it flows
>> through the system as part of data that goes alongside the element. You can
>> use the element's timestamp[1] if that makes sense (it might not if you
>> intend to use a timestamp that is from the kafka record itself and the
>> record's timestamp isn't the same as the ingestion timestamp). Unless you
>> are writing your own sink, the sink won't track the processing time at all
>> so you'll need to add a ParDo that goes right before it that writes the
>> timing information to wherever you want (a counter, your own metrics
>> database, logs, ...).
>>
>> 1:
>> https://github.com/apache/beam/blob/018e889829e300ab9f321da7e0010ff0011a73b1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L257
>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_beam_blob_018e889829e300ab9f321da7e0010ff0011a73b1_sdks_java_core_src_main_java_org_apache_beam_sdk_transforms_DoFn.java-23L257=DwMFaQ=V9IgWpI5PvzTw83UyHGVSoW3Uc1MFWe5J8PTfkrzVSo=BkW1L6EF7ergAVYDXCo-3Vwkpy6qjsWAz7_GD7pAR8g=1202mTv7BP1KzcBJECS98dr7u5riw0NHdl8rT8I6Ego=cPdnrK4r-tVd0iAO6j7eAAbDPISOdazEYBrPoC9cQOo=>
>>
>>
>> On Thu, May 28, 2020 at 1:12 PM Talat Uyarer <
>> tuya...@paloaltonetworks.com> wrote:
>>
>>> Yes I am trying to track how long it takes for a single element to be
>>> ingested into the pipeline until it is output somewhere.
>>>
>>> My pipeline is unbounded. I am using KafkaIO. I did not think about CPU
>>> time. if there is a way to track it too, it would be useful to improve my
>>> metrics.
>>>
>>> On Thu, May 28, 2020 at 12:52 PM Luke Cwik  wrote:
>>>
>>>> What do you mean by processing time?
>>>>
>>>> Are you trying to track how long it takes for a single element to be
>>>> ingested into the pipeline until it is output somewhere?
>>>> Do you have a bounded pipeline and want to know how long all the
>>>> processing takes?
>>>> Do you care about how much CPU time is being consumed in aggregate for
>>>> all the processing that your pipeline is doing?
>>>>
>>>>
>>>> On Thu, May 28, 2020 at 11:01 AM Talat Uyarer <
>>>> tuya...@paloaltonetworks.com> wrote:
>>>>
>>>>> I am using the Dataflow runner. The pipeline reads from KafkaIO and sends
>>>>> HTTP. I could not find any metadata field on the element to set the first read
>>>>> time.
>>>>>
>>>>> On Thu, May 28, 2020 at 10:44 AM Kyle Weaver 
>>>>> wrote:
>>>>>
>>>>>> Which runner are you using?
>>>>>>
>>>>>> On Thu, May 28, 2020 at 1:43 PM Talat Uyarer <
>>>>>> tuya...@paloaltonetworks.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I have a pipeline which has 5 steps. What is the best way to measure
>>>>>>> processing time for my pipeline?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>


Re: Pipeline Processing Time

2020-05-28 Thread Luke Cwik
Dataflow provides msec counters for each transform that executes. You
should be able to get them from stackdriver and see them from the Dataflow
UI.

You need to keep track of the timestamp of the element as it flows through
the system as part of data that goes alongside the element. You can use the
element's timestamp[1] if that makes sense (it might not if you intend to
use a timestamp that is from the kafka record itself and the record's
timestamp isn't the same as the ingestion timestamp). Unless you are
writing your own sink, the sink won't track the processing time at all so
you'll need to add a ParDo that goes right before it that writes the timing
information to wherever you want (a counter, your own metrics database,
logs, ...).

1:
https://github.com/apache/beam/blob/018e889829e300ab9f321da7e0010ff0011a73b1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L257


On Thu, May 28, 2020 at 1:12 PM Talat Uyarer 
wrote:

> Yes I am trying to track how long it takes for a single element to be
> ingested into the pipeline until it is output somewhere.
>
> My pipeline is unbounded. I am using KafkaIO. I did not think about CPU
> time. if there is a way to track it too, it would be useful to improve my
> metrics.
>
> On Thu, May 28, 2020 at 12:52 PM Luke Cwik  wrote:
>
>> What do you mean by processing time?
>>
>> Are you trying to track how long it takes for a single element to be
>> ingested into the pipeline until it is output somewhere?
>> Do you have a bounded pipeline and want to know how long all the
>> processing takes?
>> Do you care about how much CPU time is being consumed in aggregate for
>> all the processing that your pipeline is doing?
>>
>>
>> On Thu, May 28, 2020 at 11:01 AM Talat Uyarer <
>> tuya...@paloaltonetworks.com> wrote:
>>
>>> I am using the Dataflow runner. The pipeline reads from KafkaIO and sends
>>> HTTP. I could not find any metadata field on the element to set the first read
>>> time.
>>>
>>> On Thu, May 28, 2020 at 10:44 AM Kyle Weaver 
>>> wrote:
>>>
>>>> Which runner are you using?
>>>>
>>>> On Thu, May 28, 2020 at 1:43 PM Talat Uyarer <
>>>> tuya...@paloaltonetworks.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have a pipeline which has 5 steps. What is the best way to measure
>>>>> processing time for my pipeline?
>>>>>
>>>>> Thanks
>>>>>
>>>>


Re: Pipeline Processing Time

2020-05-28 Thread Luke Cwik
What do you mean by processing time?

Are you trying to track how long it takes for a single element to be
ingested into the pipeline until it is output somewhere?
Do you have a bounded pipeline and want to know how long all the processing
takes?
Do you care about how much CPU time is being consumed in aggregate for all
the processing that your pipeline is doing?


On Thu, May 28, 2020 at 11:01 AM Talat Uyarer 
wrote:

> I am using the Dataflow runner. The pipeline reads from KafkaIO and sends HTTP.
> I could not find any metadata field on the element to set the first read time.
>
> On Thu, May 28, 2020 at 10:44 AM Kyle Weaver  wrote:
>
>> Which runner are you using?
>>
>> On Thu, May 28, 2020 at 1:43 PM Talat Uyarer <
>> tuya...@paloaltonetworks.com> wrote:
>>
>>> Hi,
>>>
>>> I have a pipeline which has 5 steps. What is the best way to measure
>>> processing time for my pipeline?
>>>
>>> Thanks
>>>
>>


Re: Cross-Language pipeline fails with PROCESS SDK Harness

2020-05-28 Thread Luke Cwik
I haven't had any changes since #11670 in this space so nothing is missing
from the release.

Also, the GreedyPipelineFuser has not been updated to support XLang as it
has some baked-in assumptions around flatten[1] and likely other issues. It
has worked because the simple examples we have run haven't hit these kinds
of cases.

The source seems to be the CreateSource and should be wrapped with the
BoundedSourceSDFWrapper and should have been expanded by the runner to the
appropriate SDF steps. Unfortunately I don't have the time to try to
produce a repro since the error is cut off before we see the details of the
transform/pipeline that was being executed. Creating a JIRA with a link to
a git branch and gradle command that builds/launches the pipeline that
produces the error would help the Beam community to investigate further.

1:
https://issues.apache.org/jira/browse/BEAM-10016?focusedCommentId=17117378=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17117378

On Thu, May 28, 2020 at 11:50 AM Chamikara Jayalath 
wrote:

> This might have to do with https://github.com/apache/beam/pull/11670.
> +Lukasz Cwik  was there a subsequent fix that was not
> included in the release ?
>
> On Thu, May 28, 2020 at 10:29 AM Kyle Weaver  wrote:
>
>> What source are you using?
>>
>> On Thu, May 28, 2020 at 1:24 PM Alexey Romanenko <
>> aromanenko@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I’m trying to run a Cross-Language pipeline (Beam 2.21, Java pipeline
>>> with an external Python transform) with a PROCESS SDK Harness and Spark
>>> Portable Runner but it fails.
>>> To do that I have a running Spark Runner Job Server (Spark local) and
>>> standalone Expansion Service (Python) which contains the code of my Python
>>> transform that should be called from main Java pipeline.
>>>
>>> Once the job has been submitted to the Job Server and started running, it fails
>>> with this error:
>>>
>>> 20/05/28 18:55:12 INFO org.apache.beam.runners.spark.SparkJobInvoker:
>>> Invoking job
>>> classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
>>> 20/05/28 18:55:12 INFO
>>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Starting
>>> job invocation
>>> classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
>>> 20/05/28 18:55:12 ERROR
>>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Error
>>> during job invocation
>>> classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719.
>>>
>>> *java.lang.IllegalArgumentException: GreedyPipelineFuser requires all
>>> root nodes to be runner-implemented beam:transform:impulse:v1 or
>>> beam:transform:read:v1 primitives, but
>>> transform Create.Values/Read(CreateSource) executes in environment
>>> Optional[urn: "beam:env:docker:v1"*payload:
>>> "\n\033apache/beam_java_sdk:2.21.0"
>>> capabilities: "beam:coder:bytes:v1”
>>> ….
>>>
>>>
>>> Some code snippets of my pipeline that can be helpful.
>>>
>>> *Java transform:*
>>>
>>> private static final String URN = "ml:genreclassifier:python:v1";
>>>
>>> @Override
>>> public PCollection> expand(PCollection input) {
>>>   PCollection> output =
>>>   input.apply(
>>>   "ExternalGenreClassifier",
>>>   External.of(URN, new byte[] {}, options.getExpansionServiceURL())
>>>   .>withOutputType());
>>>   return output;
>>> }
>>>
>>>
>>> *expansion_service.py*
>>>
>>> @ptransform.PTransform.register_urn('ml:genreclassifier:python:v1', None)
>>> class GenreClassifier(ptransform.PTransform):
>>> def __init__(self):
>>> super(GenreClassifier, self).__init__()
>>>
>>> def expand(self, pcoll):
>>> return pcoll | "GenreClassifier" >> beam.ParDo(_GenreClassifierFn())
>>>
>>> def to_runner_api_parameter(self, unused_context):
>>> return 'ml:genreclassifier:python:v1', None
>>>
>>> @staticmethod
>>> def from_runner_api_parameter(unused_ptransform, unused_parameter, 
>>> unused_context):
>>> return GenreClassifier()
>>>
>>>
>>> def main(unused_argv):
>>> ...
>>> server = grpc.server(UnboundedThreadPoolExecutor())
>>> beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
>>> expansion_service.ExpansionServiceServicer(
>>> PipelineOptions.from_dictionary({
>>> 'environment_type': 'PROCESS',
>>> 'environment_config': *'{"command": 
>>> **“/dev**/beam/sdks/python/container/build/target/launcher/darwin_amd64/boot"}'*,
>>> 'sdk_location': 'container',
>>> })
>>> ), server
>>> )
>>> server.add_insecure_port('localhost:{}'.format(options.port))
>>> server.start()
>>>
>>> Does anyone have an idea what’s wrong with my setup/pipeline and how to
>>> fix it?
>>>
>>>
>>>


Re: writing new IO with Maven dependencies

2020-05-28 Thread Luke Cwik
+dev 

On Thu, May 28, 2020 at 11:55 AM Ken Barr  wrote:

> I am currently developing an IO that I would like to eventually submit to
> the Apache Beam project. The IO itself is Apache 2.0 licensed.
> Does every chained dependency I use need to be opensource?
>

The transitive dependency tree must have licenses from the ASF's approved
license list. See https://www.apache.org/legal/resolved.html for all the
details.


> If yes, how is this usually proven?
>

Typically the reviewer will ask you to provide the dependency tree and the
licenses of those dependencies if the reviewer doesn't do this themselves
or recognize the dependency itself. The reviewer will validate any
information that you provide.


> Is it enough that only Maven dependencies are used?
>
No.


Re: Using Self signed root ca for https connection in eleasticsearchIO

2020-05-18 Thread Luke Cwik
You don't apply it as part of a PTransform. The class is loaded and
executed dynamically using the JVM's ServiceLoader[1]. So you need to make
sure that the class is on the workers' classpath and that your class appears
in one of the META-INF/services files under the JvmInitializer service
(typically placed in the jar file containing your class). This isn't a great
example since it is in a test[2] but you should be able to use the same
test logic to try out whether the class would be loaded dynamically.

1: https://docs.oracle.com/javase/7/docs/api/java/util/ServiceLoader.html
2:
https://github.com/apache/beam/blob/7c80ecb8c354575e4332f0f1731f1b5a3f0c4362/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/JvmInitializersTest.java#L41
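
A rough sketch of such an initializer, registered with AutoService (which requires
the com.google.auto.service:auto-service dependency, or you can hand-write the
META-INF/services entry yourself); the trust store properties are just an example
of JVM-wide setup, and in practice you would first stage the JKS file somewhere the
worker can read it:

import com.google.auto.service.AutoService;
import org.apache.beam.sdk.harness.JvmInitializer;
import org.apache.beam.sdk.options.PipelineOptions;

// AutoService generates the META-INF/services entry so the worker's ServiceLoader
// can discover this class at JVM startup.
@AutoService(JvmInitializer.class)
public class TrustStoreInitializer implements JvmInitializer {

  @Override
  public void onStartup() {
    // Runs once when the worker JVM starts, before any processing.
    System.setProperty("javax.net.ssl.trustStore", "/tmp/custom-truststore.jks");
    System.setProperty("javax.net.ssl.trustStorePassword", "changeit");
  }

  @Override
  public void beforeProcessing(PipelineOptions options) {
    // Runs before bundle processing starts, if initialization needs pipeline options.
  }
}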

On Sun, May 17, 2020 at 2:59 PM Mohil Khare  wrote:

> Hello Luke,
>
> Thanks for your reply and I apologize for the late reply.
> Well, I already tried using JvmInitializer and for some reason it didn't
> work for me. Quite possible that I was not using it correctly. Do you have
> any code snippets that show how we can use it with a PTransform?
>
> My elasticsearch PTransform look like following:
>
> class WriteToElasticSearch extends PTransform, PDone> {
> //private static final Logger logger = 
> LoggerFactory.getLogger(WriteAppAccessLogToElasticSearch.class);
> private final String[] elasticsearchEndPoint;
> private final String username;
> private final String password;
>
> WriteToElasticSearch(String[] elasticsearchEndPoint, String username, 
> String password) {
> this.elasticsearchEndPoint = elasticsearchEndPoint;
> this.username = username;
> this.password = password;
>
> }
> @Override
> public PDone expand(PCollection input) {
> input
> .apply("Convert_PCollection to PCollection", new 
> MyPojoToString())
> .apply("Global_Window_Trigger_Write_With_Every_Element",
> Window.into(new GlobalWindows())
> 
> .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
> .discardingFiredPanes()
> )
> .apply("Write_To_Elastic_Search", 
> ElasticsearchIO.write().withConnectionConfiguration(
> 
> ElasticsearchIO.ConnectionConfiguration.create(elasticsearchEndPoint, 
> "index_write", "_doc").withUsername(username).withPassword(password))
> );
> return PDone.in(input.getPipeline());
> }
>
>
> Thanks and Regards
> Mohil
>
> On Thu, Apr 9, 2020 at 2:02 PM Luke Cwik  wrote:
>
>> You should be able to use a JvmInitializer[1] to set any system
>> properties/configure the JVM trust store. Just make sure it's properly set
>> up in your META-INF/services.
>>
>> This is supported by Dataflow and all PortableRunners that use a separate
>> process/container for the worker.
>>
>> 1:
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/harness/JvmInitializer.java
>>
>> On Thu, Apr 9, 2020 at 10:02 AM Mohil Khare  wrote:
>>
>>> Hi Kenneth,
>>>
>>> Thanks for your reply. You are right, I also believe that this has more
>>> to do with Dataflow than Elasticsearch. I don't think the problem is in
>>> the classpath or Beam being unable to find a file in the classpath. The problem is how to
>>> set the worker VM's keystore and truststore with a self-signed root CA. Usually
>>> they contain all trusted root CAs like letsencrypt, symantec etc. KafkaIO
>>> provides that via withConsumerFactoryFn(loadKafkaConfig) where you can do
>>> something like the following:
>>>
>>> private void loadKafkaConfig(Map config) {
>>>
>>> readJKSFileFromGCS(this.gcsTruststorePath, 
>>> "/tmp/kafka.client.truststore.jks");
>>> readJKSFileFromGCS(this.gcsKeystorePath, 
>>> "/tmp/kafka.client.keystore.jks");
>>>
>>>
>>> 
>>> config.put("ssl.truststore.location","/tmp/kafka.client.truststore.jks");
>>> config.put("ssl.truststore.password","clientsecret");
>>> config.put("ssl.keystore.location","/tmp/kafka.client.keystore.jks");
>>> config.put("ssl.keystore.password","clientsecret");
>>> config.put("ssl.key.password","clientsecret");
>>> }
>>>
>>>
>>> I was wondering if ElasticIO can also provide similar support where we
>>> can provide our self-signed root CA.
>>>
>>> Thanks and Regards
>>>

  1   2   >