Re: [DISCUSS] Dependency management for Python SDK Container

2019-08-05 Thread Valentyn Tymofieiev
On Tue, Aug 6, 2019 at 2:29 AM Ahmet Altay  wrote:

>
>
> On Mon, Aug 5, 2019 at 1:43 AM Valentyn Tymofieiev 
> wrote:
>
>> - The purpose of install_requires in setup.py is to define the maximally
>> permissive set of requirements for a package[1]. We don't pin a version in
>> setup.py without a strong reason; instead, we typically pick a lower
>> bound we have tested and set the upper bound to the next major version.
>> - The purpose of requirements.txt is to force pip to properly resolve
>> dependencies and create a reproducible execution environment, since pip
>> doesn't have true dependency resolution [2].
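
For illustration, the difference between the two conventions looks like this
(the package name and versions are made up):

    # setup.py: permissive range with a tested lower bound and the next
    # major version as the upper bound.
    from setuptools import setup

    setup(
        name='example-package',
        install_requires=['httplib2>=0.12.0,<2'],
    )

    # base_image_requirements.txt, by contrast, pins exactly:
    #   httplib2==0.12.0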
>>
>> We currently regularly upgrade setup.py dependencies, but do not update
>> base_image_requirements.txt. Also, our requirements file is not exhaustive.
>> We need to introduce a process to fix this.
>>
>
> base_image_requirements.txt currently does not drive any released Beam
> artifact. It would make sense to address this as part of the SDKHarness
> container image release process. A similar mechanism might make sense for
> figuring out what goes into containers for other SDKs. (Perhaps add your
> proposal to
> https://cwiki.apache.org/confluence/display/BEAM/%5BWIP%5D+SDKHarness+Container+Image+Release+Process
> )
>

+1. I will add notes to the wiki, thanks.


>
>

>
>> I would recommend that in the new process:
>> - All dependencies of Beam Python SDK, including transitive
>> dependencies, are listed in base_image_requirements.txt (or another
>> requirements file). "Explicit is better than implicit."
>> - The requirements file is regenerated whenever setup.py changes.
>>
> - When we build a container image, we check that the final image has
>> exactly the same versions of dependencies that were spelled out in
>> the requirements file (no versions added or changed)
>>
>
> The above could be done in a two-step process:
> - a human-generated requirements file, like base_image_requirements today,
> which has a set of curated requirements.
> - Changes to the first file would result in a generated file with all
> transitive dependencies. The second file could be used as the source of
> truth for all dependencies at a particular commit. The generated file could
> be used for the container builds.
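
One possible automation for the second step, sketched here under the
assumption that pip-tools (pip-compile) would be acceptable as a build
dependency (file names are illustrative):

    # Regenerate the pinned, transitive requirements file from the curated
    # one. pip-compile resolves and pins all transitive dependencies.
    import subprocess

    subprocess.check_call([
        'pip-compile',
        'base_image_requirements.in',  # curated, human-edited input
        '--output-file', 'base_image_requirements.txt',  # generated, fully pinned
    ])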
>
>
>> - We also check that there are no dependency conflicts (they typically
>> look like: Package X requires version A of dependency Y, but you will have
>> B, which is incompatible).
>>
>
> +1. Container build could verify this.
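
A sketch of what such verification could look like at container build time
(the requirements file name is assumed; pip's built-in `pip check` command
could cover the dependency-conflict check mentioned below):

    # Hypothetical check: every installed package must exactly match the
    # version pinned in the requirements file.
    import sys
    import pkg_resources

    pinned = {}
    with open('base_image_requirements.txt') as f:
        for line in f:
            line = line.strip()
            if line and not line.startswith('#') and '==' in line:
                name, _, version = line.partition('==')
                pinned[name.lower()] = version

    drift = [(d.project_name, d.version, pinned[d.project_name.lower()])
             for d in pkg_resources.working_set
             if d.project_name.lower() in pinned
             and pinned[d.project_name.lower()] != d.version]
    if drift:
        sys.exit('Version drift (package, installed, pinned): %s' % drift)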
>
>
>> - We update the versions of pinned dependencies periodically. We may want
>> to put all dependencies of SDK harness containers on the radar of Beam
>> dependency checker.
>>
>
> +1 but a separate process.
>
> This process does not address how to keep setup.py in sync with the
> requirements file. We could use git hooks to ensure that the files are
> changed at the same time.
>

In addition to git hooks, we could try to check that once we install Apache
Beam (after installing the requirements file dependencies), we don't pull
anything from PyPI. We could rely on pip output or cut internet access to
enforce this.
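
pip can enforce the no-network property directly; a sketch of the idea, with
illustrative paths:

    # Install Beam with the package index disabled, so the build fails
    # loudly if anything would have to be fetched from PyPI.
    import subprocess

    subprocess.check_call([
        'pip', 'install',
        '--no-index',                              # never contact PyPI
        '--find-links', '/opt/apache/beam/tars',   # local artifacts only
        '/opt/apache/beam/tars/apache-beam.tar.gz[gcp]',
    ])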


>
>> Valentyn
>>
>> [1]
>> https://packaging.python.org/discussions/install-requires-vs-requirements/
>> [2] https://pip.pypa.io/en/stable/user_guide/#requirements-files
>>
>> On Sat, Aug 3, 2019 at 2:47 AM Ahmet Altay  wrote:
>>
>>>
>>>
>>> On Fri, Aug 2, 2019 at 4:34 PM Brian Hulette 
>>> wrote:
>>>
 Thanks for the reply, I added some responses inline.

 On Fri, Aug 2, 2019 at 2:42 PM Ahmet Altay  wrote:
 >
> There is value in explicitly pinning the dependencies to be used in
the containers:
> - It reproducibly produces the same container. This will be important
once we start releasing Beam containers. By looking at a Beam release
branch, one could figure out exactly the set of dependencies available in a
released container.
>
> - The package manager (pip) is notorious for how it resolves versions
of sub-dependencies, and sometimes picks incompatible ones.
> - Similarly, this reproducibility is helpful for tests, which can run
on the same container version unaffected by numerous dependency version
changes happening in sub-dependencies.

I thought this might be the reason; it definitely makes sense to pin
dependencies for reproducible builds. If that is the case, though, I
think we should at least change the comment in
base_image_requirements.txt to say so (I'm happy to write a patch for
that).

>>>
>>> Sounds good.
>>>
>>>

> In addition, I would argue that we will want to keep some form of
base_image_requirements.txt in order to be able to install additional
dependencies on the container, so we would not be able to get rid of this
mechanism anyway.

 My PR doesn't remove base_image_requirements.txt. It keeps the file
 there but removes all the pinned beam dependencies, and instead just
 lists the beam tar.gz as a dependency directly, along with all of 

Re: [DISCUSS] Dependency management for Python SDK Container

2019-08-05 Thread Ahmet Altay
On Mon, Aug 5, 2019 at 1:43 AM Valentyn Tymofieiev 
wrote:

> - The purpose of install_requires in setup.py is to define the maximally
> permissive set of requirements for a package[1]. We don't pin a version in
> setup.py without a strong reason; instead, we typically pick a lower
> bound we have tested and set the upper bound to the next major version.
> - The purpose of requirements.txt is to force pip to properly resolve
> dependencies and create a reproducible execution environment, since pip
> doesn't have true dependency resolution [2].
>
> We currently regularly upgrade setup.py dependencies, but do not update
> base_image_requirements.txt. Also, our requirements file is not exhaustive.
> We need to introduce a process to fix this.
>

base_image_requirements.txt currently does not drive any released Beam
artifact. It would make sense to address this as part of the SDKHarness
container image release process. A similar mechanism might make sense for
figuring out what goes into containers for other SDKs. (Perhaps add your
proposal to
https://cwiki.apache.org/confluence/display/BEAM/%5BWIP%5D+SDKHarness+Container+Image+Release+Process
)


> I would recommend that in the new process:
> - All dependencies of Beam Python SDK, including transitive
> dependencies, are listed in base_image_requirements.txt (or another
> requirements file). "Explicit is better than implicit."
> - The requirements file is regenerated whenever setup.py changes.
>
- When we build a container image, we check that the final image has
> exactly the same versions of dependencies that were spelled out in
> the requirements file (no versions added or changed)
>

The above could be done in a two-step process:
- a human-generated requirements file, like base_image_requirements today,
which has a set of curated requirements.
- Changes to the first file would result in a generated file with all
transitive dependencies. The second file could be used as the source of truth
for all dependencies at a particular commit. The generated file could be used
for the container builds.


> - We also check that there are no dependency conflicts (they typically
> look like: Package X requires version A of dependency Y, but you will have
> B, which is incompatible).
>

+1. Container build could verify this.


> - We update the versions of pinned dependencies periodically. We may want
> to put all dependencies of SDK harness containers on the radar of Beam
> dependency checker.
>

+1 but a separate process.

This process does not address how to keep setup.py in sync with the
requirements file. We could use git hooks to ensure that the files are
changed at the same time.
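
A pre-commit hook for that could be as small as the following sketch
(hypothetical, not an existing Beam hook; paths are illustrative):

    #!/usr/bin/env python
    # Fail the commit if setup.py changed but the requirements file did not.
    import subprocess
    import sys

    changed = subprocess.check_output(
        ['git', 'diff', '--cached', '--name-only']).decode().splitlines()

    if (any(f.endswith('sdks/python/setup.py') for f in changed)
        and not any(f.endswith('base_image_requirements.txt') for f in changed)):
        sys.exit('setup.py changed without regenerating '
                 'base_image_requirements.txt')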


> Valentyn
>
> [1]
> https://packaging.python.org/discussions/install-requires-vs-requirements/
> [2] https://pip.pypa.io/en/stable/user_guide/#requirements-files
>
> On Sat, Aug 3, 2019 at 2:47 AM Ahmet Altay  wrote:
>
>>
>>
>> On Fri, Aug 2, 2019 at 4:34 PM Brian Hulette  wrote:
>>
>>> Thanks for the reply, I added some responses inline.
>>>
>>> On Fri, Aug 2, 2019 at 2:42 PM Ahmet Altay  wrote:
>>> >
>>> > There is value in explicitly pinning the dependencies to be used in
>>> the containers:
>>> > - It reproducibly produces the same container. This will be important
>>> once we start releasing Beam containers. By looking at a Beam release
>>> branch, one could figure out exactly the set of dependencies available in
>>> a released container.
>>> >
>>> > - The package manager (pip) is notorious for how it resolves versions
>>> of sub-dependencies, and sometimes picks incompatible ones.
>>> > - Similarly, this reproducibility is helpful for tests, which can run on
>>> the same container version unaffected by numerous dependency version
>>> changes happening in sub-dependencies.
>>>
>>> I thought this might be the reason; it definitely makes sense to pin
>>> dependencies for reproducible builds. If that is the case, though, I
>>> think we should at least change the comment in
>>> base_image_requirements.txt to say so (I'm happy to write a patch for
>>> that).
>>>
>>
>> Sounds good.
>>
>>
>>>
>>> > In addition, I would argue that we will want to keep some form of
>>> base_image_requirements.txt in order to be able to install additional
>>> dependencies on the container, so we would not be able to get rid of this
>>> mechanism anyway.
>>>
>>> My PR doesn't remove base_image_requirements.txt. It keeps the file
>>> there but removes all the pinned beam dependencies, and instead just
>>> lists the beam tar.gz as a dependency directly, along with all of the
>>> additional dependencies.
>>>
>>
>> Got it, thanks for the clarification.
>>
>>
>>>
>>> >
>>> >
>>> > I would suggest, if possible, ensuring that both files are modified
>>> synchronously. This might be possible with precommit hooks, although I am
>>> not familiar with them.
>>>
>>> Yeah setting up something like that would help, but surely there's a
>>> better way to lock dependencies than to count on people manually
>>> updating this file (even with a precommit reminding them)? I'm not

Re: Write-through-cache in State logic

2019-08-05 Thread Thomas Weise
That would add a synchronization point that forces extra latency, especially
in streaming mode.

Wouldn't it be possible for the runner to assign the token when starting
the bundle and for the SDK to pass it along the state requests? That way,
there would be no need to batch and wait for a flush.
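
To make the token idea concrete, here is a hypothetical sketch of a
cross-bundle cache keyed by the runner-supplied token (not the actual Beam
implementation):

    # Entries survive across bundles for as long as the runner keeps
    # handing out the same cache token for the key.
    class StateCache(object):

        def __init__(self):
            self._cache = {}  # (cache_token, state_key) -> value

        def get(self, token, key, fetch_fn):
            # On a miss, fall back to a gRPC state read from the runner.
            if (token, key) not in self._cache:
                self._cache[(token, key)] = fetch_fn()
            return self._cache[(token, key)]

        def put(self, token, key, value):
            # Write through: update locally and send the append/set request.
            self._cache[(token, key)] = value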


On Mon, Aug 5, 2019 at 2:49 PM Lukasz Cwik  wrote:

> I believe the intent is to add a new state API call telling the runner
> that it is blocked waiting for a response (BEAM-7000).
>
> This should allow the runner to wait till it sees one of these I'm blocked
> requests and then merge + batch any state calls it may have at that point
> in time allowing it to convert clear + appends into set calls and do any
> other optimizations as well. By default, the runner would have a time and
> space based limit on how many outstanding state calls there are before
> choosing to resolve them.
>
> On Mon, Aug 5, 2019 at 5:43 PM Lukasz Cwik  wrote:
>
>> Now I see what you mean.
>>
>> On Mon, Aug 5, 2019 at 5:42 PM Thomas Weise  wrote:
>>
>>> Hi Luke,
>>>
>>> I guess the answer is that it depends on the state backend. If a set
>>> operation in the state backend is available that is more efficient than
>>> clear+append, then it would be beneficial to have a dedicated fn api
>>> operation to allow for such optimization. That's something that needs to be
>>> determined with a profiler :)
>>>
>>> But the low hanging fruit is cross-bundle caching.
>>>
>>> Thomas
>>>
>>> On Mon, Aug 5, 2019 at 2:06 PM Lukasz Cwik  wrote:
>>>
 Thomas, why do you think a single round trip is needed?

 clear + append can be done blindly from the SDK side and it has total
 knowledge of the state at that point in time till the end of the bundle at
 which point you want to wait to get the cache token back from the runner
 for the append call so that for the next bundle you can reuse the state if
 the key wasn't processed elsewhere.

 Also, all state calls are "streamed" over gRPC so you don't need to
 wait for clear to complete before being able to send append.
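
A sketch of the clear + append to set conversion described above, assuming
the pending ops for one state key are buffered in order (hypothetical, not
runner code):

    def collapse(ops):
        # ops: ordered [('clear', None), ('append', v), ...] for one key.
        # If the sequence starts with a clear, the final contents are fully
        # known locally, so clear + N appends become a single set.
        if not ops or ops[0][0] != 'clear':
            return ops
        values = []
        for op, value in ops[1:]:
            if op == 'clear':
                values = []        # a later clear resets the contents
            else:                  # 'append'
                values.append(value)
        return [('set', values)]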

 On Tue, Jul 30, 2019 at 12:58 AM jincheng sun 
 wrote:

> Hi Rakesh,
>
> Glad to see you point this problem out!
> +1 for adding this implementation. Managing state by write-through cache is
> pretty important for streaming jobs!
>
> Best, Jincheng
>
> Thomas Weise  于2019年7月29日周一 下午8:54写道:
>
>> FYI a basic test appears to confirm the importance of the
>> cross-bundle caching: I found that the throughput can be increased by
>> playing with the bundle size in the Flink runner. Default caps at 1000
>> elements (or 1 second). So on a high throughput stream the bundles would
>> be capped by the count limit. Bumping the count limit increases the
>> throughput by reducing the chatter over the state plane (more cache hits
>> due to larger bundle).
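
For reference, the Flink runner exposes those caps as pipeline options; a
hedged example (the option names are assumed and should be checked against
the Beam version in use):

    from apache_beam.options.pipeline_options import PipelineOptions

    # Raise the per-bundle caps from the defaults mentioned above
    # (1000 elements / 1 second).
    options = PipelineOptions([
        '--runner=PortableRunner',
        '--max_bundle_size=10000',
        '--max_bundle_time_millis=5000',
    ])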
>>
>> The next level of investigation would involve profiling. But just by
>> looking at metrics, the CPU utilization on the Python worker side dropped
>> significantly while on the Flink side it remains nearly the same. There
>> are no metrics for state operations on either side; I think it would be
>> very helpful to get these in place also.
>>
>> Below the stateful processing code for reference.
>>
>> Thomas
>>
>>
>> class StatefulFn(beam.DoFn):
>>   count_state_spec = userstate.CombiningValueStateSpec(
>>       'count', beam.coders.IterableCoder(beam.coders.VarIntCoder()), sum)
>>   timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
>>
>>   def process(self, kv,
>>               count=beam.DoFn.StateParam(count_state_spec),
>>               timer=beam.DoFn.TimerParam(timer_spec),
>>               window=beam.DoFn.WindowParam):
>>     count.add(1)
>>     # Convert micros to seconds; fire one second before the window end.
>>     timer_seconds = (window.end.micros // 1000000) - 1
>>     timer.set(timer_seconds)
>>
>>   @userstate.on_timer(timer_spec)
>>   def process_timer(self,
>>                     count=beam.DoFn.StateParam(count_state_spec),
>>                     window=beam.DoFn.WindowParam):
>>     if count.read() == 0:
>>       logging.warning("###timer fired with count %d, window %s"
>>                       % (count.read(), window))
>>
>>
>>
>> On Thu, Jul 25, 2019 at 5:09 AM Robert Bradshaw 
>> wrote:
>>
>>> On Wed, Jul 24, 2019 at 6:21 AM Rakesh Kumar 
>>> wrote:
>>> >
>>> > Thanks Robert,
>>> >
>>> >  I stumbled on the Jira that you created some time ago:
>>> > https://jira.apache.org/jira/browse/BEAM-5428
>>> >
>>> > You also marked code where code changes are required:
>>> >
>>> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L291
>>> >
>>> 

Re: Write-through-cache in State logic

2019-08-05 Thread Lukasz Cwik
I believe the intent is to add a new state API call telling the runner that
it is blocked waiting for a response (BEAM-7000).

This should allow the runner to wait till it sees one of these I'm blocked
requests and then merge + batch any state calls it may have at that point
in time allowing it to convert clear + appends into set calls and do any
other optimizations as well. By default, the runner would have a time and
space based limit on how many outstanding state calls there are before
choosing to resolve them.

On Mon, Aug 5, 2019 at 5:43 PM Lukasz Cwik  wrote:

> Now I see what you mean.
>
> On Mon, Aug 5, 2019 at 5:42 PM Thomas Weise  wrote:
>
>> Hi Luke,
>>
>> I guess the answer is that it depends on the state backend. If a set
>> operation in the state backend is available that is more efficient than
>> clear+append, then it would be beneficial to have a dedicated fn api
>> operation to allow for such optimization. That's something that needs to be
>> determined with a profiler :)
>>
>> But the low hanging fruit is cross-bundle caching.
>>
>> Thomas
>>
>> On Mon, Aug 5, 2019 at 2:06 PM Lukasz Cwik  wrote:
>>
>>> Thomas, why do you think a single round trip is needed?
>>>
>>> clear + append can be done blindly from the SDK side and it has total
>>> knowledge of the state at that point in time till the end of the bundle at
>>> which point you want to wait to get the cache token back from the runner
>>> for the append call so that for the next bundle you can reuse the state if
>>> the key wasn't processed elsewhere.
>>>
>>> Also, all state calls are "streamed" over gRPC so you don't need to wait
>>> for clear to complete before being able to send append.
>>>
>>> On Tue, Jul 30, 2019 at 12:58 AM jincheng sun 
>>> wrote:
>>>
 Hi Rakesh,

 Glad to see you point this problem out!
 +1 for adding this implementation. Managing state by write-through cache is
 pretty important for streaming jobs!

 Best, Jincheng

 Thomas Weise  于2019年7月29日周一 下午8:54写道:

> FYI a basic test appears to confirm the importance of the cross-bundle
> caching: I found that the throughput can be increased by playing with the
> bundle size in the Flink runner. Default caps at 1000 elements (or 1
> second). So on a high throughput stream the bundles would be capped by the
> count limit. Bumping the count limit increases the throughput by reducing
> the chatter over the state plane (more cache hits due to larger bundle).
>
> The next level of investigation would involve profiling. But just by
> looking at metrics, the CPU utilization on the Python worker side dropped
> significantly while on the Flink side it remains nearly the same. There are
> no metrics for state operations on either side; I think it would be very
> helpful to get these in place also.
>
> Below the stateful processing code for reference.
>
> Thomas
>
>
> class StatefulFn(beam.DoFn):
>   count_state_spec = userstate.CombiningValueStateSpec(
>       'count', beam.coders.IterableCoder(beam.coders.VarIntCoder()), sum)
>   timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
>
>   def process(self, kv,
>               count=beam.DoFn.StateParam(count_state_spec),
>               timer=beam.DoFn.TimerParam(timer_spec),
>               window=beam.DoFn.WindowParam):
>     count.add(1)
>     # Convert micros to seconds; fire one second before the window end.
>     timer_seconds = (window.end.micros // 1000000) - 1
>     timer.set(timer_seconds)
>
>   @userstate.on_timer(timer_spec)
>   def process_timer(self,
>                     count=beam.DoFn.StateParam(count_state_spec),
>                     window=beam.DoFn.WindowParam):
>     if count.read() == 0:
>       logging.warning("###timer fired with count %d, window %s"
>                       % (count.read(), window))
>
>
>
> On Thu, Jul 25, 2019 at 5:09 AM Robert Bradshaw 
> wrote:
>
>> On Wed, Jul 24, 2019 at 6:21 AM Rakesh Kumar 
>> wrote:
>> >
>> > Thanks Robert,
>> >
>> >  I stumbled on the Jira that you created some time ago:
>> > https://jira.apache.org/jira/browse/BEAM-5428
>> >
>> > You also marked code where code changes are required:
>> >
>> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L291
>> >
>> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
>> >
>> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L465
>> >
>> > I am willing to provide help to implement this. Let me know how I
>> can help.
>>
>> As far as I'm aware, no one is actively working on it right now.
>> Please feel free to assign yourself the JIRA entry and I'll be happy
>> to answer any questions you 

Re: Write-through-cache in State logic

2019-08-05 Thread Lukasz Cwik
Now I see what you mean.

On Mon, Aug 5, 2019 at 5:42 PM Thomas Weise  wrote:

> Hi Luke,
>
> I guess the answer is that it depends on the state backend. If a set
> operation in the state backend is available that is more efficient than
> clear+append, then it would be beneficial to have a dedicated fn api
> operation to allow for such optimization. That's something that needs to be
> determined with a profiler :)
>
> But the low hanging fruit is cross-bundle caching.
>
> Thomas
>
> On Mon, Aug 5, 2019 at 2:06 PM Lukasz Cwik  wrote:
>
>> Thomas, why do you think a single round trip is needed?
>>
>> clear + append can be done blindly from the SDK side and it has total
>> knowledge of the state at that point in time till the end of the bundle at
>> which point you want to wait to get the cache token back from the runner
>> for the append call so that for the next bundle you can reuse the state if
>> the key wasn't processed elsewhere.
>>
>> Also, all state calls are "streamed" over gRPC so you don't need to wait
>> for clear to complete before being able to send append.
>>
>> On Tue, Jul 30, 2019 at 12:58 AM jincheng sun 
>> wrote:
>>
>>> Hi Rakesh,
>>>
>>> Glad to see you point this problem out!
>>> +1 for adding this implementation. Managing state by write-through cache
>>> is pretty important for streaming jobs!
>>>
>>> Best, Jincheng
>>>
>>> Thomas Weise  于2019年7月29日周一 下午8:54写道:
>>>
 FYI a basic test appears to confirm the importance of the cross-bundle
 caching: I found that the throughput can be increased by playing with the
 bundle size in the Flink runner. Default caps at 1000 elements (or 1
 second). So on a high throughput stream the bundles would be capped by the
 count limit. Bumping the count limit increases the throughput by reducing
 the chatter over the state plane (more cache hits due to larger bundle).

 The next level of investigation would involve profiling. But just by
 looking at metrics, the CPU utilization on the Python worker side dropped
 significantly while on the Flink side it remains nearly the same. There are
 no metrics for state operations on either side; I think it would be very
 helpful to get these in place also.

 Below the stateful processing code for reference.

 Thomas


class StatefulFn(beam.DoFn):
  count_state_spec = userstate.CombiningValueStateSpec(
      'count', beam.coders.IterableCoder(beam.coders.VarIntCoder()), sum)
  timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)

  def process(self, kv, count=beam.DoFn.StateParam(count_state_spec),
              timer=beam.DoFn.TimerParam(timer_spec),
              window=beam.DoFn.WindowParam):
    count.add(1)
    # Convert micros to seconds; fire one second before the window end.
    timer_seconds = (window.end.micros // 1000000) - 1
    timer.set(timer_seconds)

  @userstate.on_timer(timer_spec)
  def process_timer(self, count=beam.DoFn.StateParam(count_state_spec),
                    window=beam.DoFn.WindowParam):
    if count.read() == 0:
      logging.warning("###timer fired with count %d, window %s"
                      % (count.read(), window))



 On Thu, Jul 25, 2019 at 5:09 AM Robert Bradshaw 
 wrote:

> On Wed, Jul 24, 2019 at 6:21 AM Rakesh Kumar 
> wrote:
> >
> > Thanks Robert,
> >
> >  I stumbled on the Jira that you created some time ago:
> > https://jira.apache.org/jira/browse/BEAM-5428
> >
> > You also marked code where code changes are required:
> >
> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L291
> >
> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
> >
> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L465
> >
> > I am willing to provide help to implement this. Let me know how I
> can help.
>
> As far as I'm aware, no one is actively working on it right now.
> Please feel free to assign yourself the JIRA entry and I'll be happy
> to answer any questions you might have if (well probably when) these
> pointers are insufficient.
>
> > On Tue, Jul 23, 2019 at 3:47 AM Robert Bradshaw 
> wrote:
> >>
> >> This is documented at
> >>
> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
> >> . Note that it requires participation of both the runner and the SDK
> >> (though there are no correctness issues if one or the other side
> does
> >> not understand the protocol, caching just won't be used).
> >>
> >> I don't think it's been implemented anywhere, but could be very
> >> beneficial for performance.
> >>
> >> On Wed, Jul 

Re: Write-through-cache in State logic

2019-08-05 Thread Thomas Weise
Hi Luke,

I guess the answer is that it depends on the state backend. If a set
operation in the state backend is available that is more efficient than
clear+append, then it would be beneficial to have a dedicated fn api
operation to allow for such optimization. That's something that needs to be
determined with a profiler :)

But the low hanging fruit is cross-bundle caching.

Thomas

On Mon, Aug 5, 2019 at 2:06 PM Lukasz Cwik  wrote:

> Thomas, why do you think a single round trip is needed?
>
> clear + append can be done blindly from the SDK side and it has total
> knowledge of the state at that point in time till the end of the bundle at
> which point you want to wait to get the cache token back from the runner
> for the append call so that for the next bundle you can reuse the state if
> the key wasn't processed elsewhere.
>
> Also, all state calls are "streamed" over gRPC so you don't need to wait
> for clear to complete before being able to send append.
>
> On Tue, Jul 30, 2019 at 12:58 AM jincheng sun 
> wrote:
>
>> Hi Rakesh,
>>
>> Glad to see you point this problem out!
>> +1 for adding this implementation. Managing state by write-through cache is
>> pretty important for streaming jobs!
>>
>> Best, Jincheng
>>
>> Thomas Weise  于2019年7月29日周一 下午8:54写道:
>>
>>> FYI a basic test appears to confirm the importance of the cross-bundle
>>> caching: I found that the throughput can be increased by playing with the
>>> bundle size in the Flink runner. Default caps at 1000 elements (or 1
>>> second). So on a high throughput stream the bundles would be capped by the
>>> count limit. Bumping the count limit increases the throughput by reducing
>>> the chatter over the state plane (more cache hits due to larger bundle).
>>>
>>> The next level of investigation would involve profiling. But just by
>>> looking at metrics, the CPU utilization on the Python worker side dropped
>>> significantly while on the Flink side it remains nearly the same. There are
>>> no metrics for state operations on either side; I think it would be very
>>> helpful to get these in place also.
>>>
>>> Below the stateful processing code for reference.
>>>
>>> Thomas
>>>
>>>
>>> class StatefulFn(beam.DoFn):
>>>   count_state_spec = userstate.CombiningValueStateSpec(
>>>       'count', beam.coders.IterableCoder(beam.coders.VarIntCoder()), sum)
>>>   timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
>>>
>>>   def process(self, kv, count=beam.DoFn.StateParam(count_state_spec),
>>>               timer=beam.DoFn.TimerParam(timer_spec),
>>>               window=beam.DoFn.WindowParam):
>>>     count.add(1)
>>>     # Convert micros to seconds; fire one second before the window end.
>>>     timer_seconds = (window.end.micros // 1000000) - 1
>>>     timer.set(timer_seconds)
>>>
>>>   @userstate.on_timer(timer_spec)
>>>   def process_timer(self, count=beam.DoFn.StateParam(count_state_spec),
>>>                     window=beam.DoFn.WindowParam):
>>>     if count.read() == 0:
>>>       logging.warning("###timer fired with count %d, window %s"
>>>                       % (count.read(), window))
>>>
>>>
>>>
>>> On Thu, Jul 25, 2019 at 5:09 AM Robert Bradshaw 
>>> wrote:
>>>
 On Wed, Jul 24, 2019 at 6:21 AM Rakesh Kumar 
 wrote:
 >
 > Thanks Robert,
 >
 >  I stumbled on the Jira that you created some time ago:
 > https://jira.apache.org/jira/browse/BEAM-5428
 >
 > You also marked code where code changes are required:
 >
 https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L291
 >
 https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
 >
 https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L465
 >
 > I am willing to provide help to implement this. Let me know how I can
 help.

 As far as I'm aware, no one is actively working on it right now.
 Please feel free to assign yourself the JIRA entry and I'll be happy
 to answer any questions you might have if (well probably when) these
 pointers are insufficient.

 > On Tue, Jul 23, 2019 at 3:47 AM Robert Bradshaw 
 wrote:
 >>
 >> This is documented at
 >>
 https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
 >> . Note that it requires participation of both the runner and the SDK
 >> (though there are no correctness issues if one or the other side does
 >> not understand the protocol, caching just won't be used).
 >>
 >> I don't think it's been implemented anywhere, but could be very
 >> beneficial for performance.
 >>
 >> On Wed, Jul 17, 2019 at 6:00 PM Rakesh Kumar 
 wrote:
 >> >
 >> > I checked the Python SDK[1] and it has a similar implementation to
 the Java SDK.
 >> >
 >> > I would agree with Thomas. In case of high volume 

Re: Latency of Google Dataflow with Pubsub

2019-08-05 Thread Lukasz Cwik
+dev 

On Mon, Aug 5, 2019 at 12:49 PM Dmitry Minaev  wrote:

> Hi there,
>
> I'm building streaming pipelines in Beam (using Google Dataflow runner)
> and using Google Pubsub as a message broker. I've made a couple of
> experiments with a very simple pipeline: consume events from Pubsub
> subscription, add a timestamp to the message body, emit the new event to
> another Pubsub topic. I'm using all the default parameters when producing
> and consuming messages.
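
A minimal sketch of the pipeline shape described here (assumed, not the
author's actual code; topic and subscription names are placeholders):

    import json
    import time

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(streaming=True)
    with beam.Pipeline(options=options) as p:
        (p
         | beam.io.ReadFromPubSub(
               subscription='projects/my-project/subscriptions/input-sub')
         | beam.Map(lambda msg: json.dumps(
               dict(json.loads(msg), consume_ts=time.time())).encode('utf-8'))
         | beam.io.WriteToPubSub(topic='projects/my-project/topics/output'))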
>
> I've noticed a pretty high latency while consuming messages in Dataflow
> from Pubsub. My observations show that the average duration between the event
> create timestamp (simple producer that publishes events to Pubsub) and
> event consume timestamp (Google Dataflow using PubsubIO) is more than 2
> seconds. I've been publishing messages at different rates, e.g. 10 msg/sec,
> 1000 msg/sec, 10,000 msg/sec. And the latency never went lower than 2
> seconds. Such latency looks really high. I've tried with the direct runner and
> it has high latency too.
>
> I've made a few other experiments with Kafka (very small Kafka cluster)
> and the same kind of pipeline: consume from Kafka, add timestamp, publish
> to another Kafka topic. I saw the latency is much lower; on average it's
> about 150 milliseconds.
>
> I suspect there is some batching in PubsubIO that makes the latency so
> high.
>
> My questions are: what is the expected latency in these kinds of
> scenarios? Are there any recommendations to achieve lower latency?
>
> I appreciate any help on this!
>
> Thank you,
> Dmitry.
>


Re: Write-through-cache in State logic

2019-08-05 Thread Lukasz Cwik
Thomas, why do you think a single round trip is needed?

clear + append can be done blindly from the SDK side and it has total
knowledge of the state at that point in time till the end of the bundle at
which point you want to wait to get the cache token back from the runner
for the append call so that for the next bundle you can reuse the state if
the key wasn't processed elsewhere.

Also, all state calls are "streamed" over gRPC so you don't need to wait
for clear to complete before being able to send append.

On Tue, Jul 30, 2019 at 12:58 AM jincheng sun 
wrote:

> Hi Rakesh,
>
> Glad to see you point this problem out!
> +1 for adding this implementation. Managing state by write-through cache is
> pretty important for streaming jobs!
>
> Best, Jincheng
>
> Thomas Weise  于2019年7月29日周一 下午8:54写道:
>
>> FYI a basic test appears to confirm the importance of the cross-bundle
>> caching: I found that the throughput can be increased by playing with the
>> bundle size in the Flink runner. Default caps at 1000 elements (or 1
>> second). So on a high throughput stream the bundles would be capped by the
>> count limit. Bumping the count limit increases the throughput by reducing
>> the chatter over the state plane (more cache hits due to larger bundle).
>>
>> The next level of investigation would involve profiling. But just by
>> looking at metrics, the CPU utilization on the Python worker side dropped
>> significantly while on the Flink side it remains nearly the same. There are
>> no metrics for state operations on either side; I think it would be very
>> helpful to get these in place also.
>>
>> Below the stateful processing code for reference.
>>
>> Thomas
>>
>>
>> class StatefulFn(beam.DoFn):
>>   count_state_spec = userstate.CombiningValueStateSpec(
>>       'count', beam.coders.IterableCoder(beam.coders.VarIntCoder()), sum)
>>   timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
>>
>>   def process(self, kv, count=beam.DoFn.StateParam(count_state_spec),
>>               timer=beam.DoFn.TimerParam(timer_spec),
>>               window=beam.DoFn.WindowParam):
>>     count.add(1)
>>     # Convert micros to seconds; fire one second before the window end.
>>     timer_seconds = (window.end.micros // 1000000) - 1
>>     timer.set(timer_seconds)
>>
>>   @userstate.on_timer(timer_spec)
>>   def process_timer(self, count=beam.DoFn.StateParam(count_state_spec),
>>                     window=beam.DoFn.WindowParam):
>>     if count.read() == 0:
>>       logging.warning("###timer fired with count %d, window %s"
>>                       % (count.read(), window))
>>
>>
>>
>> On Thu, Jul 25, 2019 at 5:09 AM Robert Bradshaw 
>> wrote:
>>
>>> On Wed, Jul 24, 2019 at 6:21 AM Rakesh Kumar 
>>> wrote:
>>> >
>>> > Thanks Robert,
>>> >
>>> >  I stumbled on the Jira that you created some time ago:
>>> > https://jira.apache.org/jira/browse/BEAM-5428
>>> >
>>> > You also marked code where code changes are required:
>>> >
>>> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L291
>>> >
>>> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
>>> >
>>> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L465
>>> >
>>> > I am willing to provide help to implement this. Let me know how I can
>>> help.
>>>
>>> As far as I'm aware, no one is actively working on it right now.
>>> Please feel free to assign yourself the JIRA entry and I'll be happy
>>> to answer any questions you might have if (well probably when) these
>>> pointers are insufficient.
>>>
>>> > On Tue, Jul 23, 2019 at 3:47 AM Robert Bradshaw 
>>> wrote:
>>> >>
>>> >> This is documented at
>>> >>
>>> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
>>> >> . Note that it requires participation of both the runner and the SDK
>>> >> (though there are no correctness issues if one or the other side does
>>> >> not understand the protocol, caching just won't be used).
>>> >>
>>> >> I don't think it's been implemented anywhere, but could be very
>>> >> beneficial for performance.
>>> >>
>>> >> On Wed, Jul 17, 2019 at 6:00 PM Rakesh Kumar 
>>> wrote:
>>> >> >
>>> >> > I checked the Python SDK[1] and it has a similar implementation to
>>> the Java SDK.
>>> >> >
>>> >> > I would agree with Thomas. In the case of a high-volume event stream
>>> and a bigger cluster size, network calls can potentially become a
>>> bottleneck.
>>> >> >
>>> >> > @Robert
>>> >> > I am interested to see the proposal. Can you provide me the link to
>>> the proposal?
>>> >> >
>>> >> > [1]:
>>> https://github.com/apache/beam/blob/db59a3df665e094f0af17fe4d9df05fe420f3c16/sdks/python/apache_beam/transforms/userstate.py#L295
>>> >> >
>>> >> >
>>> >> > On Tue, Jul 16, 2019 at 9:43 AM Thomas Weise 
>>> wrote:
>>> >> >>
>>> >> >> Thanks for the pointer. For streaming, it will be important to
>>> support caching across 

Re: [Update] Beam 2.15 Release Progress

2019-08-05 Thread Yifan Zou
Hi,

I've verified the release branch, and all pre/post-commits passed. The next
step is verifying the Javadoc.
We still have a few blocking issues:
https://issues.apache.org/jira/browse/BEAM-7880?jql=project%20%3D%20BEAM%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20%22Triage%20Needed%22)%20AND%20fixVersion%20%3D%202.15.0
Please ping me once the tickets are fixed, or update them to the next
version to unblock the release. Thanks.

Yifan

On Wed, Jul 31, 2019 at 4:33 PM Yifan Zou  wrote:

> Snapshots are published
> http://repository.apache.org/content/groups/snapshots/org/apache/beam/.
>
> On Wed, Jul 31, 2019 at 1:28 PM Yifan Zou  wrote:
>
>> Hi,
>>
>> The release branch is cut
>> https://github.com/apache/beam/tree/release-2.15.0.
>> The next step would be building snapshots and verify release branch.
>>
>> Regards.
>> Yifan
>>
>


Re: beam_PreCommit_Java_Commit is broken

2019-08-05 Thread Rui Wang
It's being tracked by https://issues.apache.org/jira/browse/BEAM-7892.


-Rui

On Mon, Aug 5, 2019 at 9:18 AM Alexey Romanenko 
wrote:

> According to "git bisect”, it seems like this was introduced with commit
> “149153b525236327badb138b09235ff735045adf” in PR
> https://github.com/apache/beam/pull/9223 (“[BEAM-7060] Introduce
> Python3-only test modules (#9223)”)
>
> Could some Python people take a look on this?
> Thank you.
>
> On 5 Aug 2019, at 18:04, Alexey Romanenko 
> wrote:
>
> Hi all,
>
> Looks like https://builds.apache.org/job/beam_PreCommit_Java_Commit/ is
> broken. It fails with:
>
> FAILURE: Build failed with an exception.
> * Where:
> Build file
> '/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_Commit/src/sdks/python/apache_beam/testing/load_tests/build.gradle'
> line: 44
> * What went wrong:
> A problem occurred evaluating project
> ':sdks:python:apache_beam:testing:load_tests'.
> > Could not get unknown property 'files' for task
> ':sdks:python:apache_beam:testing:load_tests:run' of type
> org.gradle.api.tasks.Exec.
>
>
>


Re: beam_PreCommit_Java_Commit is broken

2019-08-05 Thread Ahmet Altay
This is tracked here: https://issues.apache.org/jira/browse/BEAM-7892

/cc +Valentyn Tymofieiev  +Udi Meiri 


On Mon, Aug 5, 2019 at 9:18 AM Alexey Romanenko 
wrote:

> According to "git bisect”, it seems like this was introduced with commit
> “149153b525236327badb138b09235ff735045adf” in PR
> https://github.com/apache/beam/pull/9223 (“[BEAM-7060] Introduce
> Python3-only test modules (#9223)”)
>
> Could some Python people take a look on this?
> Thank you.
>
> On 5 Aug 2019, at 18:04, Alexey Romanenko 
> wrote:
>
> Hi all,
>
> Looks like https://builds.apache.org/job/beam_PreCommit_Java_Commit/ is
> broken. It fails with:
>
> FAILURE: Build failed with an exception.
> * Where:
> Build file
> '/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_Commit/src/sdks/python/apache_beam/testing/load_tests/build.gradle'
> line: 44
> * What went wrong:
> A problem occurred evaluating project
> ':sdks:python:apache_beam:testing:load_tests'.
> > Could not get unknown property 'files' for task
> ':sdks:python:apache_beam:testing:load_tests:run' of type
> org.gradle.api.tasks.Exec.
>
>
>


Re: beam_PreCommit_Java_Commit is broken

2019-08-05 Thread Alexey Romanenko
According to "git bisect”, it seems like this was introduced with commit 
“149153b525236327badb138b09235ff735045adf” in PR 
https://github.com/apache/beam/pull/9223 
 (“[BEAM-7060] Introduce Python3-only 
test modules (#9223)”)

Could some Python people take a look on this?
Thank you.

> On 5 Aug 2019, at 18:04, Alexey Romanenko  wrote:
> 
> Hi all,
> 
> Looks like https://builds.apache.org/job/beam_PreCommit_Java_Commit/ is
> broken. It fails with:
> 
> FAILURE: Build failed with an exception.
> * Where:
> Build file 
> '/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_Commit/src/sdks/python/apache_beam/testing/load_tests/build.gradle'
>  line: 44
> * What went wrong:
> A problem occurred evaluating project 
> ':sdks:python:apache_beam:testing:load_tests'.
> > Could not get unknown property 'files' for task 
> > ':sdks:python:apache_beam:testing:load_tests:run' of type 
> > org.gradle.api.tasks.Exec.



beam_PreCommit_Java_Commit is broken

2019-08-05 Thread Alexey Romanenko
Hi all,

Looks like https://builds.apache.org/job/beam_PreCommit_Java_Commit/ is
broken. It fails with:

FAILURE: Build failed with an exception.
* Where:
Build file 
'/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_Commit/src/sdks/python/apache_beam/testing/load_tests/build.gradle'
 line: 44
* What went wrong:
A problem occurred evaluating project 
':sdks:python:apache_beam:testing:load_tests'.
> Could not get unknown property 'files' for task 
> ':sdks:python:apache_beam:testing:load_tests:run' of type 
> org.gradle.api.tasks.Exec.

Beam Dependency Check Report (2019-08-05)

2019-08-05 Thread Apache Jenkins Server

High Priority Dependency Updates Of Beam Python SDK:

  Dependency Name      Current -> Latest Version   Release Dates (Current -> Latest)   JIRA Issue

  google-cloud-pubsub  0.39.1 -> 0.45.0            2019-01-21 -> 2019-08-05            BEAM-5539
  mock                 2.0.0  -> 3.0.5             2019-05-20 -> 2019-05-20            BEAM-7369
  oauth2client         3.0.0  -> 4.1.3             2018-12-10 -> 2018-12-10            BEAM-6089
  Sphinx               1.8.5  -> 2.1.2             2019-05-20 -> 2019-06-24            BEAM-7370

High Priority Dependency Updates Of Beam Java SDK:

  Dependency Name (Current -> Latest Version; Release Dates Current -> Latest; JIRA Issue)

  com.github.ben-manes.versions:com.github.ben-manes.versions.gradle.plugin
      0.17.0 -> 0.21.0                    2019-02-11 -> 2019-03-04   BEAM-6645
  org.conscrypt:conscrypt-openjdk
      1.1.3 -> 2.1.0                      2018-06-04 -> 2019-04-03   BEAM-5748
  javax.servlet:javax.servlet-api
      3.1.0 -> 4.0.1                      2013-04-25 -> 2018-04-20   BEAM-5750
  org.eclipse.jetty:jetty-server
      9.2.10.v20150310 -> 10.0.0-alpha0   2015-03-10 -> 2019-07-11   BEAM-5752
  org.eclipse.jetty:jetty-servlet
      9.2.10.v20150310 -> 10.0.0-alpha0   2015-03-10 -> 2019-07-11   BEAM-5753
  junit:junit
      4.13-beta-1 -> 4.13-beta-3          2018-11-25 -> 2019-05-05   BEAM-6127
  com.github.spotbugs:spotbugs
      3.1.10 -> 4.0.0-beta3               2018-12-18 -> 2019-06-23   BEAM-7792
  com.github.spotbugs:spotbugs-annotations
      3.1.11 -> 4.0.0-beta3               2019-01-21 -> 2019-06-23   BEAM-6951

A dependency update is high priority if it satisfies one of the following
criteria:

 - It has a major version update available, e.g. org.assertj:assertj-core
   2.5.0 -> 3.10.0;
 - It is over 3 minor versions behind the latest version, e.g.
   org.tukaani:xz 1.5 -> 1.8;
 - The currently used version is behind the latest release by over 180 days,
   e.g. com.google.auto.service:auto-service 2014-10-24 -> 2017-12-11.

In Beam, we make a best-effort attempt at keeping all dependencies
up-to-date. In the future, issues will be filed and tracked for these
automatically, but in the meantime you can search for existing issues or open
a new one.

For more information: Beam Dependency Guide

Re: Snapshots not been updated

2019-08-05 Thread Valentyn Tymofieiev
I think https://issues.apache.org/jira/browse/BEAM-7892 is the reason.

On Sun, Aug 4, 2019 at 11:02 PM Ismaël Mejía  wrote:

> Hello,
>
> It seems the 2.16.0-SNAPSHOTS have not been updated since last Friday, 02/08.
>
> https://repository.apache.org/content/repositories/snapshots/org/apache/beam/beam-sdks-java-core/2.16.0-SNAPSHOT/
>
> Can somebody PTAL.
> Thanks
>


Re: [DISCUSS] Dependency management for Python SDK Container

2019-08-05 Thread Valentyn Tymofieiev
- The purpose of install_requires in setup.py is to define the maximally
permissive set of requirements for a package[1]. We don't pin a version in
setup.py without a strong reason; instead, we typically pick a lower
bound we have tested and set the upper bound to the next major version.
- The purpose of requirements.txt is to force pip to properly resolve
dependencies and create a reproducible execution environment, since pip
doesn't have true dependency resolution [2].

We currently regularly upgrade setup.py dependencies, but do not update
base_image_requirements.txt. Also, our requirements file is not exhaustive.
We need to introduce a process to fix this. I would recommend that in the
new process:
- All dependencies of Beam Python SDK, including transitive
dependencies, are listed in base_image_requirements.txt (or another
requirements file). "Explicit is better than implicit."
- The requirements file is regenerated whenever setup.py changes.
- When we build a container image, we check that the final image has
exactly the same versions of dependencies that were spelled out in
the requirements file (no versions added or changed)
- We also check that there are no dependency conflicts (they typically look
like: Package X requires version A of dependency Y, but you will have B,
which is incompatible).
- We update the versions of pinned dependencies periodically. We may want
to put all dependencies of SDK harness containers on the radar of Beam
dependency checker.

Valentyn

[1]
https://packaging.python.org/discussions/install-requires-vs-requirements/
[2] https://pip.pypa.io/en/stable/user_guide/#requirements-files

On Sat, Aug 3, 2019 at 2:47 AM Ahmet Altay  wrote:

>
>
> On Fri, Aug 2, 2019 at 4:34 PM Brian Hulette  wrote:
>
>> Thanks for the reply, I added some responses inline.
>>
>> On Fri, Aug 2, 2019 at 2:42 PM Ahmet Altay  wrote:
>> >
>> > There is value in explicitly pinning the dependencies to be used in
>> the containers:
>> > - It reproducibly produces the same container. This will be important
>> once we start releasing Beam containers. By looking at a Beam release
>> branch, one could figure out exactly the set of dependencies available in
>> a released container.
>> >
>> > - The package manager (pip) is notorious for how it resolves versions
>> of sub-dependencies, and sometimes picks incompatible ones.
>> > - Similarly, this reproducibility is helpful for tests, which can run on
>> the same container version unaffected by numerous dependency version
>> changes happening in sub-dependencies.
>>
>> I thought this might be the reason; it definitely makes sense to pin
>> dependencies for reproducible builds. If that is the case, though, I
>> think we should at least change the comment in
>> base_image_requirements.txt to say so (I'm happy to write a patch for
>> that).
>>
>
> Sounds good.
>
>
>>
>> > In addition, I would argue that we will want to keep some form of
>> base_image_requirements.txt in order to be able to install additional
>> dependencies on the container, so we would not be able to get rid of this
>> mechanism anyway.
>>
>> My PR doesn't remove base_image_requirements.txt. It keeps the file
>> there but removes all the pinned beam dependencies, and instead just
>> lists the beam tar.gz as a dependency directly, along with all of the
>> additional dependencies.
>>
>
> Got it, thanks for the clarification.
>
>
>>
>> >
>> >
>> > I would suggest, if possible, ensuring that both files are modified
>> synchronously. This might be possible with precommit hooks, although I am
>> not familiar with them.
>>
>> Yeah setting up something like that would help, but surely there's a
>> better way to lock dependencies than to count on people manually
>> updating this file (even with a precommit reminding them)? I'm not
>> that familiar with the complexities of python package management, but
>> it seems like we should be able to set up something with pip freeze or
>> pipenv's Pipfile.lock. The thing that confounds me when I try to think
>> through that setup though is how to reference a development build of
>> beam. If I run pip freeze in the container the requirement for
>> apache-beam is "2.16.0.dev0" which of course doesn't resolve when you
>> turn around and try to install it. Maybe an automated solution would
>> just replace the apache-beam line with
>> /opt/apache/beam/tars/apache-beam.tar.gz[gcp]?
>>
>
> I agree, this probably could be automated in a better way.
>
> I believe just doing "pip
> install /opt/apache/beam/tars/apache-beam.tar.gz[gcp]" at the beginning
> will prevent a later error for "pip install apache-beam==2.16.0.dev0". So
> we could have the former line in the Dockerfile before installing the
> requirements.
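
A sketch of what the automated step could look like, using the tarball path
mentioned above (hypothetical; details would need to be worked out):

    # Post-process `pip freeze` so the dev build of apache-beam resolves
    # from the local tarball instead of PyPI.
    import subprocess

    frozen = subprocess.check_output(['pip', 'freeze']).decode().splitlines()
    lines = ['/opt/apache/beam/tars/apache-beam.tar.gz[gcp]'
             if line.startswith('apache-beam==') else line
             for line in frozen]
    with open('base_image_requirements.txt', 'w') as f:
        f.write('\n'.join(lines) + '\n')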
>
>
>>
>> >
>> >
>> > Ahmet
>> >
>> >
>> > On Fri, Aug 2, 2019 at 2:20 PM Brian Hulette 
>> wrote:
>> >>
>> >> I recently ran into a portable python precommit failure that led me to
>> discover that python dependencies for the container are defined in two
>> different places, in slightly different