Re: [DISCUSS] How to stop SdkWorker in SdkHarness

2019-10-24 Thread jincheng sun
Hi,

Thanks for your comments in the doc; I have added Approach 3, which you
mentioned! @Luke

For now, we should make a decision between Approach 1 and Approach 3.
Details can be found in the doc [1].

Welcome anyone's feedback :)

Regards,
Jincheng

[1]
https://docs.google.com/document/d/1sCgy9VQPf9zVXKRquK8P6N4x7aB62GEO8ozkujRSHZg/edit?usp=sharing

On Fri, Oct 25, 2019 at 10:40 AM jincheng sun  wrote:

> Hi,
>
> It is functionally capable of `abort`, but it will be called at the end of
> the operator. So, I prefer `dispose` semantics, i.e., all normal logic has
> already been executed.
>
> Best,
> Jincheng
>
> On Wed, Oct 23, 2019 at 12:14 AM Harsh Vardhan  wrote:
>
>> Would approach 1 be akin to abort semantics?
>>
>> On Mon, Oct 21, 2019 at 8:01 PM jincheng sun 
>> wrote:
>>
>>> Hi Luke,
>>>
>>> Thanks a lot for your reply. Since one SDK harness may be shared
>>> between multiple executable stages, the control service termination may
>>> occur much later than the completion of an executable stage. This is the
>>> main reason I prefer runners to control the teardown of DoFns.
>>>
>>> Regarding to "SDK harnesses can terminate instances any time they want
>>> and start new instances anytime as well.", personally I think it's not
>>> conflict with the proposed Approach 1 as the SDK harness could decide what
>>> to do when receiving the teardown request. It could do nothing if the DoFns
>>> has already been teared down and could also tear down the DoFns if needed.
>>>
>>> What do you think?
>>>
>>> Best,
>>> Jincheng
>>>
>>> On Tue, Oct 22, 2019 at 2:05 AM Luke Cwik  wrote:
>>>
 Approach 2 is currently the suggested approach [1] for DoFns to
 shut down.
 Note that SDK harnesses can terminate instances any time they want and
 start new instances anytime as well.

 Why do you want to expose this logic so that Runners could control it?

 1:
 https://docs.google.com/document/d/1n6s3BOxOPct3uF4UgbbI9O9rpdiKWFH9R6mtVmR7xp0/edit#

 On Mon, Oct 21, 2019 at 4:27 AM jincheng sun 
 wrote:

> Hi,
> I found that `SdkHarness` does not stop the `SdkWorker` when it
> finishes. We should add logic to stop the `SdkWorker` in `SdkHarness`.
> More detail can be found [1].
>
> There are two approaches to solve this issue:
>
> Approach 1: We can add a teardown method to the Fn API, and the runner
> will tear down a specific bundle descriptor via this teardown Fn API
> during disposal.
> Approach 2: The control service termination could be seen as a signal,
> and once the SDK harness receives this signal, the teardown of the bundle
> descriptor will be performed.
>
> More detail can be found in [2].
>
> With Approach 2, the SDK harness could be shared between multiple
> executable stages. The control service termination only occurs when all
> the executable stages sharing the same SDK harness have finished. This
> means that the teardown of DoFns may not be executed immediately after an
> executable stage finishes.
>
> So, I prefer Approach 1. Welcome any feedback :)
>
> Best,
> Jincheng
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker.py
> [2]
> https://docs.google.com/document/d/1sCgy9VQPf9zVXKRquK8P6N4x7aB62GEO8ozkujRSHZg/edit?usp=sharing
>
 --
>>
>> Got feedback? go/harsh-feedback 
>>
>
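
For reference, a rough sketch of how the two approaches could differ on
the SDK-harness side. All names here (process_teardown, teardown, the
processor cache) are hypothetical illustrations, not the actual
sdk_worker.py API:

class SdkWorker:
    def __init__(self):
        # Bundle processors cached per bundle descriptor id.
        self._bundle_processor_cache = {}

    def process_teardown(self, descriptor_id):
        # Approach 1: the runner sends an explicit teardown request for
        # one bundle descriptor over the Fn API control channel.
        processor = self._bundle_processor_cache.pop(descriptor_id, None)
        if processor is not None:
            processor.teardown()  # runs DoFn teardown for this descriptor

    def stop(self):
        # Approach 2: control service termination is the signal; tear
        # down every remaining bundle descriptor at once.
        for processor in self._bundle_processor_cache.values():
            processor.teardown()
        self._bundle_processor_cache.clear()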


Re: DynamoDBIO related issue

2019-10-24 Thread Cam Mach
Hi Pradeep,

What is your enhancement? Can you create a ticket and describe it?

Thanks,
Cam



On Thu, Oct 24, 2019 at 1:58 PM Pradeep Bhosale <
bhosale.pradeep1...@gmail.com> wrote:

> Hi,
>
> This is Pradeep. I am using DynamoDB IO to write data to DynamoDB.
> I would like to report one enhancement.
>
> Please let me know how I can achieve that.
> I don't have *create issue* access on beam JIRA.
>
> https://issues.apache.org/jira/projects/BEAM/issues/BEAM-8368?filter=allopenissues
>
>


Multiple Outputs from Expand in Python

2019-10-24 Thread Sam Rohde
Hey All,

I'm trying to implement an expand override with multiple output
PCollections. The kicker is that I want to insert a new transform for each
output PCollection. How can I do this?

Regards,
Sam
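
One possible shape for this in the Python SDK -- a sketch in which a
per-output beam.Map stands in for the "new transform for each output
PCollection" (SplitEvenOdd and the lambdas are illustrative only):

import apache_beam as beam

class SplitEvenOdd(beam.PTransform):
    def expand(self, pcoll):
        # Build each output, inserting an extra transform per output.
        evens = (pcoll
                 | 'FilterEvens' >> beam.Filter(lambda x: x % 2 == 0)
                 | 'PostProcessEvens' >> beam.Map(lambda x: ('even', x)))
        odds = (pcoll
                | 'FilterOdds' >> beam.Filter(lambda x: x % 2 == 1)
                | 'PostProcessOdds' >> beam.Map(lambda x: ('odd', x)))
        # A dict (or tuple) of PCollections is a valid return value for
        # expand(), exposing multiple outputs to the caller.
        return {'evens': evens, 'odds': odds}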


Re: Python Precommit duration pushing 2 hours

2019-10-24 Thread Ahmet Altay
Ack. Separating precommit ITs into a different suite sounds good. Is anyone
interested in doing that?

On Thu, Oct 24, 2019 at 2:41 PM Valentyn Tymofieiev 
wrote:

> This should not increase the queue time substantially, since precommit ITs
> are running *sequentially* with precommit tests, unlike multiple
> precommit tests which run in parallel to each other.
>
> The precommit ITs we run are batch and streaming wordcount tests on Py2
> and one Py3 version, so it's not a lot of tests.
>
> On Thu, Oct 24, 2019 at 1:07 PM Ahmet Altay  wrote:
>
>> +1 to separating ITs from precommit. The downside: when Chad tried
>> to do something similar [1], it was noted that the total time to run all
>> precommit tests would increase, and the queue time could also increase.
>>
>> Another alternative, we could run a smaller set of IT tests in precommits
>> and run the whole suite as part of post commit tests.
>>
>> [1] https://github.com/apache/beam/pull/9642
>>
>> On Thu, Oct 24, 2019 at 12:15 PM Valentyn Tymofieiev 
>> wrote:
>>
>>> One improvement could be to move the precommit IT tests into a separate
>>> suite from the precommit tests, and run it in parallel.
>>>
>>> On Thu, Oct 24, 2019 at 11:41 AM Brian Hulette 
>>> wrote:
>>>
 Python Precommits are taking quite a while now [1]. Just visually it
 looks like the average length is 1.5h or so, but it spikes up to 2h. I've
 had several precommit runs get aborted due to the 2 hour limit.

 It looks like there was a spike up above 1h back on 9/6 and the
 duration has been steadily rising since then. Is there anything we can do
 about this?

 Brian

 [1]
 http://104.154.241.245/d/_TNndF2iz/pre-commit-test-latency?orgId=1=now-90d=now=4

>>>


Re: Beam on Flink with "active" k8s support

2019-10-24 Thread Thomas Weise
Hi Chad,

Thanks for bringing up this discussion. I think for most of it the Flink
list would be the better-suited place, but given that there are more and
more Beam users interested in deploying Beam on Flink on k8s, I will leave
the placement to you ;-)

As for the differences between FlinkK8sOperator and the proposals that you
linked, the motivation section in [2] covers that a bit. With
FlinkK8sOperator the deployment indeed looks more or less like "any other
k8s deployment". Within the Lyft infrastructure it is also automated
through the CI and there is a bit of tooling for development so it is
usually not necessary to work with kubectl. Flink cluster deployment
specifics and other details are encapsulated in the operator. This was a
deliberate choice based on our experience of previously running Flink in a
way that exposes too many deployment details to the user. FlinkK8sOperator
instead provides the control plane that should allow users to focus on
their application (vs. how it is being deployed). An important
consideration is upgrades, so an "application" is really a sequence of
jobs. Please see the state machine:

https://github.com/lyft/flinkk8soperator/blob/master/docs/state_machine.md

Since I'm not interested in working with the Flink command line to manage
production environments, making Flink on k8s look like Flink on YARN or any
other Flink deployment would be a non-goal.

The more interesting part is the active resource management; I suspect you
were thinking of that? Currently all TM pods have the same size and they
are allocated upfront. Rescaling a Flink job requires the update management
FlinkK8sOperator provides. The job needs to be stopped with a savepoint and
restarted with changed parallelism (because that's how Flink works at its
core). That would not change, even if the job was to (actively) manage the
k8s resources. Dynamic scaling with the operator would mean that a
parallelism configuration change would be triggered automatically.

A consequence of resources being allocated by Flink on demand would be that
different pod sizes would become possible, based on the exact shape of the
topology. I think there is a proposal to support such resource allocation
in Flink in the future. Note that this is not necessarily an advantage
though. If the resource requests cannot be filled fast enough, you have
more downtime. That's why with the operator, we only stop the old job when
the new cluster is already deployed and ready to run the replacement job.

HTH,
Thomas



On Sat, Oct 19, 2019 at 11:55 AM Chad Dombrova  wrote:

> Hi all,
> I've been following the Jira issue on Flink "active" k8s support [1]
> (autoscaling based on task resource requirements, IIUC) and there has been
> a lot of activity there lately.  There are two design docs [2][3] from
> different teams and it seems like some good collaboration is going on to
> reconcile the differences and get the feature implemented.
>
> At the Beam Summit I saw the great presentations by Thomas and Micah on
> the work that Lyft has been doing to run Flink and Beam on k8s.  I'm
> curious if these various features and approaches can be made to work with
> each other, and if so, what it would take to do so.
>
> thanks,
> -chad
>
> [1] https://issues.apache.org/jira/browse/FLINK-9953
> [2]
> https://docs.google.com/document/d/1-jNzqGF6NfZuwVaFICoFQ5HFFXzF5NVIagUZByFMfBY/edit
> [3]
> https://docs.google.com/document/d/1Or1hmDXABk1Q3ToITMi0HoZyHLs2nZ6gk7E0y2sEe28/edit#heading=h.ubqno6n4vwrl
>
>


Re: Interactive Beam Example Failing [BEAM-8451]

2019-10-24 Thread Harsh Vardhan
Thanks, +1 to adding support for streaming on Interactive Beam (+David Yan
)


On Thu, Oct 24, 2019 at 1:45 PM Hai Lu  wrote:

> Hi Robert,
>
> We're trying out iBeam at LinkedIn for Python. As Igor mentioned, there
> seems to be some inconsistency in the behavior of interactive beam. We can
> suggest some fixes from our end but we would need some support from the
> community.
>
> Also, is there a plan to support iBeam for streaming mode? We're
> interested in that use case as well.
>
> Thanks,
> Hai
>
> On Mon, Oct 21, 2019 at 4:45 PM Robert Bradshaw 
> wrote:
>
>> Thanks for trying this out. Yes, this is definitely something that
>> should be supported (and tested).
>>
>> On Mon, Oct 21, 2019 at 3:40 PM Igor Durovic  wrote:
>> >
>> > Hi everyone,
>> >
>> > The interactive beam example using the DirectRunner fails after
>> execution of the last cell. The recursion limit is exceeded during the
>> calculation of the cache label because of a circular reference in the
>> PipelineInfo object.
>> >
>> > The constructor for the PipelineInfo class creates a mapping from each
>> pcollection to the transforms that produce and consume it. The issue arises
>> when there exists a transform that is both a producer and a consumer for
>> the same pcollection. This occurs when a transform's expand method returns
>> the same pcoll object that's passed into it. The specific transform causing
>> the failure of the example is MaybeReshuffle, which is used in the Create
>> transform. Replacing "return pcoll" with "return pcoll | Map(lambda x: x)"
>> seems to fix the problem.
>> >
>> > A workaround for this issue on the interactive beam side would be
>> fairly simple, but it seems to me that there should be more validation of
>> pipelines to prevent the use of transforms that return the same pcoll
>> that's passed in, or at least a mention of this in the transform style
>> guide. My understanding is that pcollections are produced by a single
>> transform (they even have a field called "producer" that references only
>> one transform). If that's the case then that property of pcollections
>> should be enforced.
>> >
>> > I made ticket BEAM-8451 to track this issue.
>> >
>> > I'm still new to beam so I apologize if I'm fundamentally
>> misunderstanding something. I'm not exactly sure what the next step should
>> be and would appreciate some recommendations. I can submit a PR to solve
>> the immediate problem of the failing example but the underlying problem
>> should also be addressed at some point. I also apologize if people are
>> already aware of this problem.
>> >
>> > Thank You!
>> > Igor Durovic
>>
>
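
A minimal sketch of the pattern Igor describes; PassThrough here is a
hypothetical stand-in for the problematic branch of MaybeReshuffle, not
the actual Beam code:

import apache_beam as beam

class PassThrough(beam.PTransform):
    def expand(self, pcoll):
        # Returning the input object itself makes this transform both
        # producer and consumer of the same PCollection, which breaks
        # PipelineInfo's producer/consumer mapping.
        return pcoll

class PassThroughFixed(beam.PTransform):
    def expand(self, pcoll):
        # The workaround from the thread: emit a distinct output
        # PCollection via an identity Map.
        return pcoll | beam.Map(lambda x: x)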


Re: [discuss] How we support our users on Slack / Mailing list / StackOverflow

2019-10-24 Thread Pablo Estrada
Hi all,
I've opened a pull request [1] to try to improve how the Beam site guides people to
these channels. Would someone take a look?

The PR encourages users to use SO and user@. Depending on others' opinion,
we can also mention slack.
The PR also attempts to improve the bottom links on the website for users
looking for Events and/or Support.

[1] - https://github.com/apache/beam/pull/9875

On Thu, Sep 19, 2019 at 8:28 AM Ismaël Mejía  wrote:

> Sorry for the late answer. The issue here is that once we have a
> communication channel, users expect answers on it. The Python SDK is
> gaining momentum, and we need to serve users where they are (as
> mentioned by others above).
>
> One strong advantage of 'real-time' communication (Slack/IRC) is that it is
> better suited for collaboration and for creating community bonds. Think, for
> example, of how many people who answered a question you were looking for on
> Stack Overflow you can remember by their 'name', versus the people with
> whom you have interacted in a short conversation in an IRC-like channel. I
> mention this because it is a way to make users feel welcome, and is often a
> first step towards contribution (for example, the 'would you be willing to
> add this to the docs?' case).
>
> Stack Overflow is probably the most 'scalable' system in many ways, such as
> being indexed better by search engines, which helps future users find
> answers quickly. But it is also not perfect: the reputation system is
> basically elitist against casual people answering questions. In any case,
> there is value in encouraging moving some answers from Slack to SO, but
> there is also value in improving our own website docs, so this should
> probably be decided case by case.
>
> A first approach is probably to document (and recommend) that if users
> don't get their questions answered in Slack, they should ask on SO or the
> user mailing list.
>
> I personally think there is value in getting more people involved in
> 'real-time' communications. Of course, this is probably not for everyone; I
> understand that people may not want to do this, to avoid being interrupted
> or for other reasons. But this is a trade-off worth paying, not only to
> help people but eventually to grow the community, as in the go-lang case
> Robert mentioned, so it is probably worth considering.
>
> On Wed, Sep 11, 2019 at 3:27 AM Robert Burke  wrote:
> >
> > For the Go SDK, emailing the dev list or asking on Slack are probably
> the best ways to get an answer from me. I'm not in the habit of searching
> for open Go SDK questions on Stack Overflow right now, but I will chip in
> if they're pointed out to me.
> >
> > As Alexey mentions, Slack largely works for quick back and forths with
> community members, especially if both folks are awake at the same time. Eg.
> I've been handling a few questions there, and helping the user in question
> even get a few quick fix PRs in, making the SDK better for everyone.
> > On the other hand, I can be more responsive on beam-go because its
> traffic is low enough that I can be notified of every question/response. I
> look forward to when there's enough traffic there that I have to turn that
> off. :D
> >
> >
> > On Tue, Sep 10, 2019, 4:45 PM Alexey Romanenko 
> wrote:
> >>
> >> Pablo, thank you for raising this question.
> >>
> >> I can’t say for Python, but as someone who has tried to keep an eye on
> Java SDK-related questions on the ML/Slack/SO for a while, I’d say that
> Slack is not very effective for this.
> >> There are several reasons for that:
> >> - People tend to expect quick feedback on Slack, which doesn't always
> happen, especially for non-obvious questions where you need some time to
> provide an answer. Also, timezone differences play a role in reaction time.
> >> - Discussions don't always happen inside Slack threads, so they can get
> mixed up with messages from other questions/topics, and it becomes
> difficult to follow.
> >> - It’s not so easy to search for similar issues and provide a quick
> link to an already-answered question.
> >>
> >> So, I’d say that Slack is perfect for discussing quick and urgent
> questions, but I'm not sure it should be the first-line channel for user
> support. IMHO, we need to redirect users to user@ or SO for that (up to
> them to choose). The more important thing, though, is to regularly keep
> track of unanswered questions there and do our best to minimise their
> number.
> >>
> >>
> >> On 9 Sep 2019, at 11:38, Kyle Weaver  wrote:
> >>
> >> I pinned a message to #beam reminding people of the user@, but pinned
> messages aren't immediately visible. We might be better off editing the
> topic, which always appears at the top of the channel, to include
> https://beam.apache.org/community/contact-us/ or links to user@ and SO.
> We should also add the same topic to the #beam-java and #beam-python
> channels, which currently don't have any topic.
> >>
> >> Kyle Weaver | Software Engineer | github.com/ibzib 


Re: Beam 2.17.0 Release Tracking

2019-10-24 Thread Mikhail Gryzykhin
Thank you for the updated link, Thomas.

UPD: Snapshot build completed.


On Thu, Oct 24, 2019 at 11:54 AM Thomas Weise 
wrote:

> Thanks Mikhail!
>
> The JIRA issues link pointed to a resolved ticket.
>
> This should list the open items:
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20BEAM%20AND%20fixVersion%20%3D%202.17.0%20and%20resolution%20is%20EMPTY
>
>
> On Thu, Oct 24, 2019 at 11:16 AM Mikhail Gryzykhin 
> wrote:
>
>> Hello everyone,
>>
>> The Beam 2.17.0 release branch is cut. Next steps are to build the
>> snapshot and validate the branch.
>>
>> Follow the blocking Jira issues as well.
>>
>> --Mikhail
>>
>>


Re: Apache Pulsar connector for Beam

2019-10-24 Thread Pablo Estrada
There's a JIRA issue to track this:
https://issues.apache.org/jira/browse/BEAM-8218

Alex was kind enough to file it. +Alex Van Boxel  : )
Best
-P

On Thu, Oct 24, 2019 at 12:01 AM Taher Koitawala  wrote:

> Hi Reza,
>  Thanks for your reply. However, I do not see Pulsar listed
> there. Should we file a JIRA?
>
> On Thu, Oct 24, 2019, 12:16 PM Reza Rokni  wrote:
>
>> Hi Taher,
>>
>> You can see the list of current and wip IO's here:
>>
>> https://beam.apache.org/documentation/io/built-in/
>>
>> Cheers
>>
>> Reza
>>
>> On Thu, 24 Oct 2019 at 13:56, Taher Koitawala  wrote:
>>
>>> Hi All,
>>>  I've been wanting to know if we have a Pulsar connector for Beam.
>>> Pulsar is another messaging queue like Kafka, and I would like to build a
>>> streaming pipeline with Pulsar. Any help would be appreciated.
>>>
>>>
>>> Regards,
>>> Taher Koitawala
>>>
>>
>>
>


Re: JdbcIO read needs to fit in memory

2019-10-24 Thread Alexey Romanenko
Jozef, do you have any NPE stacktrace to share?

> On 24 Oct 2019, at 15:26, Jozef Vilcek  wrote:
> 
> Hi,
> 
> I need to read a big-ish data set via JdbcIO. This forced me to bump 
> up the memory for my executor (right now using SparkRunner). It seems that 
> JdbcIO requires all data to fit in memory, as it uses a DoFn to unfold the 
> query into a list of elements.
> 
> BoundedSource would not face the need to fit the result in memory, but 
> JdbcIO uses a DoFn. Also, in a recent discussion [1] it was suggested that 
> BoundedSource should not be used, as it is obsolete.
> 
> Has anyone faced this issue? What would be the best way to solve it? If the 
> DoFn should be kept, then I can only think of splitting the query into 
> ranges and trying to find the most fitting number of rows to read at once.
> 
> I appreciate any thoughts. 
> 
> [1] 
> https://lists.apache.org/list.html?dev@beam.apache.org:lte=1M:Reading%20from%20RDB%2C%20ParDo%20or%20BoundedSource
>  
> 


[DISCUSS] New Beam pipeline diagrams

2019-10-24 Thread Cyrus Maden
Hi all,

Thank you to everyone who reached out to me and/or commented on my proposal
for new Beam pipeline diagrams[1]. I've compiled all of your suggestions
and updated the design accordingly. *Here's a new Google Doc with sample
diagrams[2].* I plan to draft a PR soon that replaces all of the diagrams
in our docs with new ones. In the meantime, please feel free to leave
comments and suggestions on the new doc!

Best,
Cyrus

[1]
https://docs.google.com/document/d/1khf9Bx4XJWsKUD6J1eDcYo_8dL9LBoHDtJpyDjDzOMM/edit?usp=sharing
[2]
https://docs.google.com/document/d/1MvL64o1QmJdzZcPFtkTFFlFM3_HrsZFjshxd1KWEBfg/edit?usp=sharing


Re: JdbcIO read needs to fit in memory

2019-10-24 Thread Ryan Skraba
Hello!

If I remember correctly -- the JdbcIO will use *one* DoFn instance to
read all of the rows, but that instance is not required to hold all of
the rows in memory.

The fetch size will, however, read 50K rows at a time by default and
those will all be held in memory in that single worker until they are
emitted.  You can adjust this setting with the setFetchSize(...)
method.

By default, the JdbcIO.Read transform adds a "reshuffle", which will
repartition the records among all of the nodes in the cluster.  This
means that all of the rows need to fit into total available memory of
the cluster (not just that one node), especially if the RDD underneath
the PCollection is reused/persisted.  You can change the persistence
level to "MEMORY_AND_DISK" in this case if you want to spill data to
disk instead of failing your job:
https://github.com/apache/beam/blob/416f62bdd7fa092257921e4835a48094ebe1dda4/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java#L56

I hope this helps!  Ryan




On Thu, Oct 24, 2019 at 4:26 PM Jean-Baptiste Onofré  wrote:
>
> Hi
>
> JdbcIO is basically a DoFn. So it could load everything on a single
> executor (there's no obvious way to split).
>
> Is that what you mean?
>
> Regards
> JB
>
> On 24 Oct 2019, at 15:26, Jozef Vilcek  wrote:
>
> Hi,
>
> I need to read a big-ish data set via JdbcIO. This forced me to bump 
> up the memory for my executor (right now using SparkRunner). It seems that 
> JdbcIO requires all data to fit in memory, as it uses a DoFn to unfold the 
> query into a list of elements.
>
> BoundedSource would not face the need to fit the result in memory, but 
> JdbcIO uses a DoFn. Also, in a recent discussion [1] it was suggested that 
> BoundedSource should not be used, as it is obsolete.
>
> Has anyone faced this issue? What would be the best way to solve it? If the 
> DoFn should be kept, then I can only think of splitting the query into 
> ranges and trying to find the most fitting number of rows to read at once.
>
> I appreciate any thoughts.
>
> [1] 
> https://lists.apache.org/list.html?dev@beam.apache.org:lte=1M:Reading%20from%20RDB%2C%20ParDo%20or%20BoundedSource
>
>
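
Ryan's fetch-size point, sketched in Python DB-API terms rather than the
actual JdbcIO Java code (the 50000 default mirrors the figure mentioned
above; cursor is any DB-API cursor, and the function name is illustrative):

def read_in_chunks(cursor, query, fetch_size=50000):
    # Only fetch_size rows are held in memory at a time, rather than
    # the whole result set, analogous to JdbcIO's setFetchSize(...).
    cursor.execute(query)
    while True:
        rows = cursor.fetchmany(fetch_size)
        if not rows:
            break
        for row in rows:
            yield row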


Re: JdbcIO read needs to fit in memory

2019-10-24 Thread Eugene Kirpichov
Sorry, I just realized I've made a mistake. BoundedSource in some runners
may not have the same "fits in memory" limitation as DoFns, so in that
sense you're right - if it were done as a BoundedSource, perhaps it would
work better in your case, even if it didn't read things in parallel.

On Thu, Oct 24, 2019 at 8:17 AM Eugene Kirpichov 
wrote:

> Hi Josef,
>
> JdbcIO per se does not require the result set to fit in memory. The issues
> come from the limitations of the context in which it runs:
> - It indeed uses a DoFn to emit results; a DoFn is in general allowed to
> emit an unbounded number of results that doesn't necessarily have to fit in
> memory, but some runners may have this requirement (e.g. Spark probably
> does, Dataflow doesn't, not sure about the others)
> - JdbcIO uses a database cursor provided by the underlying JDBC driver to
> read through the results. Again, depending on the particular JDBC driver,
> the cursor may or may not be able to stream the results without storing all
> of them in memory.
> - The biggest issue, though, is that there's no way to automatically split
> the execution of a JDBC query into several sub-queries whose results
> together are equivalent to the result of the original query. Because of
> this, it is not possible to implement JdbcIO in a way that it would
> *automatically* avoid scanning through the entire result set, because
> scanning through the entire result set sequentially is the only way JDBC
> drivers (and most databases) allow you to access query results. Even if we
> chose to use BoundedSource, we wouldn't be able to implement the split()
> method.
>
> If you need to read query results in parallel, or to circumvent memory
> limitations of a particular runner or JDBC driver, you can use
> JdbcIO.readAll(), and parameterize your query such that passing all the
> parameter values together adds up to the original query you wanted. Most
> likely it would be something like transforming "SELECT * FROM TABLE" to a
> family of queries "SELECT * FROM TABLE WHERE MY_PRIMARY_KEY BETWEEN ? AND
> ?" and passing primary key ranges adding up to the full range of the
> table's keys.
>
> Note that, whether this performs better, will also depend on the database
> - e.g. if the database is already bottlenecked, then reading from it in
> parallel will not make things faster.
>
> On Thu, Oct 24, 2019 at 7:26 AM Jean-Baptiste Onofré 
> wrote:
>
>> Hi
>>
>> JdbcIO is basically a DoFn. So it could load everything on a single
>> executor (there's no obvious way to split).
>>
>> Is that what you mean?
>>
>> Regards
>> JB
>>
>> On 24 Oct 2019, at 15:26, Jozef Vilcek  wrote:
>>
>> Hi,
>>
>> I need to read a big-ish data set via JdbcIO. This forced me to
>> bump up the memory for my executor (right now using SparkRunner). It seems
>> that JdbcIO requires all data to fit in memory, as it uses a DoFn to
>> unfold the query into a list of elements.
>>
>> BoundedSource would not face the need to fit the result in memory, but
>> JdbcIO uses a DoFn. Also, in a recent discussion [1] it was suggested that
>> BoundedSource should not be used, as it is obsolete.
>>
>> Has anyone faced this issue? What would be the best way to solve it? If
>> the DoFn should be kept, then I can only think of splitting the query into
>> ranges and trying to find the most fitting number of rows to read at once.
>>
>> I appreciate any thoughts.
>>
>> [1]
>> https://lists.apache.org/list.html?dev@beam.apache.org:lte=1M:Reading%20from%20RDB%2C%20ParDo%20or%20BoundedSource
>>
>>
>>
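
Eugene's key-range suggestion, sketched as a Python generator (the query
text follows his example; the numeric-key assumption and the pairing with
JdbcIO.readAll() are illustrative, not a definitive implementation):

def key_range_queries(min_key, max_key, num_ranges):
    # Split "SELECT * FROM TABLE" into range sub-queries whose results
    # together add up to the full table; assumes a numeric primary key.
    step = max(1, (max_key - min_key + 1) // num_ranges)
    lo = min_key
    while lo <= max_key:
        hi = min(lo + step - 1, max_key)
        yield ('SELECT * FROM TABLE WHERE MY_PRIMARY_KEY BETWEEN ? AND ?',
               (lo, hi))
        lo = hi + 1

# Each (query, params) pair would then be passed to JdbcIO.readAll()
# as one parameterized read.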


Re: Intermittent No FileSystem found exception

2019-10-24 Thread Koprivica,Preston Blake
Hi Maulik,

I believe you may be witnessing this issue: 
https://issues.apache.org/jira/browse/BEAM-8303.  We ran into this using 
beam-2.15.0 on flink-1.8 over S3.  It looks like it’ll be fixed in 2.17.0.

As a temporary workaround, you can set the #withNoSpilling() option if you’re 
using the FileIO API. If not, it should be relatively easy to move to it.

From: Maulik Soneji 
Reply-To: "dev@beam.apache.org" 
Date: Thursday, October 24, 2019 at 7:05 AM
To: "dev@beam.apache.org" 
Subject: Intermittent No FileSystem found exception

Hi everyone,

We are running a batch job on Flink that reads data from GCS and does some 
aggregation on this data.
We are intermittently getting the error: `No filesystem found for scheme gs`

We are running Beam version 2.15.0 with FlinkRunner, Flink version: 1.6.4

While remotely debugging the task managers, we found that in a few of them 
the GcsFileSystemRegistrar is not added to the list of FileSystem schemes. In 
these task managers, we get this error.

The collection SCHEME_TO_FILESYSTEM is modified only in the 
setDefaultPipelineOptions call in the org.apache.beam.sdk.io.FileSystems 
class; this function is not being called, and thus the 
GcsFileSystemRegistrar is not added to SCHEME_TO_FILESYSTEM.

Detailed stacktrace:

java.lang.IllegalArgumentException: No filesystem found for scheme gs
 at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:463)
 at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:533)
 at org.apache.beam.sdk.io.fs.ResourceIdCoder.decode(ResourceIdCoder.java:49)
 at org.apache.beam.sdk.io.fs.MetadataCoder.decodeBuilder(MetadataCoder.java:62)
 at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:58)
 at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:36)
 at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
 at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
 at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
 at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:592)
 at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:583)
 at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:529)
 at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:92)
 at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
 at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
 at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
 at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
 at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
 at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
 at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
 at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
 at java.lang.Thread.run(Thread.java:748)
In order to resolve this issue, we tried calling the following in the 
PTransform's expand function:

FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());

This call is meant to make sure that the GcsFileSystemRegistrar is added to 
the list, but it hasn't solved the issue.

Can someone please help check why this might be happening and what can be 
done to resolve this issue?

Thanks and Regards,
Maulik
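
A toy Python model of the failure mode described above, assuming the
registry works roughly as outlined (this is a simplification, not the
actual org.apache.beam.sdk.io.FileSystems Java code):

SCHEME_TO_FILESYSTEM = {'file': 'LocalFileSystem'}

def set_default_pipeline_options(options):
    # The only code path that populates the registry: filesystem
    # registrars (e.g. for 'gs') are applied here.
    SCHEME_TO_FILESYSTEM['gs'] = 'GcsFileSystem'

def get_filesystem(scheme):
    # A task manager on which set_default_pipeline_options never ran
    # fails here, mirroring "No filesystem found for scheme gs".
    if scheme not in SCHEME_TO_FILESYSTEM:
        raise ValueError('No filesystem found for scheme %s' % scheme)
    return SCHEME_TO_FILESYSTEM[scheme]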



