contributor permission for Beam Jira tickets

2018-10-12 Thread Hai Lu
Hi,

This is Hai from LinkedIn. I'm closely working with Xinyu on portable API
for Samza runner. Can someone add me as a contributor for Beam's Jira issue
tracker? I would like to create/assign tickets for my work.

Thanks,
Hai


Re: contributor permission for Beam Jira tickets

2018-10-12 Thread Hai Lu
Sorry I forgot to mention my Jira ID, it's lhaiesp

Thanks,
Hai

On Fri, Oct 12, 2018 at 2:38 PM Kenneth Knowles  wrote:

> Hi Hai,
>
> Have you created an account? A search for your name did not turn up
> anything. If you tell me your Jira ID I can add you.
>
> Kenn
>
> On Fri, Oct 12, 2018 at 2:16 PM Hai Lu  wrote:
>
>> Hi,
>>
>> This is Hai from LinkedIn. I'm closely working with Xinyu on portable API
>> for Samza runner. Can someone add me as a contributor for Beam's Jira issue
>> tracker? I would like to create/assign tickets for my work.
>>
>> Thanks,
>> Hai
>>
>


Performance of BeamFnData between Python and Java

2018-11-07 Thread Hai Lu
Hi,

This is Hai from LinkedIn. I'm currently working on Portable API for Samza
Runner. I was able to make Python work with Samza container reading from
Kafka. However, I'm seeing a severe performance issue with my setup,
achieving only ~200KB throughput between the Samza runner on the Java side
and the sdk_worker on the Python side.

While I'm digging into this, I wonder whether someone has benchmarked the
data channel between Java and Python and has results on how much
throughput can be reached, assuming a single worker thread and a single
JobBundleFactory.

I might be missing some very basic gRPC setting which leads to
these unsatisfactory results. So another question: are there any good
articles or documentation about gRPC tuning dedicated to IPC?
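
In case it helps the discussion, this is the kind of thing I plan to try first
(illustrative only -- the address and options are placeholders, not our actual
setup; these are standard gRPC channel arguments to rule out the default
message-size limits, e.g. the 4 MB receive cap):

import grpc

# Placeholder data-channel address; -1 means "no limit" for the size caps.
channel = grpc.insecure_channel(
    'localhost:12345',
    options=[
        ('grpc.max_send_message_length', -1),
        ('grpc.max_receive_message_length', -1),
    ])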

Thanks,
Hai


Re: Beam Samza Runner status update

2018-10-10 Thread Hai Lu
Hi, all

This is Hai from LinkedIn. As Xinyu mentioned, I have been working on the
portable API for the Samza runner and have made some solid progress. It's been
a very smooth process (although certainly not effortless) and I'm really
grateful for the great platform that you all have built. I'm very
impressed. Bravo!

Excited to work with everyone on Beam. Do expect more questions from me
down the road.

Thanks,
Hai

On Wed, Oct 10, 2018 at 12:36 PM Kenneth Knowles  wrote:

> Clarification: Thomas Groh wrote the fuser, not me!
>
> Thanks for the sharing all this. Really cool.
>
> Kenn
>
> On Wed, Oct 10, 2018 at 11:17 AM Rui Wang  wrote:
>
>> Thanks for sharing! It's so exciting to hear that Beam is being used on
>> Samza in production @LinkedIn! Your feedback will be helpful to the Beam
>> community!
>>
>> Besides, Beam supports SQL right now, and hopefully the Beam community could
>> also receive feedback on BeamSQL
>> <https://beam.apache.org/documentation/dsls/sql/overview/> in the future.
>>
>> -Rui
>>
>> On Wed, Oct 10, 2018 at 11:10 AM Jean-Baptiste Onofré 
>> wrote:
>>
>>> Thanks for sharing and congrats on this great work!
>>>
>>> Regards
>>> JB
>>> On Oct 10, 2018, at 20:23, Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>
>>>> Hi, All,
>>>>
>>>> It's been over four months since we added the Samza Runner to Beam, and
>>>> we've been making a lot of progress since then. Here I would like to update
>>>> you guys and share some really good news happening here at LinkedIn:
>>>>
>>>> 1) First Beam job in production @LinkedIn!
>>>> After a few rounds of testing and benchmarking, we finally rolled out
>>>> our first Beam job here! The job uses quite a few features, such as event
>>>> time, fixed/session windowing, early triggering, and stateful processing.
>>>> Our first customer is very happy and they highly praise the easy-to-use
>>>> Beam API as well as the powerful processing model. Due to the limited resources
>>>> here, we put our full trust in the work you guys are doing, and we didn't
>>>> run into any surprises. We see extreme attention to detail as well as
>>>> no compromises on user experience everywhere in the code base. We would
>>>> like to thank everyone in the Beam community for contributing to such an
>>>> amazing framework!
>>>>
>>>> 2) A portable Samza Runner prototype
>>>> We are also starting the work of making the Samza Runner portable. So far
>>>> we have just gotten the Python word count example working using the portable
>>>> Samza Runner. Please look out for the PR for this very soon :). Again, this work
>>>> would not be possible without the great Beam portability framework and the
>>>> developers behind it, like Luke and Ahmet, just to name a few. The
>>>> ReferenceRunner has been extremely useful to us for figuring out what's needed
>>>> and how it works. Kudos to Thomas Groh, Ben Sidhom and all the others who
>>>> made this available to us. And to Kenn, your fuse work rocks.
>>>>
>>>> 3) More contributors in Samza Runner
>>>> The runner has been Chris's and my personal project for a while, but that's
>>>> no longer the case. We got Hai Lu and Boris Shkolnik from the Samza team to
>>>> contribute. Hai has been focusing on the portability work as mentioned in
>>>> #2, and Boris will work mostly on supporting our use cases. We will send
>>>> more emails discussing our use cases, like the "Update state after firing"
>>>> email I sent out earlier.
>>>>
>>>> Finally, a shout-out to our very own Chris Pettitt. Without you, none
>>>> of the above would have happened!
>>>>
>>>> Thanks,
>>>> Xinyu
>>>>
>>>


flink portable runner usage

2019-01-17 Thread Hai Lu
Hi Thomas,

This is Hai, who works on the portable runner for Samza. I have a few minor
questions that I would like to get clarification on from you.

We chatted briefly at the last Beam meetup and you mentioned your Flink portable
runner (Python) is going into production. So today, are you using Beam
Python on Flink in streaming mode or batch mode? And what are your input
sources (Kafka? Kinesis?)

Also, we talked about how bundling would help lift the performance by a lot. But it
seems like the Flink runner today only does bundling in batch mode, not in
streaming mode. Am I missing something?

BTW, looking forward to the Beam @Lyft meetup in February!

Thanks,
Hai (LinkedIn)


Re: Enable security for data channels in portability

2019-05-16 Thread Hai Lu
Hi Lukasz and Ankur,

Here is the PR that implements the idea:
https://github.com/apache/beam/pull/8597

Would appreciate it if you could take a look.

Thanks,
Hai

On Tue, Apr 30, 2019 at 9:13 AM Hai Lu  wrote:

> One thing to clarify is that we do not use docker. I don't have too much
> experience with docker; I assume docker itself already has network
> isolation, and that's why it was never necessary to enable security in
> portable runner before?
>
> For us because we simply use processes, we need this extra secret (through
> file system) for authentication.
>
> Let me create a ticket and send a PR, which should explain my intention
> better.
>
> Thanks,
> Hai
>
> On Mon, Apr 29, 2019 at 1:03 PM Lukasz Cwik  wrote:
>
>> Changing the address to be loopback based upon how the environment is
>> started (docker container/process/external/...) makes sense.
>>
>> How would the SDK and runner support storing/sharing this secret? (For
>> example, in the docker container, how would the secret get there?)
>>
>> On Mon, Apr 29, 2019 at 9:23 AM Hai Lu  wrote:
>>
>>> Hi Lukasz and Ankur,
>>>
>>> Thank you so much for your response! This is what we're
>>> doing/implementing in our internal fork right now:
>>>
>>>1. We assume that the Java process and Python process *are always
>>>colocated in the same host*, so first of all we use "loopback"
>>>address instead of "any address" that's currently being used on the java
>>>side. That way, the traffic between sdk worker and runner is limited to 
>>> the
>>>host but not exposed to network.
>>>2. Because of the multi-tenant nature of our environment, we still
>>>want to have authentication even for local host, so that data ports are 
>>> not
>>>connected by random processes. Because different jobs have their own user
>>>name, it's sufficient to *use file system to store an ad-hoc secret*,
>>>    which can be shared by both Python sdk and java runner. The runner then uses
>>>this secret to authenticate the worker (by using gRPC's interceptor for
>>>this customized auth)
>>>3. By having the 2 steps above, we *no longer need transport layer
>>>security *(SSL/TLS). So we abandon our initial plan to enable
>>>SSL/TLS.
>>>
>>> Above is the high level plan that I'm implementing. I would like to have
>>> a similar solution in the open source to be merged with our internal fork.
>>> Let me know what you think. If this sounds OK I will create a ticket for
>>> myself and will first send out a short write-up in google doc to collect
>>> comments soon.
>>>
>>> Thanks,
>>> Hai
>>>
>>> On Fri, Apr 26, 2019 at 5:24 PM Ankur Goenka  wrote:
>>>
>>>> In an offline chat with Hai, it seems useful for users to be able to
>>>> provide custom authentication, like a secret which can be distributed out of
>>>> band by the infrastructure and provided via the file system, an RPC to
>>>> another service, etc.
>>>> gRPC already has some mechanisms for standard and custom
>>>> authentication[1].
>>>> Instrumenting the gRPC channel using a command line option or environment
>>>> variable on the worker machines can be useful.
>>>>
>>>> [1] https://grpc.io/docs/guides/auth/
>>>>
>>>> On Fri, Apr 26, 2019 at 4:33 PM Lukasz Cwik  wrote:
>>>>
>>>>> The link to the ApiServiceDescriptor is
>>>>> https://github.com/apache/beam/blob/476e17ed6badd4d5c06c4caf8a824805f40a8e7a/model/pipeline/src/main/proto/endpoints.proto#L31
>>>>>
>>>>> On Fri, Apr 26, 2019 at 4:32 PM Lukasz Cwik  wrote:
>>>>>
>>>>>> I had originally taken a look at this a while ago but not much has
>>>>>> progressed since then. The original idea was that the 
>>>>>> ApiServiceDescriptor
>>>>>> would be extended to support secure ways of 
>>>>>> authentication/communication. I
>>>>>> was prototyping with an OAuth2 client credentials grant at the time but
>>>>>> dropped it as other things were more important. The only currently
>>>>>> supported mode across all SDKs is an implicit authenticated/secure mode
>>>>>> where all communication is assumed to already be encrypted/private (e.g.
>>>>>> over VPN that is managed externally with trusted services) and hence the
>>

Re: Enable security for data channels in portability

2019-04-30 Thread Hai Lu
One thing to clarify is that we do not use Docker. I don't have too much
experience with Docker; I assume Docker itself already has network
isolation, and that's why it was never necessary to enable security in the
portable runner before?

For us, because we simply use processes, we need this extra secret (shared
through the file system) for authentication.
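
To make that concrete, here is a minimal sketch of the worker side (the names
and the path are illustrative, not our actual code): the process reads the
per-job secret from a file and attaches it as ordinary gRPC metadata on its
calls to the runner.

def load_secret(path='/jobs/my-job/worker.secret'):  # hypothetical per-job path
    """Reads the shared per-job secret that both processes can access."""
    with open(path) as f:
        return f.read().strip()

# On each call from the SDK worker to the runner, the secret rides along as
# plain gRPC call metadata, e.g.:
#   response = stub.SomeMethod(request, metadata=[('worker_secret', load_secret())])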

Let me create a ticket and send a PR, which should explain my intention
better.

Thanks,
Hai

On Mon, Apr 29, 2019 at 1:03 PM Lukasz Cwik  wrote:

> Changing the address to be loopback based upon how the environment is
> started (docker container/process/external/...) makes sense.
>
> How would the SDK and runner support storing/sharing this secret? (For
> example, in the docker container, how would the secret get there?)
>
> On Mon, Apr 29, 2019 at 9:23 AM Hai Lu  wrote:
>
>> Hi Lukasz and Ankur,
>>
>> Thank you so much for your response! This is what we're
>> doing/implementing in our internal fork right now:
>>
>>1. We assume that the Java process and Python process *are always
>>colocated in the same host*, so first of all we use "loopback"
>>address instead of "any address" that's currently being used on the java
>>side. That way, the traffic between sdk worker and runner is limited to 
>> the
>>host but not exposed to network.
>>2. Because of the multi-tenant nature of our environment, we still
>>want to have authentication even for local host, so that data ports are 
>> not
>>connected by random processes. Because different jobs have their own user
>>name, it's sufficient to *use file system to store an ad-hoc secret*,
>>    which can be shared by both Python sdk and java runner. The runner then uses
>>this secret to authenticate the worker (by using gRPC's interceptor for
>>this customized auth)
>>3. By having the 2 steps above, we *no longer need transport layer
>>security *(SSL/TLS). So we abandon our initial plan to enable
>>SSL/TLS.
>>
>> Above is the high level plan that I'm implementing. I would like to have
>> a similar solution in the open source to be merged with our internal fork.
>> Let me know what you think. If this sounds OK I will create a ticket for
>> myself and will first send out a short write-up in google doc to collect
>> comments soon.
>>
>> Thanks,
>> Hai
>>
>> On Fri, Apr 26, 2019 at 5:24 PM Ankur Goenka  wrote:
>>
>>> In an offline chat with Hai, it seems useful for users to be able to
>>> provide custom authentication, like a secret which can be distributed out of
>>> band by the infrastructure and provided via the file system, an RPC to
>>> another service, etc.
>>> gRPC already has some mechanisms for standard and custom
>>> authentication[1].
>>> Instrumenting the gRPC channel using a command line option or environment
>>> variable on the worker machines can be useful.
>>>
>>> [1] https://grpc.io/docs/guides/auth/
>>>
>>> On Fri, Apr 26, 2019 at 4:33 PM Lukasz Cwik  wrote:
>>>
>>>> The link to the ApiServiceDescriptor is
>>>> https://github.com/apache/beam/blob/476e17ed6badd4d5c06c4caf8a824805f40a8e7a/model/pipeline/src/main/proto/endpoints.proto#L31
>>>>
>>>> On Fri, Apr 26, 2019 at 4:32 PM Lukasz Cwik  wrote:
>>>>
>>>>> I had originally taken a look at this a while ago but not much has
>>>>> progressed since then. The original idea was that the ApiServiceDescriptor
>>>>> would be extended to support secure ways of authentication/communication. 
>>>>> I
>>>>> was prototyping with an OAuth2 client credentials grant at the time but
>>>>> dropped it as other things were more important. The only currently
>>>>> supported mode across all SDKs is an implicit authenticated/secure mode
>>>>> where all communication is assumed to already be encrypted/private (e.g.
>>>>> over VPN that is managed externally with trusted services) and hence the
>>>>> gRPC channel itself is insecure and there is no authentication being
>>>>> performed.
>>>>>
>>>>> Even though sdk_worker.py seems like it supports credentials, no one
>>>>> invokes the constructor with credentials enabled as can be seen by this
>>>>> comment by Robert[1].
>>>>>
>>>>> For SSL/TLS support it seems like we need some way to configure a
>>>>> runner to be told to use SSL/TLS (potentially with a custom private key 
>>>>> and
>

Re: Enable security for data channels in portability

2019-04-29 Thread Hai Lu
Hi Lukasz and Ankur,

Thank you so much for your response! This is what we're doing/implementing
in our internal fork right now:

   1. We assume that the Java process and the Python process *are always
   colocated on the same host*, so first of all we use the "loopback" address
   instead of the "any address" that's currently being used on the Java side. That
   way, the traffic between the SDK worker and the runner is limited to the host
   and not exposed to the network.
   2. Because of the multi-tenant nature of our environment, we still want
   to have authentication even on localhost, so that the data ports are not
   connected to by random processes. Because different jobs have their own user
   names, it's sufficient to *use the file system to store an ad-hoc secret*,
   which can be shared by both the Python SDK and the Java runner. The runner then
   uses this secret to authenticate the worker (by using gRPC's interceptor for
   this customized auth); a rough sketch follows this list.
   3. By having the two steps above, we *no longer need transport layer
   security* (SSL/TLS), so we abandon our initial plan to enable SSL/TLS.
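
For concreteness, here is a rough sketch of the runner-side check. It is
illustrative only, with made-up names, and written in Python for brevity (our
runner-side code is Java, but the gRPC server-interceptor idea is the same);
the real change will be in the PR:

import grpc

class SharedSecretInterceptor(grpc.ServerInterceptor):
    """Rejects calls whose metadata does not carry the expected secret."""

    def __init__(self, secret):
        self._secret = secret

        def deny(request, context):
            context.abort(grpc.StatusCode.UNAUTHENTICATED, 'missing or bad worker secret')

        self._deny_handler = grpc.unary_unary_rpc_method_handler(deny)

    def intercept_service(self, continuation, handler_call_details):
        # Let the call through only if the shared secret is present and matches.
        metadata = dict(handler_call_details.invocation_metadata)
        if metadata.get('worker_secret') == self._secret:
            return continuation(handler_call_details)
        return self._deny_handler

# The runner would install it when creating its gRPC server, e.g.:
#   server = grpc.server(executor, interceptors=[SharedSecretInterceptor(secret)])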

Above is the high-level plan that I'm implementing. I would like to have a
similar solution in open source that can be merged with our internal fork.
Let me know what you think. If this sounds OK, I will create a ticket for
myself and will first send out a short write-up in a Google doc to collect
comments soon.

Thanks,
Hai

On Fri, Apr 26, 2019 at 5:24 PM Ankur Goenka  wrote:

> In an offline chat with Hai, it seems useful for users to be able to
> provide custom authentication, like a secret which can be distributed out of
> band by the infrastructure and provided via the file system, an RPC to
> another service, etc.
> gRPC already has some mechanisms for standard and custom authentication[1].
> Instrumenting the gRPC channel using a command line option or environment
> variable on the worker machines can be useful.
>
> [1] https://grpc.io/docs/guides/auth/
>
> On Fri, Apr 26, 2019 at 4:33 PM Lukasz Cwik  wrote:
>
>> The link to the ApiServiceDescriptor is
>> https://github.com/apache/beam/blob/476e17ed6badd4d5c06c4caf8a824805f40a8e7a/model/pipeline/src/main/proto/endpoints.proto#L31
>>
>> On Fri, Apr 26, 2019 at 4:32 PM Lukasz Cwik  wrote:
>>
>>> I had originally taken a look at this a while ago but not much has
>>> progressed since then. The original idea was that the ApiServiceDescriptor
>>> would be extended to support secure ways of authentication/communication. I
>>> was prototyping with an OAuth2 client credentials grant at the time but
>>> dropped it as other things were more important. The only currently
>>> supported mode across all SDKs is an implicit authenticated/secure mode
>>> where all communication is assumed to already be encrypted/private (e.g.
>>> over VPN that is managed externally with trusted services) and hence the
>>> gRPC channel itself is insecure and there is no authentication being
>>> performed.
>>>
>>> Even though sdk_worker.py seems like it supports credentials, no one
>>> invokes the constructor with credentials enabled as can be seen by this
>>> comment by Robert[1].
>>>
>>> For SSL/TLS support it seems like we need some way to configure a runner
>>> to be told to use SSL/TLS (potentially with a custom private key and trust
>>> chain). Do you have some suggestions on how we add support for passing
>>> around channel/call[2] credentials?
>>>
>>> 1:
>>> https://github.com/apache/beam/blob/476e17ed6badd4d5c06c4caf8a824805f40a8e7a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py#L139
>>> 2: https://grpc.io/docs/guides/auth/
>>>
>>> On Tue, Apr 23, 2019 at 5:06 PM Hai Lu  wrote:
>>>
>>>> Hi,
>>>>
>>>> This is Hai from LinkedIn. Daniel and I have been working on
>>>> productionizing Samza portable runner. BTW, Daniel didn't mention in his
>>>> previous email that he has enabled and validated Python 3 for Samza runner
>>>> and it worked smoothly. Kudos to the team!
>>>>
>>>> Here I have a few security related questions about portability. At
>>>> LinkedIn, we enable SSL/TLS and ACLs for Kafka data and any data exchange.
>>>> In the case of portable runner, we're required to secure the data channels
>>>> between Java and Python processes as well because our Samza jobs are
>>>> running in a multi-tenant environment. While I'm currently working on this
>>>> on our internal branch, I do want to keep it clean and consistent with the
>>>> master branch.
>>>>
>>>> My questions are: were there any plans/thoughts around security for
>>>> portability? I see that sdk_worker.py does have some codes to create
>>>> secured gRPC channels; is anyone actually leveraging those codes? I don't
>>>> see on the Java side any work is done, though.
>>>>
>>>> Thanks,
>>>> Hai Lu
>>>>
>>>


Enable security for data channels in portability

2019-04-23 Thread Hai Lu
Hi,

This is Hai from LinkedIn. Daniel and I have been working on
productionizing the Samza portable runner. BTW, Daniel didn't mention in his
previous email that he has enabled and validated Python 3 for the Samza runner
and it worked smoothly. Kudos to the team!

Here I have a few security-related questions about portability. At
LinkedIn, we enable SSL/TLS and ACLs for Kafka data and any data exchange.
In the case of the portable runner, we're required to secure the data channels
between the Java and Python processes as well, because our Samza jobs are
running in a multi-tenant environment. While I'm currently working on this
on our internal branch, I do want to keep it clean and consistent with the
master branch.

My questions are: were there any plans/thoughts around security for
portability? I see that sdk_worker.py does have some code to create
secured gRPC channels; is anyone actually leveraging that code? I don't
see any such work done on the Java side, though.
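
For reference, this is roughly what a secured client channel looks like in
gRPC Python (generic gRPC usage, not Beam's sdk_worker.py code; the path and
address are placeholders):

import grpc

# Load the CA certificate and open a TLS-protected channel.
with open('/path/to/ca.pem', 'rb') as f:
    credentials = grpc.ssl_channel_credentials(root_certificates=f.read())
channel = grpc.secure_channel('localhost:12345', credentials)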

Thanks,
Hai Lu


IllegalStateException: TimestampCombiner moved element from to earlier time in Python

2019-08-15 Thread Hai Lu
Hi,

This is Hai from LinkedIn.

I'm looking into a bug I found internally when using Beam portable API
(Python) on our own Samza runner.

The pipeline looks something like this:

(p
 | 'read' >> ReadFromKafka(cluster="tracking", topic="PageViewEvent")
 | 'transform' >> beam.Map(lambda event: process_event(event))
 | 'window' >> beam.WindowInto(FixedWindows(15))
 | 'group' >> *beam.CombinePerKey(beam.combiners.CountCombineFn())*
 ...

The problem comes from the combiners, which cause the following exception on
the Java side:

Caused by: java.lang.IllegalStateException: TimestampCombiner moved element
from 2019-08-15T03:34:*45.000*Z to earlier time 2019-08-15T03:34:*44.999*Z
for window [2019-08-15T03:34:30.000Z..2019-08-15T03:34:*45.000*Z)
at
org.apache.beam.runners.core.WatermarkHold.shift(WatermarkHold.java:117)
at
org.apache.beam.runners.core.WatermarkHold.addElementHold(WatermarkHold.java:154)
at
org.apache.beam.runners.core.WatermarkHold.addHolds(WatermarkHold.java:98)
at
org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:605)
at
org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:349)
at
org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)

The exception happens here:
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java#L116
where we check that the shifted timestamp has not moved before the original timestamp.

if (shifted.isBefore(timestamp)) {
  throw new IllegalStateException(
  String.format(
  "TimestampCombiner moved element from %s to earlier time %s
for window %s",
  BoundedWindow.formatTimestamp(timestamp),
  BoundedWindow.formatTimestamp(shifted),
  window));
}

As you can see from the exception, the "shifted" is "XXX 44.999" while the
"timestamp" is "XXX 45.000". The "44.999" is coming from
TimestampCombiner.END_OF_WINDOW
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java#L116>:

@Override
public Instant merge(BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps) {
  return intoWindow.maxTimestamp();
}

where intoWindow.maxTimestamp() is:

  /** Returns the largest timestamp that can be included in this window. */
  @Override
  public Instant maxTimestamp() {
*// end not inclusive*
return *end.minus(1)*;
  }

Hence, the "44.*999*".

And the "45.000" comes from the Python side when the combiner output
results as pre GBK operation: operations.py#PGBKCVOperation#output_key


if windows is 0:
  self.output(_globally_windowed_value.with_value((key, value)))
else:
  self.output(WindowedValue((key, value), *windows[0].end*, windows))

Here, when we generate the windowed value, the timestamp is assigned the
closed-interval end (45.000) as opposed to the open-interval end (44.999).
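
In other words (just a sketch of my reading, not necessarily the right fix), I
would have expected the pre-GBK output on the Python side to use the window's
maximum timestamp rather than its exclusive end, along the lines of:

window = windows[0]
# max_timestamp() is the last timestamp contained in the window,
# rather than the exclusive end of the interval.
self.output(WindowedValue((key, value), window.max_timestamp(), windows))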

Clearly the "end of window" definition is a bit inconsistent across Python
and Java. I'm yet to try this on other runner so not sure whether this is
only an issue for our Samza runner. I tend to think this is a bug but would
like to confirm with you. If this has not been an issue for other runners,
where did I potentially do wrong.

Right now I can bypass this issue by directly using GroupByKey (instead of
any combiners) and doing the reducing on my own; a rough sketch of that
workaround is below. But it would be much more
convenient for us to use combiners.
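
For reference, the workaround looks roughly like this (the counting step is
illustrative and stands in for our real reduction logic):

(p
 | 'read' >> ReadFromKafka(cluster="tracking", topic="PageViewEvent")
 | 'transform' >> beam.Map(lambda event: process_event(event))
 | 'window' >> beam.WindowInto(FixedWindows(15))
 | 'group' >> beam.GroupByKey()
 | 'count' >> beam.Map(lambda kv: (kv[0], sum(1 for _ in kv[1])))
 ...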

Any advice would be extremely helpful. Thank you in advance!

-Hai


Re: IllegalStateException: TimestampCombiner moved element from to earlier time in Python

2019-08-16 Thread Hai Lu
I did a simple fix for this issue here:
https://github.com/apache/beam/pull/9364

Tested locally and it fixes the problem. Can someone help take a look?

Thanks,
Hai

On Thu, Aug 15, 2019 at 9:16 AM Hai Lu  wrote:

> Hi,
>
> This is Hai from LinkedIn.
>
> I'm looking into a bug I found internally when using Beam portable API
> (Python) on our own Samza runner.
>
> The pipeline looks something like this:
>
> (p
>  | 'read' >> ReadFromKafka(cluster="tracking", topic="PageViewEvent")
>  | 'transform' >> beam.Map(lambda event: process_event(event))
>  | 'window' >> beam.WindowInto(FixedWindows(15))
>  | 'group' >> *beam.CombinePerKey(beam.combiners.CountCombineFn())*
>  ...
>
> The problem comes from the combiners which cause the following exception
> on Java side:
>
> Caused by: java.lang.IllegalStateException: TimestampCombiner moved
> element from 2019-08-15T03:34:*45.000*Z to earlier time 2019-08-15T03:34:
> *44.999*Z for window [2019-08-15T03:34:30.000Z..2019-08-15T03:34:*45.000*
> Z)
> at
> org.apache.beam.runners.core.WatermarkHold.shift(WatermarkHold.java:117)
> at
> org.apache.beam.runners.core.WatermarkHold.addElementHold(WatermarkHold.java:154)
> at
> org.apache.beam.runners.core.WatermarkHold.addHolds(WatermarkHold.java:98)
> at
> org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:605)
> at
> org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:349)
> at
> org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
>
> The exception happens here
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java#L116
>  when
> we check the shifted timestamp to ensure it's before the timestamp.
>
> if (shifted.isBefore(timestamp)) {
>   throw new IllegalStateException(
>   String.format(
>   "TimestampCombiner moved element from %s to earlier time %s
> for window %s",
>   BoundedWindow.formatTimestamp(timestamp),
>   BoundedWindow.formatTimestamp(shifted),
>   window));
> }
>
> As you can see from the exception, the "shifted" is "XXX 44.999" while the
> "timestamp" is "XXX 45.000". The "44.999" is coming from
> TimestampCombiner.END_OF_WINDOW
> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java#L116>
> :
>
> @Override
> public Instant merge(BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps) {
>   return intoWindow.maxTimestamp();
> }
>
> where intoWindow.maxTimestamp() is:
>
>   /** Returns the largest timestamp that can be included in this window. */
>   @Override
>   public Instant maxTimestamp() {
> *// end not inclusive*
> return *end.minus(1)*;
>   }
>
> Hence, the "44.*999*".
>
> And the "45.000" comes from the Python side when the combiner output
> results as pre GBK operation: operations.py#PGBKCVOperation#output_key
> <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/operations.py#L889>
>
> if windows is 0:
>   self.output(_globally_windowed_value.with_value((key, value)))
> else:
>   self.output(WindowedValue((key, value), *windows[0].end*, windows))
>
> Here when we generate the window value, the timestamp is assigned to the
> closed interval end (45.000) as opposed to open interval end (44.999)
>
> Clearly the "end of window" definition is a bit inconsistent across Python
> and Java. I'm yet to try this on other runner so not sure whether this is
> only an issue for our Samza runner. I tend to think this is a bug but would
> like to confirm with you. If this has not been an issue for other runners,
> where did I potentially do wrong.
>
> Right now I can bypass this issue by directly using GroupByKey (instead of
> any combiners) and do reducing on my own. But it would be much more
> convenient for us to use combiners.
>
> Any advice would be extremely helpful. Thank you in advance!
>
> -Hai
>


Embedding expansion service for cross language in the runner

2019-11-04 Thread Hai Lu
Hi,

We're looking into leveraging the cross-language pipeline feature in our
Beam pipelines on the Samza runner. While the feature seems to work well,
the PTransform expansion as a standalone service isn't very convenient; in
particular, the Python pipeline needs to specify the address of the
expansion service.
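
For example, today the pipeline code has to carry the endpoint explicitly,
roughly like this (the URN and payload are made up for illustration; the 8097
port is the Flink job server's default mentioned later in this thread):

import apache_beam as beam

(p
 ...
 | 'external' >> beam.ExternalTransform(
       'beam:transforms:hypothetical:urn:v1',  # placeholder URN
       None,                                   # placeholder payload
       expansion_service='localhost:8097')     # address must be known up front
 ...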

I'm wondering why we couldn't embed the expansion service into the runner
itself. I understand the cross-language feature wants to be runner
independent, but does it make sense to at least provide the option to allow
the runner to use the expansion service as a library and make it transparent
to the portable pipeline?

Thanks,
Hai


Re: Embedding expansion service for cross language in the runner

2019-11-05 Thread Hai Lu
Starting the expansion service in the job server is helpful. But having to
expose the port number and include the address in
beam.ExternalTransform is still a hassle. Using a hard-coded port
number might be the only solution right now, but it's not a very clean
solution in our case.

@Robert Bradshaw, yes, one cannot construct the "whole" pipeline first and
pass it to the runner, but can't we easily combine the job server and the
expansion service? Also, it seems like right now each beam.ExternalTransform
specifies an expansion service address; does that mean we expect multiple
expansion services? Is there such a use case?

Thanks for everyone's response, BTW. This is very helpful! :)

Thanks,
Hai

On Mon, Nov 4, 2019 at 2:23 PM Robert Bradshaw  wrote:

> To clarify, starting up the Flink Job Server by default starts up an
> Expansion Service on the hard-coded, default port 8097.
>
> On Mon, Nov 4, 2019 at 2:02 PM Thomas Weise  wrote:
> >
> > The expansion service can be provided by the job server, as done in the
> Flink runner. It needs to be available at pipeline construction time, but
> there is no need to run a separate service.
> >
> > Thomas
> >
> > On Mon, Nov 4, 2019 at 12:03 PM Robert Bradshaw 
> wrote:
> >>
> >> On Mon, Nov 4, 2019 at 11:54 AM Chamikara Jayalath <
> chamik...@google.com> wrote:
> >> >
> >> > On Mon, Nov 4, 2019 at 11:01 AM Hai Lu  wrote:
> >> >>
> >> >> Hi,
> >> >>
> >> >> We're looking into leveraging the cross language pipeline feature in
> our Beam pipelines on Samza runner. While the feature seems to work well,
> the PTransform expansion as a standalone service isn't very convenient.
> Particularly that the Python pipeline needs to specify the address of the
> expansion service.
> >> >>
> >> >> I'm wondering why we couldn't embed the expansion service into
> runner itself. I understand the cross language feature wants to be runner
> independent, but does it make sense to at least provide the option to allow
> runner to use the expansion service as a library and make it transparent to
> the portable pipeline?
> >> >
> >> >
> >> > Beam composite transforms are expanded before defining the portable
> job definition (and before submitting the jobs to the runner). So naturally
> this is something that has to be done in the Beam side. As an added
> benefit, as you identified, this allows us to keep this logic runner
> independent.
> >> > I think there were discussions regarding automatically starting up a
> local expansion service if one is not specified. Will this address your
> concerns?
> >>
> >> Just to add to this, If you have a pipeline A -> B -> C, the expansion
> >> of B often needs to be evaluated before C can be applied (e.g. we're
> >> planning on exposing the SQL transforms cross language, and many
> >> cross-language IOs can query and supply their own schemas for
> >> downstream type checking), so one cannot construct the "whole"
> >> pipeline, pass it to the runner, and let the runner do the expansion.
>


Re: Interactive Beam Example Failing [BEAM-8451]

2019-10-24 Thread Hai Lu
Hi Robert,

We're trying out iBeam at LinkedIn for Python. As Igor mentioned, there
seems to be some inconsistency in the behavior of interactive beam. We can
suggest some fixes from our end but we would need some support from the
community.

Also, is there a plan to support iBeam in streaming mode? We're interested
in that use case as well.

Thanks,
Hai

On Mon, Oct 21, 2019 at 4:45 PM Robert Bradshaw  wrote:

> Thanks for trying this out. Yes, this is definitely something that
> should be supported (and tested).
>
> On Mon, Oct 21, 2019 at 3:40 PM Igor Durovic  wrote:
> >
> > Hi everyone,
> >
> > The interactive beam example using the DirectRunner fails after
> execution of the last cell. The recursion limit is exceeded during the
> calculation of the cache label because of a circular reference in the
> PipelineInfo object.
> >
> > The constructor for the PipelineInfo class creates a mapping from each
> pcollection to the transforms that produce and consume it. The issue arises
> when there exists a transform that is both a producer and a consumer for
> the same pcollection. This occurs when a transform's expand method returns
> the same pcoll object that's passed into it. The specific transform causing
> the failure of the example is MaybeReshuffle, which is used in the Create
> transform. Replacing "return pcoll" with "return pcoll | Map(lambda x: x)"
> seems to fix the problem.
> >
> > A workaround for this issue on the interactive beam side would be fairly
> simple, but it seems to me that there should be more validation of
> pipelines to prevent the use of transforms that return the same pcoll
> that's passed in, or at least a mention of this in the transform style
> guide. My understanding is that pcollections are produced by a single
> transform (they even have a field called "producer" that references only
> one transform). If that's the case then that property of pcollections
> should be enforced.
> >
> > I made ticket BEAM-8451 to track this issue.
> >
> > I'm still new to beam so I apologize if I'm fundamentally
> misunderstanding something. I'm not exactly sure what the next step should
> be and would appreciate some recommendations. I can submit a PR to solve
> the immediate problem of the failing example but the underlying problem
> should also be addressed at some point. I also apologize if people are
> already aware of this problem.
> >
> > Thank You!
> > Igor Durovic
>