Re: Python SDK Arrow Integrations

2019-03-28 Thread Kenneth Knowles
On Thu, Mar 28, 2019 at 12:24 PM Brian Hulette  wrote:

> > - Presumably there is a pandas counterpart in Java. Is there? Do you
> know?
> I think there are some dataframe libraries in Java we could look into. I'm
> not aware of anything that has the same popularity and arrow integration as
> pandas though. Within the arrow project there is Gandiva [1], which has
> Java bindings. It generates optimized LLVM code for processing arrow data
> based on an expression tree. I think that could be a valuable tool for SQL.
>

Gandiva looks to be similar to what we get from Calcite today, but I wonder
if it is higher performance due to being lower level or more flexible (for
example Calcite's codegen is pretty hardcoded to millisecond precision
datetimes). Worth learning about. Since another big benefit of Calcite's
expression compiler is implementation of "all" builtin functions for free,
I'd look closely at how to provide a builtin function catalog to Gandiva.

> - Is it valuable for Beam to invent its own schemas? I'd love for Beam to
> have identical schema affordances to either protobuf or arrow or avro, with
> everything layered on that as logical types (including SQL types). What
> would it look like if Beam schemas were more-or-less Arrow schemas?
> As it stands right now there is a very clear mapping from Beam schemas to
> Arrow schemas. Both define similar primitive types, as well as nested types
> like row (beam) -> struct (arrow), array (beam) -> list (arrow). In
> addition Arrow schemas have a binary representation and implementations in
> many languages.
>
> I had some offline discussion with Reuven about this - and he pointed out
> that eventually we'd like Beam schemas to have a type for large iterables
> as well, so that even a PCollection>> can have a schema,
> and that's certainly a concept that wouldn't make sense for Arrow. So I
> think the answer is yes it is valuable for Beam to have its own schemas -
> that way we can represent Beam-only concepts, but still be able to map to
> other schemas when it makes sense (For example in the KV>
> case we could map V's beam schema to an arrow schema and encode it as arrow
> record batches).
>

This convinces me that Beam should have its own schema definition. There
are things in Beam - and could be novelties created in Beam - that might
not fit Arrow. And we don't want to have such a tight coupling. If the
mapping is straightforward enough then there's not that much work to just
convert to/from. But the piece I would think about it is that any change to
Beam or Arrow could introduce something that doesn't translate well, so we
just need to be cognizant of that.

Kenn


>
> Brian
>
> [1] http://arrow.apache.org/blog/2018/12/05/gandiva-donation/
>
> On Wed, Mar 27, 2019 at 9:19 PM Kenneth Knowles  wrote:
>
>> Thinking about Arrow + Beam SQL + schemas:
>>
>>  - Obviously many SQL operations could be usefully accelerated by arrow /
>> columnar. Especially in the analytical realm this is the new normal. For
>> ETL, perhaps less so.
>>
>>  - Beam SQL planner (pipeline construction) is implemented in Java, and
>> so the various DoFns/CombineFns that implement projection, filter, etc, are
>> also in Java.
>> - Arrow is of course available in Java.
>> - Presumably there is a pandas counterpart in Java. Is there? Do you
>> know?
>> - But perhaps if these building blocks emitted by the planner had
>> well-defined URNs we could use SIMD+columnar Java or Python implementation
>> opportunistically to avoid cross-language data channels. (thinking ahead to
>> when cross-language allows Python pipelines to invoke the construction-time
>> planner implemented in Java)
>>
>>  - Is it valuable for Beam to invent its own schemas? I'd love for Beam
>> to have identical schema affordances to either protobuf or arrow or avro,
>> with everything layered on that as logical types (including SQL types).
>> What would it look like if Beam schemas were more-or-less Arrow schemas?
>>
>>  - For the event timestamp issue, there are two levels of abstraction I
>> could imagine improvements:
>> - at the model layer (aka portability protos) we could make Beam
>> columnar batch aware. That's a huge change and would need a massive
>> justification IMO in the form of performance numbers.
>> - at the SDK layer, some language might make it pretty easy to
>> overload the "GroupByKey" transform to understand that for elements that
>> are really batches there are multiple timestamps contained within so it may
>> need to window & group differently. The model doesn't need to know in this
>> case.
>>
>> Kenn
>>
>> On Wed, Mar 27, 2019 at 4:42 PM Ahmet Altay  wrote:
>>
>>> Thank you Brian, this looks promising.
>>>
>>> cc: +Chamikara Jayalath  +Heejong Lee
>>> 
>>>
>>> On Wed, Mar 27, 2019 at 1:22 PM Brian Hulette 
>>> wrote:
>>>
 Hi everyone,
 I've been doing some investigations into how Arrow might fit into Beam
 as a way to ramp up on the project. As I've gone about 

JDBCIO Connection Pooling

2019-03-28 Thread hgu2hw+2g0aed6fdoszs
Hello, I have recently created a streaming google dataflow program with apache 
beam using the java SDK. When files land in cloud-storage they fire off pubsub 
messages with the filename, which I consume and then write to a cloud sql 
database. Everything works great for the most part. However I've been testing 
it more thoroughly recently and noticed that if I start reading in multiple 
files that database connections slowly grow  and grow until they hit the 
default limit of 100 connections. Strangely the idle connections never seem to 
disappear and the program might run for hours watching for pubsub messages so 
this creates a problem. 

My initial idea was to create a c3p0 connection pool and pass that in as the 
datasource through the JdbcIO.DataSourceConfiguration.create method. I noticed 
this didn't seem to make a difference which perplexed me even with my 
aggressive pool connections. After some debugging I noticed that the datasource 
was still being wrapped in a pooling datasource..even through it already is a 
pooled datasource. I was wondering what strangeness this caused, so locally I 
hacked JdbcIO to just return my c3p0 datasource and do nothing else in the 
buildDatasource method ( 
https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
 - line 331). It seemed to alleviate the connection problems and now I see the 
idle connections slowly start disappearing in cloud sql. Everything appears to 
be working smoothly. Obviously this isn't the solution I want moving forward. 
Is there some other way to achieve this? What grave mistakes have I done by 
bypassing the standard way of doing it?






Sent using Guerrillamail.com
Block or report abuse: 
https://www.guerrillamail.com//abuse/?a=RURiJQ8FQrlbiR6183YaPBvVSg%3D%3D




Re: Unexpected TestStream behavior when testing stateful DoFn

2019-03-28 Thread Ahmet Altay
I agree it looks like a bug. Filed
https://issues.apache.org/jira/browse/BEAM-6934



On Thu, Mar 28, 2019 at 5:18 PM Reuven Lax  wrote:

> This looks like a bug to me.
>
> On Thu, Mar 28, 2019 at 2:52 PM Amar Pai  wrote:
>
>> Hi,
>>
>> I'm running into some unexpected behavior when trying to unit test a
>> stateful DoFn that uses watermark timers as well as bag state.  I'm
>> following the example here:
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/userstate_test.py#L333
>>
>> Expected behavior:
>> When using TestStream, if a stateful DoFn adds value 'foo' to BagState
>> then sets two watermark timers t1 and t2, where t1 clears the bag state and
>> t2 reads from bag state and emits the contents, if t1.time < t2.time then
>> nothing should get emitted when the TestPipeline is run. (bag state should
>> be cleared by timer at t1 before it is read from by timer at t2)
>>
>> Actual behavior:
>> For the scenario described above, results get emitted despite t1.time
>> being less than t2.time.
>> I've created a gist with a demonstration of the problem:
>> https://gist.github.com/jcruelty/3bf5ce5865110372a2d1650b1421cde1
>>
>>
>> Is this a bug?  Would appreciate your thoughts.
>> cheers
>>
>> Amar
>>
>


Re: Python SDK Arrow Integrations

2019-03-28 Thread Ahmet Altay
On Thu, Mar 28, 2019 at 12:24 PM Brian Hulette  wrote:

> > I think splitting to new transforms rather that adding new options to
> existing IO transforms would be simpler for users.  I think this would be a
> question that could be easier to answer with a PR.
> Ok I'll start working on one :)
>
> > How did you measure this? It would be good for us to also have relevant
> micro benchmarks here.
> I ran some one-off benchmarks comparing
> `pickle.loads(pickle.dumps(batch))` and `decode(encode(batch))`, where
> `decode` and `encode` use the arrow IPC formats. I can formalize this and
> publish the results somewhere. Would it be sufficient to just make a
> personal GitHub repo and push the code/results there?
>

We have a few micro benchmarks here (
https://github.com/apache/beam/tree/master/sdks/python/apache_beam/tools).
Feel free to add them there.


>
>
> > - Presumably there is a pandas counterpart in Java. Is there? Do you
> know?
> I think there are some dataframe libraries in Java we could look into. I'm
> not aware of anything that has the same popularity and arrow integration as
> pandas though. Within the arrow project there is Gandiva [1], which has
> Java bindings. It generates optimized LLVM code for processing arrow data
> based on an expression tree. I think that could be a valuable tool for SQL.
>
>
> > - Is it valuable for Beam to invent its own schemas? I'd love for Beam
> to have identical schema affordances to either protobuf or arrow or avro,
> with everything layered on that as logical types (including SQL types).
> What would it look like if Beam schemas were more-or-less Arrow schemas?
> As it stands right now there is a very clear mapping from Beam schemas to
> Arrow schemas. Both define similar primitive types, as well as nested types
> like row (beam) -> struct (arrow), array (beam) -> list (arrow). In
> addition Arrow schemas have a binary representation and implementations in
> many languages.
>
> I had some offline discussion with Reuven about this - and he pointed out
> that eventually we'd like Beam schemas to have a type for large iterables
> as well, so that even a PCollection>> can have a schema,
> and that's certainly a concept that wouldn't make sense for Arrow. So I
> think the answer is yes it is valuable for Beam to have its own schemas -
> that way we can represent Beam-only concepts, but still be able to map to
> other schemas when it makes sense (For example in the KV>
> case we could map V's beam schema to an arrow schema and encode it as arrow
> record batches).
>
> Brian
>
> [1] http://arrow.apache.org/blog/2018/12/05/gandiva-donation/
>
> On Wed, Mar 27, 2019 at 9:19 PM Kenneth Knowles  wrote:
>
>> Thinking about Arrow + Beam SQL + schemas:
>>
>>  - Obviously many SQL operations could be usefully accelerated by arrow /
>> columnar. Especially in the analytical realm this is the new normal. For
>> ETL, perhaps less so.
>>
>>  - Beam SQL planner (pipeline construction) is implemented in Java, and
>> so the various DoFns/CombineFns that implement projection, filter, etc, are
>> also in Java.
>> - Arrow is of course available in Java.
>> - Presumably there is a pandas counterpart in Java. Is there? Do you
>> know?
>> - But perhaps if these building blocks emitted by the planner had
>> well-defined URNs we could use SIMD+columnar Java or Python implementation
>> opportunistically to avoid cross-language data channels. (thinking ahead to
>> when cross-language allows Python pipelines to invoke the construction-time
>> planner implemented in Java)
>>
>>  - Is it valuable for Beam to invent its own schemas? I'd love for Beam
>> to have identical schema affordances to either protobuf or arrow or avro,
>> with everything layered on that as logical types (including SQL types).
>> What would it look like if Beam schemas were more-or-less Arrow schemas?
>>
>>  - For the event timestamp issue, there are two levels of abstraction I
>> could imagine improvements:
>> - at the model layer (aka portability protos) we could make Beam
>> columnar batch aware. That's a huge change and would need a massive
>> justification IMO in the form of performance numbers.
>> - at the SDK layer, some language might make it pretty easy to
>> overload the "GroupByKey" transform to understand that for elements that
>> are really batches there are multiple timestamps contained within so it may
>> need to window & group differently. The model doesn't need to know in this
>> case.
>>
>> Kenn
>>
>> On Wed, Mar 27, 2019 at 4:42 PM Ahmet Altay  wrote:
>>
>>> Thank you Brian, this looks promising.
>>>
>>> cc: +Chamikara Jayalath  +Heejong Lee
>>> 
>>>
>>> On Wed, Mar 27, 2019 at 1:22 PM Brian Hulette 
>>> wrote:
>>>
 Hi everyone,
 I've been doing some investigations into how Arrow might fit into Beam
 as a way to ramp up on the project. As I've gone about this I've prototyped
 a couple of additions to the Python SDK. I think these additions may be
 useful for 

Re: Unexpected TestStream behavior when testing stateful DoFn

2019-03-28 Thread Reuven Lax
This looks like a bug to me.

On Thu, Mar 28, 2019 at 2:52 PM Amar Pai  wrote:

> Hi,
>
> I'm running into some unexpected behavior when trying to unit test a
> stateful DoFn that uses watermark timers as well as bag state.  I'm
> following the example here:
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/userstate_test.py#L333
>
> Expected behavior:
> When using TestStream, if a stateful DoFn adds value 'foo' to BagState
> then sets two watermark timers t1 and t2, where t1 clears the bag state and
> t2 reads from bag state and emits the contents, if t1.time < t2.time then
> nothing should get emitted when the TestPipeline is run. (bag state should
> be cleared by timer at t1 before it is read from by timer at t2)
>
> Actual behavior:
> For the scenario described above, results get emitted despite t1.time
> being less than t2.time.
> I've created a gist with a demonstration of the problem:
> https://gist.github.com/jcruelty/3bf5ce5865110372a2d1650b1421cde1
>
>
> Is this a bug?  Would appreciate your thoughts.
> cheers
>
> Amar
>


Unexpected TestStream behavior when testing stateful DoFn

2019-03-28 Thread Amar Pai
Hi,

I'm running into some unexpected behavior when trying to unit test a
stateful DoFn that uses watermark timers as well as bag state.  I'm
following the example here:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/userstate_test.py#L333

Expected behavior:
When using TestStream, if a stateful DoFn adds value 'foo' to BagState then
sets two watermark timers t1 and t2, where t1 clears the bag state and t2
reads from bag state and emits the contents, if t1.time < t2.time then
nothing should get emitted when the TestPipeline is run. (bag state should
be cleared by timer at t1 before it is read from by timer at t2)

Actual behavior:
For the scenario described above, results get emitted despite t1.time being
less than t2.time.
I've created a gist with a demonstration of the problem:
https://gist.github.com/jcruelty/3bf5ce5865110372a2d1650b1421cde1


Is this a bug?  Would appreciate your thoughts.
cheers

Amar


Re: Debugging :beam-sdks-java-io-hadoop-input-format:test

2019-03-28 Thread Mikhail Gryzykhin
I've seen it couple of times already and just got another repro:
https://builds.apache.org/job/beam_PreCommit_Java_Commit/5011/consoleFull

On Thu, Mar 28, 2019 at 8:55 AM Alexey Romanenko 
wrote:

> Hi Mikhail,
>
> We had a flaky “HIFIOWithEmbeddedCassandraTest” a while ago and it was
> caused by issue with launching of embedded Cassandra cluster. Then it was
> fixed by Etienne Chauchot's PR [1]
> Though, I don’t see any similar error messages in your Jenkins job log,
> so, I’m not sure it’s the same issue.
>
> Have you seen this fail only once or several times already?
>
> [1] https://github.com/apache/beam/pull/8000
>
> On 27 Mar 2019, at 22:24, Mikhail Gryzykhin  wrote:
>
> Hi everyone,
>
> I have a pre-commit job that fails on
> *:beam-sdks-java-io-hadoop-input-format:test*
> .
> Relevant PR. 
>
> Target doesn't have any explicit log associated with it. Running same
> target in local doesn't give me much help. It seem to fail somewhere in
> native runtime.
>
> Can someone help with tackling this issue?
>
> Regards,
> Mikhail.
>
>
>
>


GSoC Project Proposal

2019-03-28 Thread Tanay Tummalapalli
Hi everyone,

I am applying for Google Summer of Code with Apache Beam this year.
I have chosen BEAM-6611  -
"A Python Sink for BigQuery with File Loads in Streaming" as my project.
The draft of my proposal

is
ready. I would love to get feedback and criticism from you so I can submit
the best possible proposal.

PFA my project proposal.

Thank You
Tanay Tummalapalli
LinkedIn  | GitHub



 GSoC Project proposal - T. Tanay



Re: Python SDK Arrow Integrations

2019-03-28 Thread Chamikara Jayalath
On Wed, Mar 27, 2019 at 9:19 PM Kenneth Knowles  wrote:

> Thinking about Arrow + Beam SQL + schemas:
>
>  - Obviously many SQL operations could be usefully accelerated by arrow /
> columnar. Especially in the analytical realm this is the new normal. For
> ETL, perhaps less so.
>
>  - Beam SQL planner (pipeline construction) is implemented in Java, and so
> the various DoFns/CombineFns that implement projection, filter, etc, are
> also in Java.
> - Arrow is of course available in Java.
> - Presumably there is a pandas counterpart in Java. Is there? Do you
> know?
> - But perhaps if these building blocks emitted by the planner had
> well-defined URNs we could use SIMD+columnar Java or Python implementation
> opportunistically to avoid cross-language data channels. (thinking ahead to
> when cross-language allows Python pipelines to invoke the construction-time
> planner implemented in Java)
>

I think cross-language data channels can be avoided as long as steps
(constructed by the planner) can be fused by a runner without any
interleaving Python steps, irrespective of the data format. When ever a
cross-language boundary is met, we'll have to use a coder that is
compatible across languages (avro, arrow, proto, etc). Hopefully Beam
schemas will make this simpler by introducing compatible Row types in each
language.


>
>  - Is it valuable for Beam to invent its own schemas? I'd love for Beam to
> have identical schema affordances to either protobuf or arrow or avro, with
> everything layered on that as logical types (including SQL types). What
> would it look like if Beam schemas were more-or-less Arrow schemas?
>
>  - For the event timestamp issue, there are two levels of abstraction I
> could imagine improvements:
> - at the model layer (aka portability protos) we could make Beam
> columnar batch aware. That's a huge change and would need a massive
> justification IMO in the form of performance numbers.
> - at the SDK layer, some language might make it pretty easy to
> overload the "GroupByKey" transform to understand that for elements that
> are really batches there are multiple timestamps contained within so it may
> need to window & group differently. The model doesn't need to know in this
> case.
>
> Kenn
>
> On Wed, Mar 27, 2019 at 4:42 PM Ahmet Altay  wrote:
>
>> Thank you Brian, this looks promising.
>>
>> cc: +Chamikara Jayalath  +Heejong Lee
>> 
>>
>> On Wed, Mar 27, 2019 at 1:22 PM Brian Hulette 
>> wrote:
>>
>>> Hi everyone,
>>> I've been doing some investigations into how Arrow might fit into Beam
>>> as a way to ramp up on the project. As I've gone about this I've prototyped
>>> a couple of additions to the Python SDK. I think these additions may be
>>> useful for others so I'm considering cleaning them up and submitting PRs,
>>> but I wanted to have a discussion here to see if it makes sense first.
>>>
>>> Note that the approach I'm using for this work right now is very naive.
>>> I've just built pipelines where individual elements are actually arrow
>>> record batches (or pandas DataFrames). This is really only acceptable for
>>> bounded pipelines without windowing, since it's impossible to define a
>>> single event time for each element. That being said, I think these tools
>>> could still be useful for people who want to run batch pipelines using
>>> parquet, arrow, and pandas.
>>>
>>
>> I agree these will be generally useful.
>>
>>
>>>
>>> Here's what I've implemented so far:
>>> # An option for io.ReadFromParquet to yield arrow record batches instead
>>> of individual elements
>>> Currently the python SDK's parquet reader uses pyarrow.parquet to read
>>> parquet row groups into arrow record batches, and then splits the batches
>>> into a single dictionary per row [1]. I've added a flag to optionally
>>> short-circuit this and just yield the arrow record batches directly, making
>>> it easier for me to build pipelines that process columnar batches. If I
>>> were to contribute this change I could also split out the record batch <->
>>> dictionary conversions as separate transforms, since they could be
>>> generally useful as well.
>>>
>>
>> I think splitting to new transforms rather that adding new options to
>> existing IO transforms would be simpler for users.  I think this would be a
>> question that could be easier to answer with a PR.
>>
>
+1. I think it's better to introduce a new transform as well. It'll be
confusing to have an option that changes the type of PCollection returned
for the same transform.


>
>>
>>> # Custom coders for Arrow Tables and Record Batches
>>> I found that the default coder (pickle/dill?) slowed down my arrow
>>> pipelines, particularly in the case where a record batch had been sliced
>>> into smaller record batches (presumably because the entire original batch
>>> is getting serialized for each slice). I put together some coders that
>>> encode arrow tables and record batches with arrow's IPC formats, which
>>> improves performance 

Re: [PROPOSAL] Standardize Gradle structure in Python SDK

2019-03-28 Thread Mark Liu
Thank you Ahmet. Answer your questions below:


> - Could you comment on what kind of parallelization we will gain by this?
> In terms of real numbers, how would this affect build and test times?


The proposal is based on Gradle parallel execution
: "you can force
Gradle to execute tasks in parallel as long as those tasks are in different
projects". In Beam, project is declared per build.gradle file and
registered in settings.gradle
. Tasks that
are included in single Gradle execution will run in parallel only if they
are declared in separate build.gradle files.

An example of applying parallel is beam_PreCommit_Python
 job which runs
:pythonPreCommit
 task that
contains tasks distributed in 4 build.gradle. The execution graph looks
like https://scans.gradle.com/s/4frpmto6o7hto/timeline:
[image: image.png]
Without this proposal, all tasks will run in sequential which can be ~2x
longer. If more py36 and py37 tests added in the future, things will be
even worse.

- I am guessing this will reduce complexity. Is it possible to quantify the
> improvement related to this?


The general code complexity of function/method/property may not change here
since we basically group tasks in a different way without changing inside
logic. I don't know if there is any tool to measure Gradle build
complexity. Would love to try if there is.


> - Beyond the proposal, I am assuming you are willing to work on. Just want
> to clarify this. In either case, would you need help?


Yes, I'd love to take on major refactor works. At the same time, I'll
create jira for each kind of tests (like flink/protable/hdfs tests) in
sdks/python/build.gradle to move into test-suites. Test owners or anyone
interested to this work are welcome to contribute!

Mark

On Wed, Mar 27, 2019 at 3:53 PM Ahmet Altay  wrote:

> This sounds good to me. Thank you for doing this. Few questions:
> - Could you comment on what kind of parallelization we will gain by this?
> In terms of real numbers, how would this affect build and test times?
> - I am guessing this will reduce complexity. Is it possible to quantify
> the improvement related to this?
> - Beyond the proposal, I am assuming you are willing to work on. Just want
> to clarify this. In either case, would you need help?
>
> Thank you,
> Ahmet
>
> On Wed, Mar 27, 2019 at 10:19 AM Mark Liu  wrote:
>
>> Hi Python SDK Developers,
>>
>> You may notice that Gradle files changed a lot recently as
>> parallelization
>>  applied to
>> Python tests and more python versions were enabled in testing. There are
>> tricks over the build scripts and tests are grown naturally and distributed
>> under sdks/python, which caused frictions (like rollback PR-8059
>> ).
>>
>> Thus, I created BEAM-6907
>>  and would like to
>> initiate some works to cleanup and standardize Gradle structure in Python
>> SDK. In general, I think we want to:
>>
>> - Apply parallel execution
>> - Share common tasks
>> - Centralize test related tasks
>> - Have a clear Gradle structure for projects/tasks
>>
>> This is Gradle directory structure I proposed:
>>
>> sdks/python/
>>
>> build.gradle--> hold builds, snapshot, analytic tasks
>> test-suites/--> all pre/post/VR test suites under here
>>
>> README.md
>>
>> dataflow/--> grouped by runner or unit test (tox)
>>
>> py27/--> grouped by py version
>>
>> build.gradle
>>
>> py35/
>>
>> ...
>>
>> direct/
>>
>> py27/
>>
>> ...
>>
>> flink/
>>
>> tox/
>> ...
>>
>>
>> The ideas are:
>> - Only keep builds, snapshot and analytic jobs in sdks/python/build.gradle
>> - Move all test related tasks to sdks/python/test-suites/
>> - In sdks/python/test-suites, we first group by runners, unit test or
>> other testing that can't fit to them, and then group by py versions if
>> needed.
>> - An example of ../test-suites/../py35/build.gradle is this
>> 
>> .
>>
>> Please feel free to explore existing Gradle scripts in Python SDK and
>> bring any thoughts on this proposal if you have.
>>
>> Thanks!
>> Mark
>>
>


Re: Deprecating Avro for fastavro on Python 3

2019-03-28 Thread Ahmet Altay
Hi Ismaël,

It is great to hear that Avro is planning to make a release soon.

To answer your concerns, fastavro has a set of tests using regular avro
files[1] and it also has a large set of users (with 675470 package
downloads). This is in addition to it being a py2 & py3 compatible package
and offering ~7x performance improvements [2]. Another data point, we were
testing fastavro for a while behind an experimental flag and have not seen
issues related compatibility.

pyavro-rs sounds promising however I could not find a released version of
it on pypi. The source code does not look like being maintained either with
last commit on Jul 2, 2018. (for comparison last change on fastavro was on
Mar 19, 2019).

I think given the state of things, it makes sense to switch to fastavro as
the default implementation to unblock python 3 changes. When avro offers a
similar level of performance we could switch back without any visible user
impact.

Ahmet

[1] https://github.com/fastavro/fastavro/tree/master/tests
[2] https://pypi.org/project/fastavro/

On Thu, Mar 28, 2019 at 7:53 AM Ismaël Mejía  wrote:

> Hello,
>
> The problem of switching implementations is the risk of losing
> interoperability, and this is more important than performance. Does
> fastavro have tests that guarantee that it is fully compatible with
> Avro’s Java version? (given that it is the de-facto implementation
> used everywhere).
>
> If performance is a more important criteria maybe it is worth to check
> at pyavro-rs [1], you can take a look at its performance in the great
> talk of last year [2].
>
> I have been involved actively in the Avro community in the last months
> and I am now a committer there. Also Dan Kulp who has done multiple
> contributions in Beam is now a PMC member too. We are at this point
> working hard to get the next release of Avro out, actually the branch
> cut of Avro 1.9.0 is happening this week, and we plan to improve the
> release cadence. Please understand that the issue with Avro is that it
> is a really specific and ‘old‘ project (~10 years) so part of the
> active moved to other areas because it is stable, but we are still
> there working on it and we are eager to improve it for everyone’s
> needs (and of course Beam needs).
>
> I know that Python 3’s Avro implementation is still lacking and could
> be improved (views expressed here are clearly valid), but maybe this
> is a chance to contribute there too. Remember Apache projects are a
> family and we have a history of cross colaboration with other
> communities e.g. Flink, Calcite so why not give it a chance to Avro
> too.
>
> Regards,
> Ismaël
>
> [1] https://github.com/flavray/pyavro-rs
> [2]
> https://ep2018.europython.eu/media/conference/slides/how-to-write-rust-instead-of-c-and-get-away-with-it-yes-its-a-python-talk.pdf
>
> On Wed, Mar 27, 2019 at 11:42 PM Chamikara Jayalath
>  wrote:
> >
> > +1 for making use_fastavro the default for Python3. I don't see any
> significant drawbacks in doing this from Beam's point of view. One concern
> is whether avro and fastavro can safely co-exist in the same environment so
> that Beam continues to work for users who already have avro library
> installed.
> >
> > Note that there are two use_fastavro flags (confusingly enough).
> > (1) for avro file source [1]
> > (2) an experiment flag [2] with the same name that makes Dataflow runner
> use fastavro library for reading/writing intermediate files and for reading
> Avro files exported by BigQuery.
> >
> > I can help with the latter.
> >
> > [1]
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/avroio.py#L81
> > [2]
> https://lists.apache.org/thread.html/94bd362a3a041654e6ef9003fb3fa797e25274fdb4766065481a0796@%3Cuser.beam.apache.org%3E
> >
> > Thanks,
> > Cham
> >
> > On Wed, Mar 27, 2019 at 3:27 PM Valentyn Tymofieiev 
> wrote:
> >>
> >> Thanks, Robbe and Frederik, for raising this.
> >>
> >> Over the course of making Beam Python 3 compatible this is at least the
> second time [1] we have to deal with an error in avro-python3 package. The
> release cadence of Apache Avro (1 release a year)
> >> is concerning to me [2]. Even if we have a new release with Python 3
> fixes soon, as Beam users start use Beam more actively on Python 3, we may
> encounter more issues in avro-python3. If this happens, Beam will have to
> monkey-patch its way around the avro-python3 issues, because waiting for
> next Avro release may not be practical.
> >>
> >> So, I agree that it is be a good time to start transitioning off of
> avro/avro-python3 dependency, given that fastavro is known to be a faster
> alternative [3], and is released monthly[4]
> >>
> >> There are couple of ways to make this transition depending on how
> careful we want to be. We should:
> >>
> >> 1. Remove the dependency on avro in the current codepath whenever
> fastavro is used, as you propose.
> >> 2. Remove Beam dependency on avro-python3 now,  OR,  if we want to be
> safer,  set use_fastavro=True 

Re: [PROPOSAL] Preparing for Beam 2.12.0 release

2019-03-28 Thread Andrew Pilloud
It seems like there was some confusion around when the branch cut was going
to happen. I cut the branch yesterday, but a dozen release blocking fixes
went in immediately after. I recut the branch today, one day late, at
1d9daf1

.

There are currently 3 release blocking issues
. I plan to
cut the first RC on March 3rd, so get your cherry-picks in by end of day on
March 2nd.

Andrew

On Mon, Mar 18, 2019 at 9:14 AM Etienne Chauchot 
wrote:

> Sounds great, thanks for volunteering to do the release.
>
> Etienne
>
> Le mercredi 13 mars 2019 à 12:08 -0700, Andrew Pilloud a écrit :
>
> Hello Beam community!
>
> Beam 2.12 release branch cut date is March 27th according to the release
> calendar [1]. I would like to volunteer myself to do this release. I intend
> to cut the branch as planned on March 27th and cherrypick fixes if needed.
>
> If you have releasing blocking issues for 2.12 please mark their "Fix
> Version" as 2.12.0. Kenn created a 2.13.0 release in JIRA in case you
> would like to move any non-blocking issues to that version.
>
> Does this sound reasonable?
>
> Andrew
>
> [1]
> https://calendar.google.com/calendar/embed?src=0p73sl034k80oob7seouanigd0%40group.calendar.google.com=America%2FLos_Angeles
>
>


Re: [PROPOSAL] Preparing for Beam 2.12.0 release

2019-03-28 Thread Andrew Pilloud
Yes, that is what I meant. Sorry about mixing up the month!

On Thu, Mar 28, 2019 at 9:26 AM Robert Burke  wrote:

> I'm going to go out on a limb and assume you mean first RC cut on April
> 3rd, and the Cherry-pick deadline EoD (PST?) April 2nd.
>
> On Thu, 28 Mar 2019 at 09:23, Andrew Pilloud  wrote:
>
>> It seems like there was some confusion around when the branch cut was
>> going to happen. I cut the branch yesterday, but a dozen release blocking
>> fixes went in immediately after. I recut the branch today, one day late, at
>> 1d9daf1
>> 
>> .
>>
>> There are currently 3 release blocking issues
>> . I plan
>> to cut the first RC on March 3rd, so get your cherry-picks in by end of day
>> on March 2nd.
>>
>> Andrew
>>
>> On Mon, Mar 18, 2019 at 9:14 AM Etienne Chauchot 
>> wrote:
>>
>>> Sounds great, thanks for volunteering to do the release.
>>>
>>> Etienne
>>>
>>> Le mercredi 13 mars 2019 à 12:08 -0700, Andrew Pilloud a écrit :
>>>
>>> Hello Beam community!
>>>
>>> Beam 2.12 release branch cut date is March 27th according to the
>>> release calendar [1]. I would like to volunteer myself to do this release.
>>> I intend to cut the branch as planned on March 27th and cherrypick fixes if
>>> needed.
>>>
>>> If you have releasing blocking issues for 2.12 please mark their "Fix
>>> Version" as 2.12.0. Kenn created a 2.13.0 release in JIRA in case you
>>> would like to move any non-blocking issues to that version.
>>>
>>> Does this sound reasonable?
>>>
>>> Andrew
>>>
>>> [1]
>>> https://calendar.google.com/calendar/embed?src=0p73sl034k80oob7seouanigd0%40group.calendar.google.com=America%2FLos_Angeles
>>>
>>>


Re: Flink 1.7.2 + Beam 2.11 error: The transform beam:transform:create_view:v1 is currently not supported.

2019-03-28 Thread Lukasz Cwik
+dev 

On Thu, Mar 28, 2019 at 9:13 AM Kaymak, Tobias 
wrote:

> Hello,
>
> I just upgraded to Flink 1.7.2 from 1.6.2 with my dev cluster and from
> Beam 2.10 to 2.11 and I am seeing this error when starting my pipelines:
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error.
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
> at
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
> at
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: java.lang.UnsupportedOperationException: The transform
> beam:transform:create_view:v1 is currently not supported.
>
>
> at
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:113)
>
>
> at
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
> at
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at
> org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
> at
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
> at
> org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
> at
> org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
> at
> org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:68)
>
>
> at
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:111)
>
>
> at
> org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:108)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
> at
> ch.ricardo.di.beam.KafkaToBigQuery.main(KafkaToBigQuery.java:175)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
> ... 9 more
>
> I found this open issue while googling
> https://jira.apache.org/jira/browse/BEAM-4301 - but it seems unrelated,
> what makes me wonder is the type of error message I am seeing.
> I tried Flink 1.7.2 with Scala 2.11 + 2.12 without luck.
> I tried deleting all state information of Flink (ha/ and snapshots/), in
> the end I tried downgrading to Beam 2.10. - And that worked.
> Could it be that there is a bug that has been introduced in 2.11?
>
> Best,
> Tobi
>
>
>


Re: Increase Portable SDK Harness share of memory?

2019-03-28 Thread Lukasz Cwik
The intention is that these kinds of hints such as CPU and/or memory should
be embedded in the environment specification that is associated with the
transforms that need resource hints.

The environment spec is woefully ill prepared as it only has a docker URL
right now.

On Thu, Mar 28, 2019 at 8:45 AM Robert Burke  wrote:

> A question came over the beam-go slack that I wasn't able to answer, in
> particular for Dataflow*, is there a way to increase how much of a Portable
> FnAPI worker is dedicated for the SDK side, vs the Runner side?
>
> My assumption is that runners should manage it, and have the Runner
> Harness side be as lightweight as possible, to operate under reasonable
> memory bounds, allowing the user-code more room to spread, since it's
> largely unknown.
>
> I saw there's the Provisioning API
> 
> which to communicates resource limits to the SDK side, but is there a way
> to make the request (probably on job start up) in the other direction?
>
> I imagine it has to do with the container boot code, but I have only vague
> knowledge of how that works at present.
>
> If there's a portable way for it, that's ideal, but I suspect this will be
> require a Dataflow specific answer.
>
> Thanks!
> Robert B
>
> *Dataflow doesn't support the Go SDK, but the Go SDK supports Dataflow.
>


Re: [PROPOSAL] Preparing for Beam 2.12.0 release

2019-03-28 Thread Robert Burke
I'm going to go out on a limb and assume you mean first RC cut on April
3rd, and the Cherry-pick deadline EoD (PST?) April 2nd.

On Thu, 28 Mar 2019 at 09:23, Andrew Pilloud  wrote:

> It seems like there was some confusion around when the branch cut was
> going to happen. I cut the branch yesterday, but a dozen release blocking
> fixes went in immediately after. I recut the branch today, one day late, at
> 1d9daf1
> 
> .
>
> There are currently 3 release blocking issues
> . I plan
> to cut the first RC on March 3rd, so get your cherry-picks in by end of day
> on March 2nd.
>
> Andrew
>
> On Mon, Mar 18, 2019 at 9:14 AM Etienne Chauchot 
> wrote:
>
>> Sounds great, thanks for volunteering to do the release.
>>
>> Etienne
>>
>> Le mercredi 13 mars 2019 à 12:08 -0700, Andrew Pilloud a écrit :
>>
>> Hello Beam community!
>>
>> Beam 2.12 release branch cut date is March 27th according to the release
>> calendar [1]. I would like to volunteer myself to do this release. I intend
>> to cut the branch as planned on March 27th and cherrypick fixes if needed.
>>
>> If you have releasing blocking issues for 2.12 please mark their "Fix
>> Version" as 2.12.0. Kenn created a 2.13.0 release in JIRA in case you
>> would like to move any non-blocking issues to that version.
>>
>> Does this sound reasonable?
>>
>> Andrew
>>
>> [1]
>> https://calendar.google.com/calendar/embed?src=0p73sl034k80oob7seouanigd0%40group.calendar.google.com=America%2FLos_Angeles
>>
>>


Re: Debugging :beam-sdks-java-io-hadoop-input-format:test

2019-03-28 Thread Alexey Romanenko
Hi Mikhail,

We had a flaky “HIFIOWithEmbeddedCassandraTest” a while ago and it was caused 
by issue with launching of embedded Cassandra cluster. Then it was fixed by 
Etienne Chauchot's PR [1]
Though, I don’t see any similar error messages in your Jenkins job log, so, I’m 
not sure it’s the same issue.

Have you seen this fail only once or several times already?

[1] https://github.com/apache/beam/pull/8000

> On 27 Mar 2019, at 22:24, Mikhail Gryzykhin  wrote:
> 
> Hi everyone,
> 
> I have a pre-commit job that fails on 
> :beam-sdks-java-io-hadoop-input-format:test 
> . 
> Relevant PR. 
> 
> Target doesn't have any explicit log associated with it. Running same target 
> in local doesn't give me much help. It seem to fail somewhere in native 
> runtime.
> 
> Can someone help with tackling this issue?
> 
> Regards,
> Mikhail.
> 
> 



Increase Portable SDK Harness share of memory?

2019-03-28 Thread Robert Burke
A question came over the beam-go slack that I wasn't able to answer, in
particular for Dataflow*, is there a way to increase how much of a Portable
FnAPI worker is dedicated for the SDK side, vs the Runner side?

My assumption is that runners should manage it, and have the Runner Harness
side be as lightweight as possible, to operate under reasonable memory
bounds, allowing the user-code more room to spread, since it's largely
unknown.

I saw there's the Provisioning API

which to communicates resource limits to the SDK side, but is there a way
to make the request (probably on job start up) in the other direction?

I imagine it has to do with the container boot code, but I have only vague
knowledge of how that works at present.

If there's a portable way for it, that's ideal, but I suspect this will be
require a Dataflow specific answer.

Thanks!
Robert B

*Dataflow doesn't support the Go SDK, but the Go SDK supports Dataflow.


Re: Deprecating Avro for fastavro on Python 3

2019-03-28 Thread Ismaël Mejía
Hello,

The problem of switching implementations is the risk of losing
interoperability, and this is more important than performance. Does
fastavro have tests that guarantee that it is fully compatible with
Avro’s Java version? (given that it is the de-facto implementation
used everywhere).

If performance is a more important criteria maybe it is worth to check
at pyavro-rs [1], you can take a look at its performance in the great
talk of last year [2].

I have been involved actively in the Avro community in the last months
and I am now a committer there. Also Dan Kulp who has done multiple
contributions in Beam is now a PMC member too. We are at this point
working hard to get the next release of Avro out, actually the branch
cut of Avro 1.9.0 is happening this week, and we plan to improve the
release cadence. Please understand that the issue with Avro is that it
is a really specific and ‘old‘ project (~10 years) so part of the
active moved to other areas because it is stable, but we are still
there working on it and we are eager to improve it for everyone’s
needs (and of course Beam needs).

I know that Python 3’s Avro implementation is still lacking and could
be improved (views expressed here are clearly valid), but maybe this
is a chance to contribute there too. Remember Apache projects are a
family and we have a history of cross colaboration with other
communities e.g. Flink, Calcite so why not give it a chance to Avro
too.

Regards,
Ismaël

[1] https://github.com/flavray/pyavro-rs
[2] 
https://ep2018.europython.eu/media/conference/slides/how-to-write-rust-instead-of-c-and-get-away-with-it-yes-its-a-python-talk.pdf

On Wed, Mar 27, 2019 at 11:42 PM Chamikara Jayalath
 wrote:
>
> +1 for making use_fastavro the default for Python3. I don't see any 
> significant drawbacks in doing this from Beam's point of view. One concern is 
> whether avro and fastavro can safely co-exist in the same environment so that 
> Beam continues to work for users who already have avro library installed.
>
> Note that there are two use_fastavro flags (confusingly enough).
> (1) for avro file source [1]
> (2) an experiment flag [2] with the same name that makes Dataflow runner use 
> fastavro library for reading/writing intermediate files and for reading Avro 
> files exported by BigQuery.
>
> I can help with the latter.
>
> [1] 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/avroio.py#L81
> [2] 
> https://lists.apache.org/thread.html/94bd362a3a041654e6ef9003fb3fa797e25274fdb4766065481a0796@%3Cuser.beam.apache.org%3E
>
> Thanks,
> Cham
>
> On Wed, Mar 27, 2019 at 3:27 PM Valentyn Tymofieiev  
> wrote:
>>
>> Thanks, Robbe and Frederik, for raising this.
>>
>> Over the course of making Beam Python 3 compatible this is at least the 
>> second time [1] we have to deal with an error in avro-python3 package. The 
>> release cadence of Apache Avro (1 release a year)
>> is concerning to me [2]. Even if we have a new release with Python 3 fixes 
>> soon, as Beam users start use Beam more actively on Python 3, we may 
>> encounter more issues in avro-python3. If this happens, Beam will have to 
>> monkey-patch its way around the avro-python3 issues, because waiting for 
>> next Avro release may not be practical.
>>
>> So, I agree that it is be a good time to start transitioning off of 
>> avro/avro-python3 dependency, given that fastavro is known to be a faster 
>> alternative [3], and is released monthly[4]
>>
>> There are couple of ways to make this transition depending on how careful we 
>> want to be. We should:
>>
>> 1. Remove the dependency on avro in the current codepath whenever fastavro 
>> is used, as you propose.
>> 2. Remove Beam dependency on avro-python3 now,  OR,  if we want to be safer, 
>>  set use_fastavro=True a default option on Python 3, but keep the dependency 
>> on avro-python3, and keep that codepath, even though it may not work right 
>> now on Py3, but might work after next Avro release.
>> 3. set use_fastavro=True a default option on Python 2.
>> 4. Remove Beam dependency on avro and avro-python3 after several releases.
>>
>> Adding +Chamikara Jayalath and +Udi Meiri who have been working on Beam IOs 
>> may have some thoughts here. Do you think that it is safe to make 
>> use_fastavro=True a default option for both Py2 and Py3 now? If we make 
>> use_fastavro a default option on Py3, do you think there is a benefit to 
>> still keep the Avro codepath on Py3, or we can remove it?
>>
>> Thanks,
>> Valentyn
>>
>> [1] https://github.com/apache/avro/pull/436
>> [2] https://avro.apache.org/releases.html
>> [3] 
>> https://medium.com/@abrarsheikh/benchmarking-avro-and-fastavro-using-pytest-benchmark-tox-and-matplotlib-bd7a83964453
>> [4] https://pypi.org/project/fastavro/#history
>>
>> On Wed, Mar 27, 2019 at 10:49 AM Robbe Sneyders  
>> wrote:
>>>
>>> Hi all,
>>>
>>> We're looking at fixing avroio on Python 3, which still fails due to a 
>>> non-picklable schema class in Avro [1]. This is 

Re: NullPointerException - Session windows with Lateness in FlinkRunner

2019-03-28 Thread Maximilian Michels

Hi Rahul,

Thanks for providing the detailed report. This looks like a bug rather 
than a limitation of the Flink Runner. We have integration tests for 
session windows with the Flink Runner but they seemed to have missed 
this issue.


Let me investigate and get back to you. Tracking issue: 
https://jira.apache.org/jira/browse/BEAM-6929


Thanks,
Max

On 28.03.19 03:01, rahul patwari wrote:

+dev

On Wed 27 Mar, 2019, 9:47 PM rahul patwari, > wrote:


Hi,
I am using Beam 2.11.0, Runner - beam-runners-flink-1.7, Flink
Cluster - 1.7.2.

I have this flow in my pipeline:
KafkaSource(withCreateTime())  -->  ApplyWindow(SessionWindow with
gapDuration=1 Minute, lateness=3 Minutes, AccumulatingFiredPanes,
default trigger)  -->  BeamSQL(GroupBy query)  --> 
Window.remerge()  -->  Enrichment  -->  KafkaSink


I am generating data in such a way that the first two records belong
to two different sessions. And, generating the third record before
the first session expires with the timestamp for the third record in
such a way that the two sessions will be merged to become a single
session.

For Example, These are the sample input and output obtained when I
ran the same pipeline in DirectRunner.

Sample Input:
{"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-27-44"}}
{"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-28-51"}}
{"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-28-26"}}

Sample Output:

{"Col1":{"string":"str1"},"NumberOfRecords":{"long":1},"WST":{"string":"2019-03-27
15-27-44"},"WET":{"string":"2019-03-27 15-28-44"}}

{"Col1":{"string":"str1"},"NumberOfRecords":{"long":1},"WST":{"string":"2019-03-27
15-28-51"},"WET":{"string":"2019-03-27 15-29-51"}}

{"Col1":{"string":"str1"},"NumberOfRecords":{"long":3},"WST":{"string":"2019-03-27
15-27-44"},"WET":{"string":"2019-03-27 15-29-51"}}

Where "NumberOfRecords" is the count, "WST" is the Avro field Name
which indicates the window start time for the session window.
Similarly "WET" indicates the window End time of the session window.
I am getting "WST" and "WET" after remerging and applying
ParDo(Enrichment stage of the pipeline).

The program ran successfully in DirectRunner. But, in FlinkRunner, I
am getting this exception when the third record arrives:

2019-03-27 15:31:00,442 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph        -

Source: DfleKafkaSource/KafkaIO.Read/Read(KafkaUnboundedSource) ->
Flat Map ->

DfleKafkaSource/ParDo(ConvertKafkaRecordtoRow)/ParMultiDo(ConvertKafkaRecordtoRow)
-> (Window.Into()/Window.Assign.out ->

DfleSql/SqlTransform/BeamIOSourceRel_4/Convert.ConvertTransform/ParDo(Anonymous)/ParMultiDo(Anonymous)
->

DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Group.ByFields/Group
by fields/ParMultiDo(Anonymous) -> ToKeyedWorkItem,

DfleKafkaSink/ParDo(RowToGenericRecordConverter)/ParMultiDo(RowToGenericRecordConverter)
-> DfleKafkaSink/KafkaIO.KafkaValueWrite/Kafka values with default
key/Map/ParMultiDo(Anonymous) ->
DfleKafkaSink/KafkaIO.KafkaValueWrite/KafkaIO.Write/Kafka
ProducerRecord/Map/ParMultiDo(Anonymous) ->

DfleKafkaSink/KafkaIO.KafkaValueWrite/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter))
(1/1) (d00be62e110cef00d9d772042f4b87a9) switched from DEPLOYING to
RUNNING.
2019-03-27 15:33:25,427 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph        -


DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Group.ByFields/GroupByKey
->

DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous)
->
DfleSql/SqlTransform/BeamAggregationRel_45/mergeRecord/ParMultiDo(Anonymous)
-> DfleSql/SqlTransform/BeamCalcRel_46/ParDo(Calc)/ParMultiDo(Calc)
-> DfleSql/Window.Remerge/Identity/Map/ParMultiDo(Anonymous) ->

DfleSql/ParDo(EnrichRecordWithWindowTimeInfo)/ParMultiDo(EnrichRecordWithWindowTimeInfo)
->

DfleKafkaSink2/ParDo(RowToGenericRecordConverter)/ParMultiDo(RowToGenericRecordConverter)
-> DfleKafkaSink2/KafkaIO.KafkaValueWrite/Kafka values with default
key/Map/ParMultiDo(Anonymous) ->
DfleKafkaSink2/KafkaIO.KafkaValueWrite/KafkaIO.Write/Kafka
ProducerRecord/Map/ParMultiDo(Anonymous) ->

DfleKafkaSink2/KafkaIO.KafkaValueWrite/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter)
(1/1) (d95af17b7457443c13bd327b46b282e6) switched from RUNNING to
FAILED.
org.apache.beam.sdk.util.UserCodeException:
java.lang.NullPointerException
         at
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
         at