Re: NullPointerException - Session windows with Lateness in FlinkRunner

2019-03-28 Thread Maximilian Michels

Hi Rahul,

Thanks for providing the detailed report. This looks like a bug rather 
than a limitation of the Flink Runner. We have integration tests for 
session windows with the Flink Runner, but they seem to have missed 
this issue.


Let me investigate and get back to you. Tracking issue: 
https://jira.apache.org/jira/browse/BEAM-6929


Thanks,
Max

On 28.03.19 03:01, rahul patwari wrote:

+dev

On Wed 27 Mar, 2019, 9:47 PM rahul patwari wrote:


Hi,
I am using Beam 2.11.0, Runner - beam-runners-flink-1.7, Flink
Cluster - 1.7.2.

I have this flow in my pipeline:
KafkaSource(withCreateTime())  -->  ApplyWindow(SessionWindow with
gapDuration=1 Minute, lateness=3 Minutes, AccumulatingFiredPanes,
default trigger)  -->  BeamSQL(GroupBy query)  --> 
Window.remerge()  -->  Enrichment  -->  KafkaSink


I am generating data in such a way that the first two records belong
to two different sessions. And, generating the third record before
the first session expires with the timestamp for the third record in
such a way that the two sessions will be merged to become a single
session.
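
For reference, a windowing configuration matching that description would
look roughly as follows in the Beam Java SDK (a minimal sketch; the
variable names and the surrounding pipeline are assumed, not taken from
the actual job):

import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Duration;

public class SessionWindowing {
  // Session windows with a 1-minute gap, 3 minutes of allowed lateness,
  // accumulating fired panes, and the default trigger.
  static PCollection<Row> applySessionWindow(PCollection<Row> rows) {
    return rows.apply(
        "ApplyWindow",
        Window.<Row>into(Sessions.withGapDuration(Duration.standardMinutes(1)))
            .triggering(DefaultTrigger.of())
            .withAllowedLateness(Duration.standardMinutes(3))
            .accumulatingFiredPanes());
  }
}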

For example, these are the sample input and output obtained when I
ran the same pipeline in the DirectRunner.

Sample Input:
{"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-27-44"}}
{"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-28-51"}}
{"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-28-26"}}

Sample Output:

{"Col1":{"string":"str1"},"NumberOfRecords":{"long":1},"WST":{"string":"2019-03-27
15-27-44"},"WET":{"string":"2019-03-27 15-28-44"}}

{"Col1":{"string":"str1"},"NumberOfRecords":{"long":1},"WST":{"string":"2019-03-27
15-28-51"},"WET":{"string":"2019-03-27 15-29-51"}}

{"Col1":{"string":"str1"},"NumberOfRecords":{"long":3},"WST":{"string":"2019-03-27
15-27-44"},"WET":{"string":"2019-03-27 15-29-51"}}

Where "NumberOfRecords" is the count, "WST" is the Avro field Name
which indicates the window start time for the session window.
Similarly "WET" indicates the window End time of the session window.
I am getting "WST" and "WET" after remerging and applying
ParDo(Enrichment stage of the pipeline).

The program ran successfully in DirectRunner. But, in FlinkRunner, I
am getting this exception when the third record arrives:

2019-03-27 15:31:00,442 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph        -

Source: DfleKafkaSource/KafkaIO.Read/Read(KafkaUnboundedSource) ->
Flat Map ->

DfleKafkaSource/ParDo(ConvertKafkaRecordtoRow)/ParMultiDo(ConvertKafkaRecordtoRow)
-> (Window.Into()/Window.Assign.out ->

DfleSql/SqlTransform/BeamIOSourceRel_4/Convert.ConvertTransform/ParDo(Anonymous)/ParMultiDo(Anonymous)
->

DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Group.ByFields/Group
by fields/ParMultiDo(Anonymous) -> ToKeyedWorkItem,

DfleKafkaSink/ParDo(RowToGenericRecordConverter)/ParMultiDo(RowToGenericRecordConverter)
-> DfleKafkaSink/KafkaIO.KafkaValueWrite/Kafka values with default
key/Map/ParMultiDo(Anonymous) ->
DfleKafkaSink/KafkaIO.KafkaValueWrite/KafkaIO.Write/Kafka
ProducerRecord/Map/ParMultiDo(Anonymous) ->

DfleKafkaSink/KafkaIO.KafkaValueWrite/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter))
(1/1) (d00be62e110cef00d9d772042f4b87a9) switched from DEPLOYING to
RUNNING.
2019-03-27 15:33:25,427 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph        -


DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Group.ByFields/GroupByKey
->

DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous)
->
DfleSql/SqlTransform/BeamAggregationRel_45/mergeRecord/ParMultiDo(Anonymous)
-> DfleSql/SqlTransform/BeamCalcRel_46/ParDo(Calc)/ParMultiDo(Calc)
-> DfleSql/Window.Remerge/Identity/Map/ParMultiDo(Anonymous) ->

DfleSql/ParDo(EnrichRecordWithWindowTimeInfo)/ParMultiDo(EnrichRecordWithWindowTimeInfo)
->

DfleKafkaSink2/ParDo(RowToGenericRecordConverter)/ParMultiDo(RowToGenericRecordConverter)
-> DfleKafkaSink2/KafkaIO.KafkaValueWrite/Kafka values with default
key/Map/ParMultiDo(Anonymous) ->
DfleKafkaSink2/KafkaIO.KafkaValueWrite/KafkaIO.Write/Kafka
ProducerRecord/Map/ParMultiDo(Anonymous) ->

DfleKafkaSink2/KafkaIO.KafkaValueWrite/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter)
(1/1) (d95af17b7457443c13bd327b46b282e6) switched from RUNNING to
FAILED.
org.apache.beam.sdk.util.UserCodeException:
java.lang.NullPointerException
         at
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
         at

org.apache.beam.runners.core.GroupAlsoByWindow

Re: Flink 1.7.2 + Beam 2.11 error: The transform beam:transform:create_view:v1 is currently not supported.

2019-03-29 Thread Maximilian Michels

Hi Tobias,

Thanks for reporting. I can confirm this is a regression in the 
detection of the execution mode. Everything should work fine if you set 
the "streaming" flag to true. This will be fixed in the 2.12.0 release.


Thanks,
Max

On 28.03.19 17:28, Lukasz Cwik wrote:

+dev 

On Thu, Mar 28, 2019 at 9:13 AM Kaymak, Tobias wrote:


Hello,

I just upgraded to Flink 1.7.2 from 1.6.2 with my dev cluster and
from Beam 2.10 to 2.11 and I am seeing this error when starting my
pipelines:

org.apache.flink.client.program.ProgramInvocationException: The main
method caused an error.
         at

org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
         at

org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
         at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
         at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
         at
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
         at
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
         at

org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
         at

org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
         at

org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
         at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.lang.UnsupportedOperationException: The transform
beam:transform:create_view:v1 is currently not supported.
         at

org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:113)

         at

org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
         at

org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
         at

org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
         at

org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
         at

org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
         at

org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
         at

org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
         at

org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
         at
org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
         at

org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
         at

org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:68)

         at

org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:111)

         at
org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:108)
         at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
         at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
         at
ch.ricardo.di.beam.KafkaToBigQuery.main(KafkaToBigQuery.java:175)
         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
         at

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
         at

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
         at java.lang.reflect.Method.invoke(Method.java:498)
         at

org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
         ... 9 more

I found this open issue while googling:
https://jira.apache.org/jira/browse/BEAM-4301 - but it seems
unrelated; what makes me wonder is the type of error message I am
seeing.
I tried Flink 1.7.2 with Scala 2.11 + 2.12 without luck.
I tried deleting all of Flink's state information (ha/ and
snapshots/), and in the end I tried downgrading to Beam 2.10 - and that
worked.
Could it be that a bug was introduced in 2.11?

Best,
Tobi




Re: Flink 1.7.2 + Beam 2.11 error: The transform beam:transform:create_view:v1 is currently not supported.

2019-03-29 Thread Maximilian Michels
Great. Here is the JIRA issue including a PR which fixes the problem and 
adds additional tests to prevent this in the future: 
https://issues.apache.org/jira/browse/BEAM-6937


Thanks,
Max

On 29.03.19 15:43, Kaymak, Tobias wrote:

Can confirm that this is the issue, starting with streaming=True fixes it.

On Fri, Mar 29, 2019 at 11:53 AM Maximilian Michels wrote:


Hi Tobias,

Thanks for reporting. I can confirm this is a regression in the
detection of the execution mode. Everything should work fine if you set
the "streaming" flag to true. This will be fixed in the 2.12.0 release.

Thanks,
Max

On 28.03.19 17:28, Lukasz Cwik wrote:
 > +dev dev@beam.apache.org
 >
 > On Thu, Mar 28, 2019 at 9:13 AM Kaymak, Tobias wrote:
 >
 >     Hello,
 >
 >     I just upgraded to Flink 1.7.2 from 1.6.2 with my dev cluster and
 >     from Beam 2.10 to 2.11 and I am seeing this error when
starting my
 >     pipelines:
 >
 >     org.apache.flink.client.program.ProgramInvocationException: The
 >     main method caused an error.
 >     Caused by: java.lang.UnsupportedOperationException: The transform
 >     beam:transform:create_view:v1 is currently not supported.

Re: Quieten javadoc generation

2019-04-03 Thread Maximilian Michels

+1

On 02.04.19 22:56, Mikhail Gryzykhin wrote:
+1 to suppress warnings globally. If we care about an issue, it should 
be an error.


On Tue, Apr 2, 2019 at 5:38 AM Alexey Romanenko wrote:


+1 to suppress such warnings globally. IMO, a meaningful Javadoc
description is usually quite enough to understand what a method does.


On 1 Apr 2019, at 18:21, Kenneth Knowles wrote:

Personally, I would like to suppress the warnings globally. I
think requiring javadoc everywhere is already enough to remind
someone to write something meaningful. And I think @param rarely
adds anything beyond the function signature and @return rarely
adds anything beyond the description.

Kenn

On Mon, Apr 1, 2019 at 6:53 AM Michael Luckey wrote:

Hi,

currently our console output gets cluttered by thousands of
Javadoc warnings [1]. Most of them are caused by missing @return or
@param tags [2].

So currently this signal is completely ignored and, even worse, it
makes it difficult to parse through the log.

As I could not find a previous discussion on the list on how
to handle param/return on java docs, I felt the need to ask
here first, how we would like to improve this situation.

Some options
1. fix those warnings
2. do not insist on those tags being present and disable
doclint warnings (probably not doable at tag granularity).
This is already done for the doc aggregation task [3]

Thoughts?


[1]
https://builds.apache.org/job/beam_PreCommit_Java_Cron/1131/console
[2]
https://builds.apache.org/job/beam_PreCommit_Java_Cron/1131/java/
[3]

https://github.com/apache/beam/blob/master/sdks/java/javadoc/build.gradle#L77-L78





Re: JDK11 support?

2019-04-09 Thread Maximilian Michels

Hi Yi,

That's a great question. Beam is still on Java 8 at the moment. There is 
a JIRA issue for making Beam compatible with Java 11: 
https://issues.apache.org/jira/browse/BEAM-2530


As you can read in the issue, the upcoming Beam 2.12.0 has experimental 
support for Java 11. That said, there is still code that needs a major 
overhaul, e.g. automatic staging of pipeline jars 
(https://issues.apache.org/jira/browse/BEAM-5495).


I hope we can achieve proper Java 11 compatibility in the next releases. 
Perhaps somebody else can give further insight into how much work is left.


Thanks,
Max

On 09.04.19 22:07, Yi Pan wrote:

Hi, everyone,

I checked out the latest Beam code and it seems that the documentation still 
says "JDK8". Is there any plan to officially support JDK11?


Thanks a lot!

-Yi


Re: ParDo Execution Time stat is always 0

2019-04-10 Thread Maximilian Michels

Hi @all,

From a quick debugging session, I conclude that the wiring is in place 
for the Flink Runner. There is a ProgressReporter that reports 
MonitoringInfos to Flink, in a similar fashion to the "legacy" Runner.


The bundle duration metrics are 0, but the element count gets reported 
correctly. It appears to be an issue in the Python/Java harness, because 
"ProcessBundleProgressResponse" contains only 0 values for the bundle 
duration.


Thanks,
Max

On 04.04.19 19:54, Mikhail Gryzykhin wrote:

Hi everyone,

Quick summary on python and Dataflow Runner:
Python SDK already reports:
- MSec
- User metrics (int64 and distribution)
- PCollection Element Count
- Work on MeanByteCount for PCollection is ongoing.


Dataflow Runner:
- all metrics listed above are passed through to Dataflow.

Ryan can give more information on Flink Runner. I also see Maximilian on 
some of relevant PRs, so he might comment on this as well.


Regards,
Mikhail.


On Thu, Apr 4, 2019 at 10:43 AM Pablo Estrada wrote:


Hello guys!
Alex, Mikhail and Ryan are working on support for metrics in the
portability framework. The support on the SDK is pretty advanced
AFAIK*, and the next step is to get the metrics back into the
runner. Lukasz and I are working on a project that depends on
this too, so I'm adding everyone so we can get an idea of what's
missing.

I believe:
- User metrics are fully wired up in the SDK
- State sampler (timing) metrics are wired up as well (is that
right, +Alex Amato ?)
- Work is ongoing to send the updates back to Flink.
- What is the plan for making metrics queriable from Flink? +Ryan
Williams 

Thanks!
-P.



On Wed, Apr 3, 2019 at 12:02 PM Thomas Weise wrote:

I believe this is where the metrics are supplied:

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/operations.py

git grep process_bundle_msecs   yields results for dataflow
worker only

There isn't any test coverage for the Flink runner:


https://github.com/apache/beam/blob/d38645ae8758d834c3e819b715a66dd82c78f6d4/sdks/python/apache_beam/runners/portability/flink_runner_test.py#L181



On Wed, Apr 3, 2019 at 10:45 AM Akshay Balwally wrote:

Should have added- I'm using Python sdk, Flink runner

On Wed, Apr 3, 2019 at 10:32 AM Akshay Balwally wrote:

Hi,
I'm hoping to get metrics on the amount of time spent on
each operator, so it seems like the stat


{organization_specific_prefix}.operator.beam-metric-pardo_execution_time-process_bundle_msecs-v1.gauge.mean

would be pretty helpful. But in practice, this stat
always shows 0, which I interpret as 0 milliseconds
spent per bundle, which can't be correct (other stats
show that the operators are running, and timers within
the operators show more reasonable times). Is this a
known bug?


-- 
*Akshay Balwally*

Software Engineer
937.271.6469 
Lyft 



-- 
*Akshay Balwally*

Software Engineer
937.271.6469 
Lyft 



[docs] Python State & Timers

2019-04-11 Thread Maximilian Michels

Hi everyone,

The Python SDK still lacks documentation on state and timers.

As a first step, what do you think about updating these two blog posts 
with the corresponding Python code?


https://beam.apache.org/blog/2017/02/13/stateful-processing.html
https://beam.apache.org/blog/2017/08/28/timely-processing.html
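
For reference, the Java pattern those posts currently illustrate looks
roughly like the sketch below (names and types are illustrative only);
the idea would be to show the Python equivalents alongside it:

import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Buffers values per key and flushes them one minute (event time) after the
// most recent element seen for the key.
class BufferingDoFn extends DoFn<KV<String, Integer>, Integer> {

  @StateId("buffer")
  private final StateSpec<BagState<Integer>> bufferSpec = StateSpecs.bag(VarIntCoder.of());

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext ctx,
      @StateId("buffer") BagState<Integer> buffer,
      @TimerId("flush") Timer flushTimer) {
    buffer.add(ctx.element().getValue());
    flushTimer.set(ctx.timestamp().plus(Duration.standardMinutes(1)));
  }

  @OnTimer("flush")
  public void onFlush(OnTimerContext ctx, @StateId("buffer") BagState<Integer> buffer) {
    for (Integer value : buffer.read()) {
      ctx.output(value);
    }
    buffer.clear();
  }
}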

Thanks,
Max


Re: [ANNOUNCE] New committer announcement: Boyuan Zhang

2019-04-11 Thread Maximilian Michels

Great work! Congrats.

On 11.04.19 13:41, Robert Bradshaw wrote:

Congratulations!

On Thu, Apr 11, 2019 at 12:29 PM Michael Luckey  wrote:


Congrats and welcome, Boyuan

On Thu, Apr 11, 2019 at 12:27 PM Tim Robertson  
wrote:


Many congratulations Boyuan!

On Thu, Apr 11, 2019 at 10:50 AM Łukasz Gajowy  wrote:


Congrats Boyuan! :)

On Wed, 10 Apr 2019 at 23:49, Chamikara Jayalath wrote:


Congrats Boyuan!

On Wed, Apr 10, 2019 at 11:14 AM Yifan Zou  wrote:


Congratulations Boyuan!

On Wed, Apr 10, 2019 at 10:49 AM Daniel Oliveira  wrote:


Congrats Boyuan!

On Wed, Apr 10, 2019 at 10:20 AM Rui Wang  wrote:


So well deserved!

-Rui

On Wed, Apr 10, 2019 at 10:12 AM Pablo Estrada  wrote:


Well deserved : ) congrats Boyuan!

On Wed, Apr 10, 2019 at 10:08 AM Aizhamal Nurmamat kyzy  
wrote:


Congratulations Boyuan!

On Wed, Apr 10, 2019 at 9:52 AM Ruoyun Huang  wrote:


Thanks for your contributions and congratulations Boyuan!

On Wed, Apr 10, 2019 at 9:00 AM Kenneth Knowles  wrote:


Hi all,

Please join me and the rest of the Beam PMC in welcoming a new committer: 
Boyuan Zhang.

Boyuan has been contributing to Beam since early 2018. She has proposed 100+ 
pull requests across a wide range of topics: bug fixes, integration tests, 
build improvements, metrics features, and release automation. Two big-picture 
things to highlight are building/releasing Beam Python wheels and managing the 
donation of the Beam Dataflow Java Worker, including help with I.P. clearance.

In consideration of Boyuan's contributions, the Beam PMC trusts Boyuan with the 
responsibilities of a Beam committer [1].

Thank you, Boyuan, for your contributions.

Kenn

[1] 
https://beam.apache.org/contribute/become-a-committer/#an-apache-beam-committer




--

Ruoyun  Huang



Re: [docs] Python State & Timers

2019-04-12 Thread Maximilian Michels

It would probably be pretty easy to add the corresponding code snippets to the 
docs as well.


It's probably a bit more work because there is no section dedicated to 
state/timer yet in the documentation. Tracked here: 
https://jira.apache.org/jira/browse/BEAM-2472



I've been going over this topic a bit. I'll add the snippets next week, if 
that's fine by y'all.


That would be great. The blog posts are a great way to get started with 
state/timers.


Thanks,
Max

On 11.04.19 20:21, Pablo Estrada wrote:
I've been going over this topic a bit. I'll add the snippets next week, 
if that's fine by y'all.

Best
-P.

On Thu, Apr 11, 2019 at 5:27 AM Robert Bradshaw wrote:


That's a great idea! It would probably be pretty easy to add the
corresponding code snippets to the docs as well.

On Thu, Apr 11, 2019 at 2:00 PM Maximilian Michels wrote:
 >
 > Hi everyone,
 >
 > The Python SDK still lacks documentation on state and timers.
 >
 > As a first step, what do you think about updating these two blog
posts
 > with the corresponding Python code?
 >
 > https://beam.apache.org/blog/2017/02/13/stateful-processing.html
 > https://beam.apache.org/blog/2017/08/28/timely-processing.html
 >
 > Thanks,
 > Max



Re: Contributor Request

2019-04-15 Thread Maximilian Michels

Hi Thinh,

Sounds great. Would be interesting to hear more about King's use case.

I've added you as a JIRA contributor.

Cheers,
Max

On 15.04.19 16:06, Thinh Ha wrote:

Hi there,

My name is Thinh. I am a strategic cloud engineer in the Google Cloud 
professional services team. I specialise in data and am currently working 
with a customer who is a heavy user of Beam/Dataflow (King).


I wanted to have a go at implementing a ticket that they requested as a 
contributor. I'd like to be added as a Jira contributor so that I can 
assign issues to myself.


My ASF Jira username is thinhha.

Many Thanks,
--



Thinh Ha

thin...@google.com 

Strategic Cloud Engineer

+44 (0)20 3820 8009



Re: Python SDK timestamp precision

2019-04-17 Thread Maximilian Michels

Hi,

Thanks for taking care of this issue in the Python SDK, Thomas!

It would be nice to have a uniform precision for timestamps but, as Kenn 
pointed out, timestamps are extracted from systems that have different 
precision.


To add to the list: Flink - milliseconds

After all, it doesn't matter as long as there is sufficient precision 
and conversions are done correctly.
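
For context, the Java SDK models element timestamps as Joda-Time
Instants, i.e. millisecond precision; a minimal sketch:

import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;

public class TimestampPrecision {
  public static void main(String[] args) {
    // Joda-Time Instants carry epoch milliseconds, so this is the finest
    // granularity an element timestamp can have in the Java SDK.
    Instant ts = new Instant(1555459200123L);
    TimestampedValue<String> element = TimestampedValue.of("record", ts);
    System.out.println(element.getTimestamp()); // 2019-04-17T00:00:00.123Z
  }
}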


I think we could improve the situation by at least adding a 
"milliseconds" constructor to the Python SDK's Timestamp.


Cheers,
Max

On 17.04.19 04:13, Kenneth Knowles wrote:
I am not so sure this is a good idea. Here are some systems and their 
precision:


Arrow - microseconds
BigQuery - microseconds
New Java instant - nanoseconds
Firestore - microseconds
Protobuf - nanoseconds
Dataflow backend - microseconds
Postgresql - microseconds
Pubsub publish time - nanoseconds
MSSQL datetime2 - 100 nanoseconds (original datetime about 3 millis)
Cassandra - milliseconds

IMO it is important to be able to treat any of these as a Beam 
timestamp, even though they aren't all streaming. Who knows when we 
might be ingesting a streamed changelog, or using them for reprocessing 
an archived stream. I think for this purpose we either should 
standardize on nanoseconds or make the runner's resolution independent 
of the data representation.


I've had some offline conversations about this. I think we can have 
higher-than-runner precision in the user data, and allow WindowFns and 
DoFns to operate on this higher-than-runner precision data, and still 
have consistent watermark treatment. Watermarks are just bounds, after all.


Kenn

On Tue, Apr 16, 2019 at 6:48 PM Thomas Weise wrote:


The Python SDK currently uses timestamps in microsecond resolution
while Java SDK, as most would probably expect, uses milliseconds.

This causes a few difficulties with portability (Python coders need
to convert to millis for WindowedValue and Timers), which is related
to a bug I'm looking into:

https://issues.apache.org/jira/browse/BEAM-7035

As Luke pointed out, the issue was previously discussed:

https://issues.apache.org/jira/browse/BEAM-1524

I'm not privy to the reasons why we decided to go with micros in
first place, but would it be too big of a change or impractical for
other reasons to switch Python SDK to millis before it gets more users?

Thanks,
Thomas



Artifact staging in cross-language pipelines

2019-04-18 Thread Maximilian Michels

Hi everyone,

We have previously merged support for configuring transforms across 
languages. Please see Cham's summary on the discussion [1]. There is 
also a design document [2].


Subsequently, we've added wrappers for cross-language transforms to the 
Python SDK, i.e. GenerateSequence, ReadFromKafka, and there is a pending 
PR [1] for WriteToKafka. All of them utilize Java transforms via 
cross-language configuration.


That is all pretty exciting :)

We still have some issues to solve, one being how to stage artifacts from 
a foreign environment. When we run external transforms which are part of 
Beam's core (e.g. GenerateSequence), we have them available in the SDK 
Harness. However, when they are not (e.g. KafkaIO), we need to stage the 
necessary files.


For my PR [3] I've naively added ":beam-sdks-java-io-kafka" to the SDK 
Harness which caused dependency problems [4]. Those could be resolved 
but the bigger question is how to stage artifacts for external 
transforms programmatically?


Heejong has solved this by adding a "--jar_package" option to the Python 
SDK to stage Java files [5]. I think that is a better solution than 
adding required Jars to the SDK Harness directly, but it is not very 
convenient for users.


I've discussed this today with Thomas and we both figured that the 
expansion service needs to provide a list of required Jars with the 
ExpansionResponse it returns. It's not entirely clear how we determine 
which artifacts are necessary for an external transform. We could just 
dump the entire classpath like we do in PipelineResources for Java 
pipelines. This provides many unneeded classes but would work.
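
For reference, the Java runners already do something along these lines
when staging a pipeline; a minimal sketch (the exact package and
signature of PipelineResources may differ between Beam versions, so
treat the details as an assumption):

import java.util.List;
import org.apache.beam.runners.core.construction.PipelineResources;

public class ClasspathDump {
  public static void main(String[] args) {
    // Collect every file/jar on the current classpath as a candidate artifact
    // to stage - simple, but it includes many classes the transform never needs.
    List<String> filesToStage =
        PipelineResources.detectClassPathResourcesToStage(ClasspathDump.class.getClassLoader());
    filesToStage.forEach(System.out::println);
  }
}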


Do you think it makes sense for the expansion service to provide the 
artifacts? Perhaps you have a better idea how to resolve the staging 
problem in cross-language pipelines?


Thanks,
Max

[1] 
https://lists.apache.org/thread.html/b99ba8527422e31ec7bb7ad9dc3a6583551ea392ebdc5527b5fb4a67@%3Cdev.beam.apache.org%3E


[2] https://s.apache.org/beam-cross-language-io

[3] https://github.com/apache/beam/pull/8322#discussion_r276336748

[4] Dependency graph for beam-runners-direct-java:

beam-runners-direct-java -> sdks-java-harness -> beam-sdks-java-io-kafka 
-> beam-runners-direct-java ... the cycle continues


Beam-runners-direct-java depends on sdks-java-harness due
to the infamous Universal Local Runner. Beam-sdks-java-io-kafka depends 
on beam-runners-direct-java for running tests.


[5] https://github.com/apache/beam/pull/8340


Re: Artifact staging in cross-language pipelines

2019-04-19 Thread Maximilian Michels
  by many Beam users ? I think such a
(possibly long running) service will have to
maintain a repository of transforms and
should have mechanism for registering new
transforms and discovering already
registered transforms etc. I think there's
more design work needed to make transform
expansion service support such use-cases.
Currently, I think allowing pipeline author
to provide the jars when starting the
expansion service and when executing the
pipeline will be adequate.

Regarding the entity that will perform the
staging, I like Luke's idea of allowing
expansion service to do the staging (of jars
provided by the user). Notion of artifacts
and how they are extracted/represented is
SDK dependent. So if the pipeline SDK tries
to do this we have to add n x (n -1)
configurations (for n SDKs).

- Cham

On Thu, Apr 18, 2019 at 11:45 AM Lukasz Cwik wrote:

We can expose the artifact staging
endpoint and artifact token to allow the
expansion service to upload any
resources its environment may need. For
example, the expansion service for the
Beam Java SDK would be able to upload jars.

In the "docker" environment, the Apache
Beam Java SDK harness container would
fetch the relevant artifacts for itself
and be able to execute the pipeline.
(Note that a docker environment could
skip all this artifact staging if the
docker environment contained all
necessary artifacts).

For the existing "external" environment,
it should already come with all the
resources prepackaged wherever
"external" points to. The "process"
based environment could choose to use
the artifact staging service to fetch
those resources associated with its
process or it could follow the same
pattern that "external" would do and
already contain all the prepackaged
resources. Note that both "external" and
"process" will require the instance of
the expansion service to be specialized
for those environments which is why the
default should for the expansion service
to be the "docker" environment.

Note that a major reason for going with
docker containers as the environment
that all runners should support is that
containers provides a solution for this
exact issue. Both the "process" and
"external" environments are explicitly
limiting and expanding their
capabilities will quickly have us
building something like a docker
container because we'll quickly find
ourselves solving the same problems that
    docker containers provide (resources,
file layout, permissions, ...)




On Thu, Apr 18, 2019 at 11:21 AM Maximilian Michels wrote:

Hi everyone,

We have previously merged support
for configuring transforms across
   

Re: [VOTE] Release 2.12.0, release candidate #4

2019-04-22 Thread Maximilian Michels

+1 (binding)

Found a minor bug while testing, but not a blocker: 
https://jira.apache.org/jira/browse/BEAM-7128


Thanks,
Max

On 20.04.19 23:02, Pablo Estrada wrote:

+1
Ran SQL postcommit, and Dataflow Portability Java validatesrunner tests.

-P.

On Wed, Apr 17, 2019 at 1:38 AM Jean-Baptiste Onofré wrote:


+1 (binding)

Quickly checked with beam-samples.

Regards
JB

On 16/04/2019 00:50, Andrew Pilloud wrote:
 > Hi everyone,
 >
 > Please review and vote on the release candidate #4 for the version
 > 2.12.0, as follows:
 >
 > [ ] +1, Approve the release
 > [ ] -1, Do not approve the release (please provide specific comments)
 >
 > The complete staging area is available for your review, which
includes:
 > * JIRA release notes [1],
 > * the official Apache source release to be deployed to
dist.apache.org 
 >  [2], which is signed with the key with
 > fingerprint 9E7CEC0661EFD610B632C610AE8FE17F9F8AE3D4 [3],
 > * all artifacts to be deployed to the Maven Central Repository [4],
 > * source code tag "v2.12.0-RC4" [5],
 > * website pull request listing the release [6], publishing the API
 > reference manual [7], and the blog post [8].
 > * Java artifacts were built with Gradle/5.2.1 and OpenJDK/Oracle JDK
 > 1.8.0_181.
 > * Python artifacts are deployed along with the source release to the
 > dist.apache.org  
[2].
 > * Validation sheet with a tab for 2.12.0 release to help with
validation
 > [9].
 >
 > The vote will be open for at least 72 hours. It is adopted by
majority
 > approval, with at least 3 PMC affirmative votes.
 >
 > Thanks,
 > Andrew
 >
 > 1]

https://jira.apache.org/jira/secure/ReleaseNote.jspa?projectId=12319527&version=12344944
 > [2] https://dist.apache.org/repos/dist/dev/beam/2.12.0/
 > [3] https://dist.apache.org/repos/dist/release/beam/KEYS
 > [4]
https://repository.apache.org/content/repositories/orgapachebeam-1068/
 > [5] https://github.com/apache/beam/tree/v2.12.0-RC4
 > [6] https://github.com/apache/beam/pull/8215
 > [7] https://github.com/apache/beam-site/pull/588
 > [8] https://github.com/apache/beam/pull/8314
 > [9]

https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=1007316984

-- 
Jean-Baptiste Onofré

jbono...@apache.org 
http://blog.nanthrax.net
Talend - http://www.talend.com



Re: [ANNOUNCE] New committer announcement: Yifan Zou

2019-04-22 Thread Maximilian Michels

Congrats! Great work.

-Max

On 22.04.19 19:00, Rui Wang wrote:

Congratulations! Thanks for your contribution!!

-Rui

On Mon, Apr 22, 2019 at 9:57 AM Ruoyun Huang wrote:


Congratulations, Yifan!

On Mon, Apr 22, 2019 at 9:48 AM Boyuan Zhang wrote:

Congratulations, Yifan~

On Mon, Apr 22, 2019 at 9:29 AM Connell O'Callaghan wrote:

Well done Yifan!!!

Thank you for sharing Kenn!!!

On Mon, Apr 22, 2019 at 9:00 AM Ahmet Altay wrote:

Congratulations, Yifan!

On Mon, Apr 22, 2019 at 8:46 AM Tim Robertson wrote:

Congratulations Yifan!

On Mon, Apr 22, 2019 at 5:39 PM Cyrus Maden wrote:

Congratulations Yifan!!

On Mon, Apr 22, 2019 at 11:26 AM Kenneth Knowles wrote:

Hi all,

Please join me and the rest of the Beam PMC
in welcoming a new committer: Yifan Zou.

Yifan has been contributing to Beam since
early 2018. He has proposed 70+ pull
requests, adding dependency checking and
improving test infrastructure. But something
the numbers cannot show adequately is the
huge effort Yifan has put into working with
infra and keeping our Jenkins executors healthy.

In consideration of Yifan's contributions,
the Beam PMC trusts Yifan with the
responsibilities of a Beam committer [1].

Thank you, Yifan, for your contributions.

Kenn

[1]

https://beam.apache.org/contribute/become-a-committer/#an-apache-beam-committer





-- 


Ruoyun  Huang



Re: Artifact staging in cross-language pipelines

2019-04-22 Thread Maximilian Michels
Thanks for the summary Cham. All makes sense. I agree that we want to 
keep the option to manually specify artifacts.



There are few unanswered questions though.
(1) In what form will a transform author specify dependencies ? For example, 
URL to a Maven repo, URL to a local file, blob ?


Going forward, we probably want to support multiple ways. For now, we 
could stick with a URL-based approach with support for different file 
systems. In the future a list of packages to retrieve from Maven/PyPi 
would be useful.



(2) How will dependencies be included in the expansion response proto ? String 
(URL), bytes (blob) ?


I'd go for a list of Protobuf strings first but the format would have to 
evolve for other dependency types.


(3) How will we manage/share transitive dependencies required at runtime ? 


I'd say transitive dependencies have to be included in the list. In case 
of fat jars, they are reduced to a single jar.



(4) How will dependencies be staged for various runner/SDK combinations ? (for 
example, portable runner/Flink, Dataflow runner)


Staging should be no different than it is now, i.e. go through Beam's 
artifact staging service. As long as the protocol is stable, there could 
also be different implementations.


-Max

On 20.04.19 03:08, Chamikara Jayalath wrote:

OK, sounds like this is a good path forward then.

* When starting up the expansion service, the user (who starts up the 
service) provides the dependencies necessary to expand transforms. We will 
later add support for adding new transforms to an already running 
expansion service.
* As a part of the transform configuration, the transform author has the 
option of providing a list of dependencies that will be needed to run the 
transform.
* These dependencies will be sent back to the pipeline SDK as a part of the 
expansion response and the pipeline SDK will stage these resources.
* The pipeline author has the option of specifying the dependencies using a 
pipeline option. (for example, https://github.com/apache/beam/pull/8340)


I think the last option is important to (1) make existing transforms easily 
available for cross-language usage without additional configuration and (2) 
allow pipeline authors to override dependency versions specified in the 
transform configuration (for example, to apply security patches) without 
updating the expansion service.


There are few unanswered questions though.
(1) In what form will a transform author specify dependencies ? For 
example, URL to a Maven repo, URL to a local file, blob ?
(2) How will dependencies be included in the expansion response proto ? 
String (URL), bytes (blob) ?

(3) How will we manage/share transitive dependencies required at runtime ?
(4) How will dependencies be staged for various runner/SDK combinations 
? (for example, portable runner/Flink, Dataflow runner)


Thanks,
Cham

On Fri, Apr 19, 2019 at 4:49 AM Maximilian Michels wrote:


Thank you for your replies.

I did not suggest that the Expansion Service does the staging, but it
would return the required resources (e.g. jars) for the external
transform's runtime environment. The client then has to take care of
staging the resources.

The Expansion Service itself also needs resources to do the
expansion. I
assumed those to be provided when starting the expansion service. I
consider it less important but we could also provide a way to add new
transforms to the Expansion Service after startup.

Good point on Docker vs externally provided environments. For the PR
[1]
it will suffice then to add Kafka to the container dependencies. The
"--jar_package" pipeline option is ok for now but I'd like to see work
towards staging resources for external transforms via information
returned by the Expansion Service. That avoids users having to take
care
of including the correct jars in their pipeline options.

These issues are related and we could discuss them in separate threads:

* Auto-discovery of Expansion Service and its external transforms
* Credentials required during expansion / runtime

Thanks,
Max

[1] https://github.com/apache/beam/pull/8322

On 19.04.19 07:35, Thomas Weise wrote:
 > Good discussion :)
 >
 > Initially the expansion service was considered a user
responsibility,
 > but I think that isn't necessarily the case. I can also see the
 > expansion service provided as part of the infrastructure and the
user
 > not wanting to deal with it at all. For example, users may want
to write
 > Python transforms and use external IOs, without being concerned how
 > these IOs are provided. Under such scenario it would be good if:
 >
 > * Expansion service(s) can be auto-discovered via the job service
endpoint
 &

Re: Hazelcast Jet Runner

2019-04-22 Thread Maximilian Michels

Hi Jozsef,

If the Runner supports the complete set of ValidatesRunner tests and the 
Nexmark suite, it is already in a very good state. As Kenn already 
suggested, we can definitely add it to the capability matrix then.


Thanks,
Max

On 19.04.19 22:52, Kenneth Knowles wrote:
The ValidatesRunner tests are the best source we have for knowing the 
capabilities of a runner. Are there instructions for running the tests?


Assuming we can check it out, then just open a PR to the website with 
the current capabilities and caveats. Since it is a big deal and could 
use lots of eyes, I would share the PR link on this thread.


Kenn

On Thu, Apr 18, 2019 at 11:53 AM Jozsef Bartok wrote:


Hi. We at Hazelcast Jet have been working for a while now to
implement a Java Beam Runner (non-portable) based on Hazelcast Jet
(https://jet.hazelcast.org/). The process is still ongoing
(https://github.com/hazelcast/hazelcast-jet-beam-runner), but we are
aiming for a fully functional, reliable Runner which can proudly
join the Capability Matrix. For that purpose I would like to ask
what’s your process of validating runners? We are already running
the @ValidatesRunner tests and the Nexmark test suite, but beyond
that what other steps do we need to take to get our Runner to the
level it needs to be at?



Re: Go SDK status

2019-04-22 Thread Maximilian Michels
Nice summary, Robert! I really like the transparency on the state of the 
Go SDK and how it's being used.


It would be great to see the streaming mode improve, because only then will 
we have a full-blown SDK. It looks like we will need a few more resources 
on the SDK to bring it up to par with Python.


I agree that cross-language transforms would be the most sensible path 
to solving the IO problem. The state of the Python SDK does not differ 
much in this regard because it also suffers from a lack of IO. I think 
you have seen the recent discussions about how to configure 
cross-language transforms. For the Python side we have Java's 
GenerateSequence and KafkaIO working with the portable Flink Runner.


Unfortunately, I'm not a Gopher yet but I'd be happy to exchange ideas 
or go into more detail about the cross-language capabilities.


Cheers,
Max

On 18.04.19 15:13, Thomas Weise wrote:

Hi Robert,

Thanks a bunch for providing this comprehensive update. This is exactly 
the kind of perspective I was looking for, even when overall it means 
that for potential users of the Go SDK it is even sooner than what I 
might have hoped for.


For more context, my interest was primarily on the streaming side. From 
the list of missing features you listed, State + Timers + Triggers would 
probably be highest priority. Unfortunately I won't be able to 
contribute to the Go SDK anytime soon, so this is mostly fyi in case 
anyone else does.


On improving the IOs, I think it would make a lot of sense to focus on 
the cross-language route. There has been some work lately to make 
existing Beam Java IOs available on the Flink runner (Max would be able 
to share more details on that).


Thanks!
Thomas


On Wed, Apr 17, 2019 at 9:56 PM Robert Burke wrote:


Oh dang. Thanks for mentioning that! Here's an open copy of the
versioning thoughts doc, though there shouldn't be any surprises
from the points I mentioned above.


https://docs.google.com/document/d/1ZjP30zNLWTu_WzkWbgY8F_ZXlA_OWAobAD9PuohJxPg/edit#heading=h.drpipq762xi7

On Wed, 17 Apr 2019 at 21:20, Nathan Fisher wrote:

Hi Robert,

Great summary on the current state of play. FYI, the referenced Google
Doc isn't visible to people outside the org by default.

Great to hear the Go SDK is still getting love. I last looked at
in September-October of last year.

Cheers,
Nathan

On Wed, 17 Apr 2019 at 20:27, Lukasz Cwik wrote:

Thanks for the indepth summary.

On Mon, Apr 15, 2019 at 4:19 PM Robert Burke wrote:

Hi Thomas! I'm so glad you asked!

The status of the Go SDK is complicated, so this email
can't be brief. There are several dimensions to
consider: as a Go Open Source Project, User Libraries
and Experience, and Beam Features.

I'm going to be updating the roadmap later this month
when I have a spare moment.

*tl;dr;*
I would *love* help in improving the Go SDK, especially
around interactions with Java/Python/Flink. Java and I
do not have a good working relationship for operational
purposes, and the last time I used Python, I had to
re-image my machine. There's lots to do, but shouting
out tasks to the void is rarely as productive as it is
cathartic. If there's an offer to help, and a preference
for/experience with  something to work on, I'm willing
to find something useful to get started on for you.

(Note: The following are simply my opinion as someone
who works with the project weekly as a Go programmer,
and should not be treated as demands or gospel. I just
don't have anyone to talk about Go SDK issues with, and
my previous discussions, have largely seemed to fall on
uninterested ears.)

*The SDK can be considered Alpha when all of the
following are true:*
* The SDK is tested by the Beam project on a ULR and on
Flink as well as Dataflow.
* The IOs have received some love to ensure they can
scale (either through SDF or reshuffles), and be
portable to different environments (eg. using the Go
Cloud Development Kit (CDK) libraries).
    * Cross-Language IO support would also be acceptable.
* The SDK is using Go Modules for dependency management,
marking it as version 0.Minor (where Minor should
probably track the mainline Beam minor version for now).

*We can move to 

Re: Beam Summit at ApacheCon

2019-04-23 Thread Maximilian Michels

Hi Austin,

Thanks for the heads-up! I just want to highlight that this is a great 
chance for Beam. There will be a _dedicated_ Beam track which means that 
there is potential for lots of new people to learn about Beam. Of 
course, there will also be many people already involved in Beam.


-Max

On 23.04.19 02:47, Austin Bennett wrote:

Beam Summit will be at ApacheCon this year -- please consider submitting!

Dates for Beam Summit 11 and 12 September 2019.  There are other tracks 
at ApacheCon during this and on other dates too.


https://www.apachecon.com/acna19/cfp.html




Re: Integration of python/portable runner tests for Samza runner

2019-04-23 Thread Maximilian Michels

Hi Daniel,

Note that there is also Portable Validates Runner which runs Java 
portability tests. I don't know if you have integrated with that one 
already.


Thanks,
Max

On 23.04.19 02:28, Ankur Goenka wrote:

Hi Daniel,

We use flinkCompatibilityMatrix [1] to check the Flink compatibility 
with Python. This is the Python equivalent of the validatesRunner tests in 
Java for portable runners.

I think we can reuse it for Samza Portable runner with minor refactoring.

[1] 
https://github.com/apache/beam/blob/bdb1a713a120a887e71e85c77879dc4446a58541/sdks/python/build.gradle#L305


On Mon, Apr 22, 2019 at 3:21 PM Daniel Chen wrote:


Hi everyone,

I'm working on improving the validation of the Python portable Samza
runner. For java, we have the gradle task ( :validatesRunner) that
runs the runner validation tests.
I am looking for pointers on how to similarly integrate/enable the
portability and Python tests for the Samza runner.

Any help will be greatly appreciated.

Thanks,
Daniel



Re: [DISCUSS] Turn `WindowedValue<T>` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-04-23 Thread Maximilian Michels

Hi Jincheng,

Copying code is a solution for the short term. In the long run I'd like 
the Fn services to be a library not only for the Beam portability layer 
but also for other projects which want to leverage it. We should thus 
make an effort to make it more generic/extensible where necessary and 
feasible.


Since you are investigating reuse of Beam portability in the context of 
Flink, do you think it would make sense to setup a document where we 
collect ideas and challenges?


Thanks,
Max

On 23.04.19 13:00, jincheng sun wrote:

Hi Reuven,

I think you have provided an optional solution for other community which 
wants to take advantage of Beam's existing achievements. Thank you very 
much!


I think the Flink community can choose to copy from Beam's code or 
choose to rely directly on Beam's class library. The Flink community 
has also initiated a discussion; more info can be found here.



The purpose of turning `WindowedValue<T>` into `T` is to make the 
interface design of Beam more versatile, so that other open source 
projects have the opportunity to take advantage of Beam's existing 
achievements. Of course, just changing `WindowedValue<T>` into `T` 
is not enough for it to be shared by other projects in the form of a class 
library; we need to do more. If Beam can provide a class library 
in the future, other community contributors will also be more 
willing to contribute to the Beam community. This will benefit both 
the communities that want to take advantage of Beam's existing 
achievements and the Beam community itself. Thanks also to Thomas, 
who has made a lot of effort in this regard.


Thanks again for your valuable suggestion, and welcome any feedback!

Best,
Jincheng

Reuven Lax wrote on Tue, Apr 23, 2019 at 1:00 AM:


One concern here: these interfaces are intended for use within the
Beam project. Beam may decide to make specific changes to them to
support needed functionality in Beam. If they are being reused by
other projects, then those changes risk breaking those other
projects in unexpected ways. I don't think we can guarantee that we
don't do that. If this is useful in Flink, it would be safer to copy
the code IMO rather than to directly depend on it.

On Mon, Apr 22, 2019 at 12:08 AM jincheng sun wrote:

Hi Kenn,

Thanks for your reply and for explaining the design of WindowedValue
clearly!

At present, the definitions of `FnDataService` and
`BeamFnDataClient` in the Data Plane are very clear and universal,
such as send(...)/receive(...). If they are only applied within the
Beam project, they are already very good. Because `WindowedValue`
is a very basic data structure in the Beam project, both the
Runner and the SDK harness define the WindowedValue data
structure.

The reason I want to change the interface parameter from
`WindowedValue<T>` to T is that I want to make the `Data
Plane` interface into a class library that can be used by other
projects (such as Apache Flink), so that other projects can have
their own `FnDataService` implementations. However, the definition
of `WindowedValue` does not apply to all projects. For example,
Apache Flink also has a definition similar to WindowedValue:
the Apache Flink streaming API has StreamRecord. If we change
`WindowedValue<T>` to T, then another project's implementation
does not need to wrap WindowedValue and the interface will become
more concise. Furthermore, we only need one T, as in the
Apache Flink DataSet operator.

So, I agree with your understanding: I don't expect
`WindowedValue<XXX>` in the FnDataService interface; I hope to
just use a `T`.
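
To make the idea concrete, a rough sketch of the generic form being
discussed (simplified, hypothetical signatures; not the actual Beam
interface, and package locations are assumptions):

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.InboundDataClient;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;

/**
 * Sketch of a data service parameterized over a plain element type T instead of
 * WindowedValue<T>, so that another runner (e.g. Flink with StreamRecord-based
 * types) could plug in its own element representation.
 */
public interface GenericFnDataService {

  /** Registers a receiver for elements of type T arriving on the given endpoint. */
  <T> InboundDataClient receive(LogicalEndpoint endpoint, Coder<T> coder, FnDataReceiver<T> receiver);

  /** Returns a receiver that sends elements of type T to the given endpoint. */
  <T> CloseableFnDataReceiver<T> send(LogicalEndpoint endpoint, Coder<T> coder);
}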

Do you see any problem if we change the interface parameter
from `WindowedValue<T>` to T?

Thanks,
Jincheng

Kenneth Knowles wrote on Sat, Apr 20, 2019 at 2:38 AM:

WindowedValue has always been an interface, not a concrete
representation:

https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java

.
It is an abstract class because we started in Java 7, where
you could not have default methods, and due to legacy
style concerns. It is not just discussed but implemented
that there are WindowedValue implementations with fewer
allocations.
At the coder level, it was also always intended to have
multipl

Re: Projects Can Apply Individually for Google Season of Docs

2019-04-23 Thread Maximilian Michels
Both proposals for doc improvements sound great. Portability is an 
obvious one and the capability matrix needs an update as well.


I might be a bit late to the party, but I'd like to help with the 
mentoring. I've filled out the mentor form.


Thanks,
Max

On 22.04.19 23:32, Pablo Estrada wrote:

Hello all,
thanks to everyone for your participation. I have submitted the 
application on behalf of Beam, and requested one technical writer. Let's 
see how it goes : )

Best
-P.

On Wed, Apr 17, 2019 at 10:09 PM Ahmet Altay wrote:


Thanks Aizhamal, I completed the forms.

On Wed, Apr 17, 2019 at 6:46 PM Aizhamal Nurmamat kyzy wrote:

Hi everyone,


Here are a few updates on our application for Season of Docs:


1. Pablo and I have created the following document [1] with some
of the project ideas shared in the mailing list. If you have
more ideas, please add them into the doc and provide
description. If you also want to be a mentor for the proposed
ideas, please add your name in the table.


2. To submit our application, we need to publish our project
ideas list. For this we have opened a Jira tickets with
“gsod2019” tag[2]. We should maybe also think of adding a small
blog post in the Beam website that contains all the ideas in one
place[3]? Please let me know what you think on this.


3. By next week Tuesday (Application Deadline)

  *

+pabl...@apache.org  , please
complete the org application form [4]

  *

@Ahmet Altay  , please complete
alternative administrator form [5]

  *

@pabl...@apache.org  , @Ahmet
Altay   , and all other
contributors that want to participate as mentors, please
complete the mentor registration form [6]


Thank you,

Aizhamal


[1]

https://docs.google.com/document/d/1FNf-BjB4Q7PDdqygPboLr7CyIeo6JAkrt0RBgs2I4dE/edit#

[2]

https://issues.apache.org/jira/browse/BEAM-7104?jql=project%20%3D%20BEAM%20AND%20status%20%3D%20Open%20AND%20labels%20%3D%20gsod2019

[3] https://beam.apache.org/blog/

[4]

https://docs.google.com/forms/d/e/1FAIpQLScrEq5yKmadgn7LEPC8nN811-6DNmYvus5uXv_JY5BX7CH-Bg/viewform

[5]

https://docs.google.com/forms/d/e/1FAIpQLSc5ZsBzqfsib-epktZp8bYxL_hO4RhT_Zz8AY6zXDHB79ue9g/viewform

[6]

https://docs.google.com/forms/d/e/1FAIpQLSe-JjGvaKKGWZOXxrorONhB8qN3mjPrB9ZVkcsntR73Cv_K7g/viewform

On Wed, Apr 10, 2019 at 2:57 PM Pablo Estrada wrote:

I'd be happy to be a mentor for this to help add getting
started documentation for Python on Flink. I'd want to focus
on the reviews and less on the administration - so I'm
willing to be a secondary administrator if that's necessary
to move forward, but I'd love it if someone would help
administer.
FWIW, neither the administrator nor any other mentor has to
be a committer.

Anyone willing to be primary administrator and also a mentor?

Thanks
-P.

On Fri, Apr 5, 2019 at 9:40 AM Kenneth Knowles wrote:

Yes, this is great. Thanks for noticing the call and
pushing ahead on this, Aizhamal!

I would also like to see the runner comparison revamp at
https://issues.apache.org/jira/browse/BEAM-2888 which
would help users really understand what they can and
cannot do in plain terms.

Kenn

On Fri, Apr 5, 2019 at 9:30 AM Ahmet Altay wrote:

Thank you Aizhamal for volunteering. I am happy to
help as an administrator.

cc: +Rose Nguyen 
+Melissa Pashniak  in
case they will be interested in mentorship
and/or administration.




On Fri, Apr 5, 2019 at 9:16 AM Thomas Weise wrote:

This is great. Beam documentation needs work in
several areas, Python SDK, portability and SQL
come to mind right away :)


On Thu, Apr 4, 2019 at 4:21 PM Aizhamal Nurmamat kyzy wrote:

Hello everyone,

As the ASF announced that each project can
 

Re: Integration of python/portable runner tests for Samza runner

2019-04-24 Thread Maximilian Michels
If you are interested in portable python pipeline validation, I think 
fn_api_runner_test would also help.


Just to note, Ankur mentioned flinkCompatibilityMatrix, that one uses 
fn_api_runner_test with some tooling on top to bring up the test cluster.


On 23.04.19 19:23, Boyuan Zhang wrote:

Hi Daniel,
If you are interested in portable python pipeline validation, I think 
fn_api_runner_test 
<https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py> 
would also help.


On Tue, Apr 23, 2019 at 10:19 AM Pablo Estrada wrote:


This is cool, Daniel : ) Glad to see the Samza runner moving forward.
Best
-P.

On Tue, Apr 23, 2019 at 2:52 AM Maximilian Michels <m...@apache.org> wrote:

Hi Daniel,

Note that there is also Portable Validates Runner which runs Java
portability tests. I don't know if you have integrated with that
one
already.

Thanks,
Max

On 23.04.19 02:28, Ankur Goenka wrote:
 > Hi Daniel,
 >
 > We use flinkCompatibilityMatrix [1] to check the Flink
compatibility
 > with python. This is python equivalent to validatesRunner
tests in java
 > for portable runners.
 > I think we can reuse it for Samza Portable runner with minor
refactoring.
 >
 > [1]
 >

https://github.com/apache/beam/blob/bdb1a713a120a887e71e85c77879dc4446a58541/sdks/python/build.gradle#L305
 >
 > On Mon, Apr 22, 2019 at 3:21 PM Daniel Chen <danx...@gmail.com> wrote:
 >
 >     Hi everyone,
 >
 >     I'm working on improving the validation of the Python
portable Samza
 >     runner. For java, we have the gradle task (
:validatesRunner) that
 >     runs the runner validation tests.
 >     I am looking for pointers on how to similarly
integrate/enable the
 >     portability and Python tests for the Samza runner.
 >
 >     Any help will be greatly appreciated.
 >
 >     Thanks,
 >     Daniel
 >



Re: Artifact staging in cross-language pipelines

2019-04-24 Thread Maximilian Michels
 >
 > It would be nice to be able to select the environment for the
external transforms. For example, I would like to be able to use
EMBEDDED for Flink. That's implicit for sources which are runner
native unbounded read translations, but it should also be possible
for writes. That would then be similar to how pipelines are packaged
and run with the "legacy" runner.
 >
 > Thomas
 >
 >
 > On Mon, Apr 22, 2019 at 1:18 PM Ankur Goenka <goe...@google.com> wrote:
 >>
 >> Great discussion!
 >> I have a few points around the structure of proto but that is
less important as it can evolve.
 >> However, I think that artifact compatibility is another
important aspect to look at.
 >> Example: TransformA uses Guava between 1.6 and 1.7, TransformB uses
1.8 to 1.9, and TransformC uses 1.6 to 1.8. As the SDK provides the
environment for each transform, it cannot simply say
EnvironmentJava for both TransformA and TransformB, as their
dependencies are not compatible.
 >> We should have a separate environment associated with TransformA
and with TransformB in this case.
 >>
 >> To support this case, we need 2 things.
 >> 1: Granular metadata about the dependency including type.
 >> 2: Complete list of the transforms to be expanded.
 >>
 >> Elaboration:
 >> The compatibility check can be done in a crude way if we provide
all the metadata about the dependency to the expansion service.
 >> Also, the expansion service should expand all the applicable
transforms in a single call so that it knows about incompatibilities
and can create separate environments for these transforms. So in the
above example, the expansion service will associate EnvA with TransformA,
EnvB with TransformB, and EnvA with TransformC. This will of course
require changes to the expansion service proto, but giving all the
information to the expansion service will make it support more cases and
make it a bit more future proof.
 >>
 >>
 >> On Mon, Apr 22, 2019 at 10:16 AM Maximilian Michels <m...@apache.org> wrote:
 >>>
 >>> Thanks for the summary Cham. All makes sense. I agree that we
want to
 >>> keep the option to manually specify artifacts.
 >>>
 >>> > There are few unanswered questions though.
 >>> > (1) In what form will a transform author specify dependencies
? For example, URL to a Maven repo, URL to a local file, blob ?
 >>>
 >>> Going forward, we probably want to support multiple ways. For
now, we
 >>> could stick with a URL-based approach with support for
different file
 >>> systems. In the future a list of packages to retrieve from
Maven/PyPi
 >>> would be useful.
 >>>
 >> We can ask the user for (type, metadata). For Maven it can be
something like (MAVEN, {groupId: com.google.guava, artifactId: guava,
version: 19}) or (FILE, file://myfile).
 >> To begin with, we can support only a few types like FILE and can
add more types in the future.
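
To make the proposed (type, metadata) shape concrete, here is a minimal,
purely illustrative sketch in Python; none of these names exist in Beam,
they only mirror the idea above:

# Illustrative only: a (type, payload) dependency descriptor and a per-type
# "interpreter" that resolves it, as suggested above. Not actual Beam code.
MAVEN = "MAVEN"
FILE = "FILE"

def maven_dependency(group_id, artifact_id, version):
    return (MAVEN, {"groupId": group_id, "artifactId": artifact_id, "version": version})

def file_dependency(url):
    return (FILE, {"url": url})

def resolve(dependency):
    kind, payload = dependency
    if kind == FILE:
        return payload["url"]
    if kind == MAVEN:
        # A real interpreter would download the artifact from a repository.
        return "maven://%(groupId)s:%(artifactId)s:%(version)s" % payload
    raise ValueError("Unsupported dependency type: %s" % kind)

deps = [
    maven_dependency("com.google.guava", "guava", "19.0"),
    file_dependency("file:///tmp/my-transform-deps.jar"),
]
print([resolve(d) for d in deps])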
 >>>
 >>> > (2) How will dependencies be included in the expansion
response proto ? String (URL), bytes (blob) ?
 >>>
 >>> I'd go for a list of Protobuf strings first but the format
would have to
 >>> evolve for other dependency types.
 >>>
 >> Here also (type, payload) should suffice. We can have
interpreter for each type to translate the payload.
 >>>
 >>> > (3) How will we manage/share transitive dependencies required
at runtime ?
 >>>
 >>> I'd say transitive dependencies have to be included in the
list. In case
 >>> of fat jars, they are reduced to a single jar.
 >>
 >> Makes sense.
 >>>
 >>>
 >>> > (4) How will dependencies be staged for various runner/SDK
combinations ? (for example, portable runner/Flink, Dataflow runner)
 >>>
 >>> Staging should be no different than it is now, i.e. go through
Beam's
 >>> artifact staging service. As long as the protocol is stable,
there could
 >>> also be different implementations.
 >>
 >> Makes sense.
 >>>
 >>>
 >>> -Max
 >>>
 >>> On 20.04.19 03:08, Chamikara Jayalath wrote:
 >>> > OK, sounds like this is a good path forward then.
 >>> >
 >>> > * When starting up the expansion service, user (that starts
up the
 >>> 

Re: [DISCUSS] FLIP-38 Support python language in flink TableAPI

2019-04-24 Thread Maximilian Michels

Hi Stephan,

This is exciting! Thanks for sharing. The inter-process communication 
code looks like the most natural choice as a common ground. To go 
further, there are indeed some challenges to solve.



=> Biggest question is whether the language-independent DAG is expressive 
enough to capture all the expressions that we want to map directly to Table API 
expressions. Currently much is hidden in opaque UDFs. Kenn mentioned the structure 
should be flexible enough to capture more expressions transparently.


Just to add some context on how this could be done, there is the concept of 
a FunctionSpec which is part of a transform in the DAG. A FunctionSpec 
contains a URN and a payload. A FunctionSpec can either be (1) 
translated by the Runner directly, e.g. mapped to Table API concepts, or (2) 
run as a user-defined function with an Environment. It could be feasible 
for Flink to choose the direct path, whereas Beam Runners would leverage 
the more generic approach using UDFs. Granted, compatibility across 
Flink and Beam would only work if both of the translation paths yielded 
the same semantics.
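
As a purely conceptual sketch of these two translation paths (the URNs and
helper functions below are made up for illustration and are not Beam's
actual runner code):

# Conceptual sketch only: dispatch on a FunctionSpec-style URN, either
# translating directly to a native (e.g. Table API) operator or falling
# back to executing the UDF in its environment via the portability layer.
NATIVELY_TRANSLATABLE_URNS = {
    "example:transform:filter:v1",   # made-up URN
    "example:transform:project:v1",  # made-up URN
}

def translate_to_native_operator(payload):
    # Path (1): map the payload directly to a native engine concept.
    return "native-operator(payload=%r)" % (payload,)

def run_in_sdk_harness(urn, payload, environment):
    # Path (2): run the user-defined function inside its environment
    # (e.g. a container) and exchange data over gRPC.
    return "sdk-harness(urn=%s, env=%s)" % (urn, environment)

def translate(urn, payload, environment):
    if urn in NATIVELY_TRANSLATABLE_URNS:
        return translate_to_native_operator(payload)
    return run_in_sdk_harness(urn, payload, environment)

print(translate("example:transform:filter:v1", b"expr", "java-embedded"))
print(translate("example:udf:python:v1", b"pickled_fn", "python-docker"))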



 If the DAG is generic enough to capture the additional information, we 
probably still need some standardization, so that all the different language 
APIs represent their expressions the same way


I wonder whether that's necessary as a first step. I think it would be 
fine for Flink to have its own way to represent API concepts in the Beam 
DAG which Beam Runners may not be able to understand. We could then 
successively add the capability for these transforms to run with Beam.



 Similarly, it makes sense to standardize the type system (and type inference) 
as far as built-in expressions and their interaction with UDFs are concerned. 
The Flink Table API and Blink teams found this to be essential for a consistent 
API behavior. This would not prevent all-UDF programs from still using purely 
binary/opaque types.


Beam has a set of standard coders which can be used across languages. We 
will have to expand those to play well with Flink's: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tableApi.html#data-types
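
For context, a few of the cross-language standard coder URNs that Beam
already defines (listed from memory here, so treat them as indicative
rather than exhaustive):

# A small, non-exhaustive sample of Beam's standard coder URNs, which SDKs
# and runners agree on at the portability layer.
STANDARD_CODER_URNS = [
    "beam:coder:bytes:v1",
    "beam:coder:varint:v1",
    "beam:coder:kv:v1",
    "beam:coder:iterable:v1",
    "beam:coder:length_prefix:v1",
    "beam:coder:windowed_value:v1",
    "beam:coder:global_window:v1",
    "beam:coder:interval_window:v1",
]
print("\n".join(STANDARD_CODER_URNS))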


I think we will need to exchange more ideas to work out a model that 
will work for both Flink and Beam. A regular meeting could be helpful.


Thanks,
Max

On 23.04.19 21:23, Stephan Ewen wrote:

Hi all!

Below are my notes on the discussion last week on how to collaborate 
between Beam and Flink.
The discussion was between Tyler, Kenn, Luke, Ahmed, Xiaowei, Shaoxuan, 
Jincheng, and me.


This represents my understanding of the discussion, please augment this 
where I missed something or where your conclusion was different.


Best,
Stephan

===

*Beams Python and Portability Framework*

   - Portability core to Beam
   - Language independent dataflow DAG that is defined via ProtoBuf
   - DAG can be generated from various languages (Java, Python, Go)
   - The DAG describes the pipelines and contains additional parameters 
to describe each operator, and contains artifacts that need to be 
deployed / executed as part of an operator execution.
   - Operators execute in language-specific containers, data is 
exchanged between the language-specific container and the runner 
container (JVM) via gRPC.


*Flink's desiderata for Python API*

   - Python API should mirror Java / Scala Table API
   - All relational expressions that correspond to built-in functions 
should be translated to corresponding expressions in the Table API. That 
way the planner generates Java code for the data types and built-in 
expressions, meaning no Python code is necessary during execution

   - UDFs should be supported and run similarly as in Beam's approach
   - Python programs should be similarly created and submitted/deployed 
as Java / Scala programs (CLI, web, containerized, etc.)


*Consensus to share inter-process communication code*

   - Crucial code for robust setup and high performance data exchange 
across processes
   - The code for the SDK harness, the artifact boostrapping, and the 
data exchange make sense to share.
   - Ongoing discussion whether this can be a dedicated module with slim 
dependencies in Beam


*Potential Long Term Perspective: Share language-independent DAG 
representation*


   - Beam's language independent DAG could become a standard 
representation used in both projects
   - Flink would need a way to receive that DAG, map it to the Table 
API, execute it from there
   - The DAG would need to have a standardized representation of 
functions and expressions that then get mapped to Table API expressions 
to let the planner optimize those and generate Java code for those
   - Similar as UDFs are supported in the Table API, there would be 
additional "external UDFs" that would go through the above mentioned 
inter-process communication layer


   - _Advantages:_
     => Flink and Beam could share more language bindings
     => Flink 

Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-04-24 Thread Maximilian Michels
Fully agree that this is an effort that goes beyond changing a type 
parameter but I think we have a chance here to cooperate between the two 
projects. I would be happy to help out where I can.


I'm not sure at this point what exactly is feasible for reuse but I 
would imagine the Runner-related code to be useful as well for the 
interaction with the SDK Harness. There are some fundamental differences 
in the model, e.g. how windowing works, which might be challenging to 
work around.


Thanks,
Max

On 24.04.19 12:03, jincheng sun wrote:


Hi Kenn, I think you are right, the Python SDK harness can be shared with 
Flink, and we also need to add some new primitive operations. Regarding the 
runner side, I think most of the code in runners/java-fn-execution 
can be shared (but needs some improvement, such as 
FnDataService); some of it cannot be shared, such as the job submission 
code. So, we may need to set up a document to clearly analyze which parts 
can be shared, which can be shared but need some changes, and 
which ones definitely cannot be shared.


Hi Max, thanks for sharing your opinion. I also prefer using the Beam Fn 
services as a library and am willing to put more effort into this.
 From the view of the current code, abstracting the Fn services into a class 
library that other projects can rely on requires a lot of effort from 
the Beam community. Turning `WindowedValue` into `T` is just the 
beginning of this effort. If the Beam community is willing to 
abstract the Fn services into a class library that can be relied upon by 
other projects, I can try to draft a document; of course, during this 
period I may need a lot of help from you, Kenn, Lukasz, and the Beam 
community. (I am a recruit in the Beam community :-))


What do you think?

Regards,
Jincheng

Kenneth Knowles <k...@apache.org> wrote on Wed, Apr 24, 2019 at 3:32 AM:


It seems to me that the most valuable code to share and keep up with
is the Python/Go/etc SDK harness; they would need to be enhanced
with new primitive operations. So you would want to depend directly
and share the original proto-generated classes too, which Beam
publishes as separate artifacts for Java. Is the runner-side support
code that valuable for direct integration into Flink? I would expect
once you get past trivial wrappers (that you can copy/paste with no
loss) you would hit differences in architecture so you would diverge
anyhow.

Kenn

On Tue, Apr 23, 2019 at 5:32 AM Maximilian Michels <m...@apache.org> wrote:

Hi Jincheng,

Copying code is a solution for the short term. In the long run
I'd like
the Fn services to be a library not only for the Beam
portability layer
but also for other projects which want to leverage it. We should
thus
make an effort to make it more generic/extensible where
necessary and
feasible.

Since you are investigating reuse of Beam portability in the
context of
Flink, do you think it would make sense to setup a document
where we
collect ideas and challenges?

Thanks,
Max

On 23.04.19 13:00, jincheng sun wrote:
 > Hi Reuven,
 >
 > I think you have provided an optional solution for other
community which
 > wants to take advantage of Beam's existing
achievements. Thank you very
 > much!
 >
 > I think the Flink community can choose to copy from Beam's
code or
 > choose to rely directly on the beam's class library. The
Flink community
 > also initiated a discussion, more info can be found here
 >

<http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html#a28096>
 >
 > The purpose of turning `WindowedValue` into `T` is to
make the
 > interface design of Beam more versatile, so that other open
source
 > projects have the opportunity to take advantage of Beam's
existing
 > achievements. Of course, just changing the `WindowedValue`
into `T`
 > is not enough to be shared by other projects in the form of a
class
 > library, we need to do more efforts. If Beam can provide a
class library
 > in the future, other community contributors will also have the
 > willingness to contribute to the beam community. This will
benefit both
 > the community that wants to take advantage of Beam's existing
 > achievements and the Beam community itself. And thanks to
Thomas for
 > that he has also made a lot of efforts in this regard.
 >
 > Thanks again for your valuable suggestion,

Re: [Discuss] Publishing pre-release artifacts to repositories

2019-04-25 Thread Maximilian Michels

wouldn't that be in conflict with Apache release policy [1] ?
[1] http://www.apache.org/legal/release-policy.html


Indeed, advertising pre-release artifacts is against ASF rules. For 
example, Flink was asked to remove a link to the Maven snapshot 
repository from their download page.


However, that does not mean we cannot publish Python artifacts. We just 
have to clearly mark them for developers only and not advertise them 
alongside the official releases.


-Max

On 25.04.19 10:23, Robert Bradshaw wrote:

Don't we push java artifacts to maven repositories as part of the RC
process? And completely unvetted snapshots? (Or is this OK because
they are special opt-in apache-only ones?)

I am generally in favor of the idea, but would like to avoid increased
toil on the release manager.

One potential hitch I see is that current release process updates the
versions to x.y.z (no RC or other pre-release indicator in the version
number) whereas pypi (and other systems) typically expect distinct
(recognizable) version numbers for each attempt, and only the actual
final result has the actual final release version.

On Thu, Apr 25, 2019 at 6:38 AM Ahmet Altay  wrote:


I do not know the answer. I believe this will be similar to sharing the RC 
artifacts for validation purposes and would not be a formal release by itself. 
But I am not an expert and I hope others will share their opinions.

I quickly searched pypi for apache projects and found at least airflow [1] and 
libcloud [2] are publishing rc artifacts to pypi. We can reach out to those 
communities and learn about their processes.

Ahmet

[1] https://pypi.org/project/apache-airflow/#history
[2] https://pypi.org/project/apache-libcloud/#history

On Wed, Apr 24, 2019 at 6:15 PM Michael Luckey  wrote:


Hi,

wouldn't that be in conflict with Apache release policy [1] ?

[1] http://www.apache.org/legal/release-policy.html

On Thu, Apr 25, 2019 at 1:35 AM Alan Myrvold  wrote:


Great idea. I like the RC candidates to follow as much as the release artifact 
process as possible.

On Wed, Apr 24, 2019 at 3:27 PM Ahmet Altay  wrote:


To clarify my proposal, I am proposing publishing to the production pypi 
repository with an rc tag in the version. And in turn allow users to depend on 
beam's rc version + all the other regular dependencies users would have 
directly from pypi.

Publishing to test pypi repo would also be helpful if test pypi repo also 
mirrors other packages that exist in the production pypi repository.

On Wed, Apr 24, 2019 at 3:12 PM Pablo Estrada  wrote:


I think this is a great idea. A way of doing it for python would be by using 
the test repository for PyPi[1], and that way we would not have to do an 
official PyPi release, but still would be able to install it with pip (by 
passing an extra flag), and test.

In fact, there are some Beam artifacts already in there[2]. At some point I 
looked into this, but couldn't figure out who has access/the password for it.



I also don't know who owns beam package in test pypi repo. Does anybody know?




In short: +1, and I would suggest using the test PyPi repo to avoid publishing 
to the main PyPi repo.
Best
-P.

[1] https://test.pypi.org/
[2] https://test.pypi.org/project/apache-beam/

On Wed, Apr 24, 2019 at 3:04 PM Ahmet Altay  wrote:


Hi all,

What do you think about the idea of publishing pre-release artifacts as part of 
the RC emails?

For Python this would translate into publishing the same artifacts from RC email with a 
version like "2.X.0rcY" to pypi. I do not know, but I am guessing we can do a 
similar thing with Maven central for Java artifacts as well.

Advantages would be:
- Allow end users to validate RCs for their own purposes using the same exact 
process they will normally use.
  - Enable early adopters to start using RC releases early on in the release 
cycle if that is what they would like to do. This will in turn reduce time 
pressure on some releases, especially for cases where someone needs a release to 
be finalized for an upcoming event.

There will also be disadvantages, some I could think of:
- Users could request support for RC artifacts. Hopefully in the form of 
feedback for us to improve the release. But it could also be in the form of 
folks using RC artifacts for production for a long time.
- It will add toil to the current release process, there will be one more step 
for each RC. I think for python this will be a small step but nevertheless it 
will be additional work.

For an example of this, you can take a look at tensorflow releases. For 1.13 
there were 3 pre-releases [1].

Ahmet

[1] https://pypi.org/project/tensorflow/#history


Re: [docs] Python State & Timers

2019-04-25 Thread Maximilian Michels

@Pablo: Thanks for following up with the PR! :)

@Brian: I was wondering about this as well. It makes the Python state 
code a bit unnatural. I'd suggest adding a ValueState wrapper around 
ListState/CombiningState.


@Robert: Like Reuven pointed out, we can disallow ValueState for merging 
windows with state.


@Reza: Great. Let's make sure it has Python examples out of the box. 
Either Pablo or me could help there.


Thanks,
Max

On 25.04.19 04:14, Reza Ardeshir Rokni wrote:
Pablo, Kenneth and I have a new blog post ready for publication which covers 
how to create a "looping timer"; it allows default values to be 
created in a window when no incoming elements exist. We just need to 
clear up a few bits before publication, but it would be great to have that 
also include a Python example; I wrote it in Java...


Cheers

Reza

On Thu, 25 Apr 2019 at 04:34, Reuven Lax <re...@google.com> wrote:


Well state is still not implemented for merging windows even for
Java (though I believe the idea was to disallow ValueState there).

On Wed, Apr 24, 2019 at 1:11 PM Robert Bradshaw <rober...@google.com> wrote:

It was unclear what the semantics were for ValueState for merging
windows. (It's also a bit weird as it's inherently a race condition
wrt element ordering, unlike Bag and CombineState, though you can
always implement it as a CombineState that always returns the latest
value which is a bit more explicit about the dangers here.)

On Wed, Apr 24, 2019 at 10:08 PM Brian Hulette <bhule...@google.com> wrote:
 >
 > That's a great idea! I thought about this too after those
posts came up on the list recently. I started to look into it,
but I noticed that there's actually no implementation of
ValueState in userstate. Is there a reason for that? I started
to work on a patch to add it but I was just curious if there was
some reason it was omitted that I should be aware of.
 >
 > We could certainly replicate the example without ValueState
by using BagState and clearing it before each write, but it
would be nice if we could draw a direct parallel.
 >
     > Brian
 >
 > On Fri, Apr 12, 2019 at 7:05 AM Maximilian Michels <m...@apache.org> wrote:
 >>
 >> > It would probably be pretty easy to add the corresponding
code snippets to the docs as well.
 >>
 >> It's probably a bit more work because there is no section
dedicated to
 >> state/timer yet in the documentation. Tracked here:
 >> https://jira.apache.org/jira/browse/BEAM-2472
 >>
 >> > I've been going over this topic a bit. I'll add the
snippets next week, if that's fine by y'all.
 >>
 >> That would be great. The blog posts are a great way to get
started with
 >> state/timers.
 >>
 >> Thanks,
 >> Max
 >>
 >> On 11.04.19 20:21, Pablo Estrada wrote:
 >> > I've been going over this topic a bit. I'll add the
snippets next week,
 >> > if that's fine by y'all.
 >> > Best
 >> > -P.
 >> >
 >> > On Thu, Apr 11, 2019 at 5:27 AM Robert Bradshaw
mailto:rober...@google.com>
 >> > <mailto:rober...@google.com <mailto:rober...@google.com>>>
wrote:
 >> >
 >> >     That's a great idea! It would probably be pretty easy
to add the
 >> >     corresponding code snippets to the docs as well.
 >> >
 >> >     On Thu, Apr 11, 2019 at 2:00 PM Maximilian Michels
mailto:m...@apache.org>
 >> >     <mailto:m...@apache.org <mailto:m...@apache.org>>> wrote:
 >> >      >
 >> >      > Hi everyone,
 >> >      >
 >> >      > The Python SDK still lacks documentation on state
and timers.
 >> >      >
 >> >      > As a first step, what do you think about updating
these two blog
 >> >     posts
 >> >      > with the corresponding Python code?
 >> >      >
 >> >      >
https://beam.apache.org/blog/2017/02/13/stateful-processing.html
 >> >      >
https://beam.apache.org/blog/2017/08/28/timely-processing.html
 >> >      >
 >> >      > Thanks,
 >> >      > Max
 >> >



Re: Custom shardingFn for FileIO

2019-04-25 Thread Maximilian Michels

Hi Jozef,

For sharding in FileIO there are basically two options:

(1) num_shards ~= num_workers => bad spread of the load across workers
(2) num_shards >> num_workers => good spread of the load across workers, 
but huge number of files


Your approach would give users control over the sharding keys such that 
they could be adjusted to spread load more evenly.


I'd like to hear from Beam IO experts if that would make sense.

Thanks,
Max

On 25.04.19 08:52, Jozef Vilcek wrote:

Hello,

Right now, if someone needs sharded files via FileIO, there is only one 
option, which is random (round-robin) shard assignment per element, and it 
always uses ShardedKey as the key for the GBK which follows.


I would like to generalize this and have the possibility to provide some 
ShardingFn[UserT, DestinationT, ShardKeyT] via FileIO.
What I am mainly after is the possibility to provide an optimisation 
for the Flink runtime and pass in a special function which generates shard 
keys in a way that they are evenly spread among workers (BEAM-5865).


Would such extension for FileIO make sense? If yes, I would create a 
ticket for it and try to draft a PR.


Best,
Jozef


Re: Integration of python/portable runner tests for Samza runner

2019-04-25 Thread Maximilian Michels

> - For portable running tests: by looking at the
> portableValidatesRunnerTask in flink_job_server.gradle, it seems it's
> the same set of Java tests but using portability framework to validate
> (submit to job server and run the portable pipeline in a specific
> runner). Is my understanding correct?

That's correct.

> - For python tests: Looks like flinkValidatesRunner is using LOOPBACK
> SDK worker type in the tests. Not sure what LOOPBACK does. Is it used
> for testing? Currently Samza portable runner supports PROCESS worker.

This avoids building and using containers for this test. We had a number 
of issues with Docker on Jenkins and wanted to lower build time for 
PreCommit tests. Loopback means that an embedded Python environment will 
be started which listens on localhost. It's comparable to Java's 
EmbeddedSdkHarness.
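
For reference, a minimal sketch of pointing a Python pipeline at a portable
job server with the LOOPBACK environment; the job endpoint address below is
an assumption and depends on how the job server was started:

# Minimal sketch: submit a Python pipeline to a portable job server while
# running the SDK worker embedded in this process via LOOPBACK.
# The job_endpoint value is an assumption; adjust it to your setup.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=LOOPBACK",
])

with beam.Pipeline(options=options) as p:
    (p
     | beam.Create(["a", "b", "c"])
     | beam.Map(print))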


-Max

On 24.04.19 20:10, Xinyu Liu wrote:
Thanks for the useful pointers! We are looking forward to integrating 
both Portable and Python-specific tests for Samza runner. A few questions:


- For portable running tests: by looking at the 
portableValidatesRunnerTask in flink_job_server.gradle, it seems it's 
the same set of Java tests but using portability framework to validate 
(submit to job server and run the protable pipeline in a specific 
runner). Is my understanding correct?


- For python tests: Looks like flinkValidatesRunner is using LOOPBACK 
SDK worker type in the tests. Not sure what LOOPBACK does. Is it used 
for testing? Currently Samza portable runner supports PROCESS worker.


Thanks,
Xinyu



On Wed, Apr 24, 2019 at 2:45 AM Maximilian Michels <m...@apache.org> wrote:


 > If you are interested in portable python pipeline validation, I
think
 > fn_api_runner_test would also help.

Just to note, Ankur mentioned flinkCompatibilityMatrix, that one uses
fn_api_runner_test with some tooling on top to bring up the test
cluster.

On 23.04.19 19:23, Boyuan Zhang wrote:
 > Hi Daniel,
 > If you are interested in portable python pipeline validation, I
think
 > fn_api_runner_test
 >

<https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py>

 > would also help.
 >
 > On Tue, Apr 23, 2019 at 10:19 AM Pablo Estrada
mailto:pabl...@google.com>
 > <mailto:pabl...@google.com <mailto:pabl...@google.com>>> wrote:
 >
 >     This is cool, Daniel : ) Glad to see the Samza runner moving
forward.
 >     Best
 >     -P.
 >
 >     On Tue, Apr 23, 2019 at 2:52 AM Maximilian Michels
mailto:m...@apache.org>
 >     <mailto:m...@apache.org <mailto:m...@apache.org>>> wrote:
 >
 >         Hi Daniel,
 >
 >         Note that there is also Portable Validates Runner which
runs Java
 >         portability tests. I don't know if you have integrated
with that
 >         one
 >         already.
 >
 >         Thanks,
 >         Max
 >
 >         On 23.04.19 02:28, Ankur Goenka wrote:
 >          > Hi Daniel,
 >          >
 >          > We use flinkCompatibilityMatrix [1] to check the Flink
 >         compatibility
 >          > with python. This is python equivalent to validatesRunner
 >         tests in java
 >          > for portable runners.
 >          > I think we can reuse it for Samza Portable runner with
minor
 >         refactoring.
 >          >
 >          > [1]
 >          >
 >

https://github.com/apache/beam/blob/bdb1a713a120a887e71e85c77879dc4446a58541/sdks/python/build.gradle#L305
 >          >
 >          > On Mon, Apr 22, 2019 at 3:21 PM Daniel Chen
 >         mailto:danx...@gmail.com>
<mailto:danx...@gmail.com <mailto:danx...@gmail.com>>
 >          > <mailto:danx...@gmail.com <mailto:danx...@gmail.com>
<mailto:danx...@gmail.com <mailto:danx...@gmail.com>>>> wrote:
 >          >
 >          >     Hi everyone,
 >          >
 >          >     I'm working on improving the validation of the Python
 >         portable Samza
 >          >     runner. For java, we have the gradle task (
 >         :validatesRunner) that
 >          >     runs the runner validation tests.
 >          >     I am looking for pointers on how to similarly
 >         integrate/enable the
 >          >     portability and Python tests for the Samza runner.
 >          >
 >          >     Any help will be greatly appreciated.
 >          >
 >          >     Thanks,
 >          >     Daniel
 >          >
 >



Re: Custom shardingFn for FileIO

2019-04-25 Thread Maximilian Michels
Reuven is talking about PTransformOverride, e.g. 
FlinkTransformOverrides. We already use this to determine the number of 
shards in case of Runner-determined sharding.


Not sure if that would work for Jozef's case because setting the number 
of shards is not enough. We want to set the shard key directly and that 
logic is buried inside WriteFiles.


-Max

On 25.04.19 16:30, Reuven Lax wrote:
Actually the runner is free to perform surgery on the graph. The 
FlinkRunner can insert a custom function to determine the sharding keys.


On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:


Right now, sharding can be specified only via a target `shardCount`,
be it by the user or the runner. Next to a configurable shardCount, I am
proposing to also be able to pass a function which will allow the
user (or runner) to control how a shard is determined and what key will
be used to represent it:

interface ShardingFunction<UserT, DestinationT, ShardKeyT> extends Serializable {
  ShardKeyT assign(DestinationT destination, UserT element, Integer shardCount);
}

The default implementation can be what we have right now => a random shard
encapsulated as ShardedKey.

On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <re...@google.com> wrote:

If sharding is not specified, then the semantics are
"runner-determined sharding." The DataflowRunner already takes
advantage of this to impose its own sharding if the user hasn't
specified an explicit one. Could the Flink runner do the same
instead of pushing this to the users?

    On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels <m...@apache.org> wrote:

Hi Jozef,

For sharding in FileIO there are basically two options:

(1) num_shards ~= num_workers => bad spread of the load
across workers
(2) num_shards >> num_workers => good spread of the load
across workers,
but huge number of files

Your approach would give users control over the sharding
keys such that
they could be adjusted to spread load more evenly.

I'd like to hear from Beam IO experts if that would make sense.

Thanks,
Max

On 25.04.19 08:52, Jozef Vilcek wrote:
 > Hello,
 >
 > Right now, if someone needs sharded files via FileIO,
there is only one
 > option which is random (round robin) shard assignment per
element and it
 > always use ShardedKey as a key for the GBK which
follows.
 >
 > I would like to generalize this and have a possibility to
provide some
 > ShardingFn[UserT, DestinationT, ShardKeyT] via FileIO.
 > What I am mainly after is, to have a possibility to
provide optimisation
 > for Flink runtime and pass in a special function which
generates shard
 > keys in a way that they are evenly spread among workers
(BEAM-5865).
 >
 > Would such extension for FileIO make sense? If yes, I
would create a
 > ticket for it and try to draft a PR.
 >
 > Best,
 > Jozef



Re: [docs] Python State & Timers

2019-04-25 Thread Maximilian Michels

I forgot to give an example, just to clarify for others:


What was the specific example that was less natural?


Basically every time we use ListState to express ValueState, e.g.

  next_index, = list(state.read()) or [0]

Taken from: 
https://github.com/apache/beam/pull/8363/files#diff-ba1a2aed98079ccce869cd660ca9d97dR301


-Max

On 25.04.19 16:40, Robert Bradshaw wrote:

https://github.com/apache/beam/pull/8402

On Thu, Apr 25, 2019 at 4:26 PM Robert Bradshaw  wrote:


Oh, this is for the indexing example.

I actually think using CombiningState is cleaner than ValueState.

https://github.com/apache/beam/blob/release-2.12.0/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py#L262

(The fact that one must specify the accumulator coder is, however,
unfortunate. We should probably infer that if we can.)

On Thu, Apr 25, 2019 at 4:19 PM Robert Bradshaw  wrote:


The desire was to avoid the implicit disallowed combination wart in
Python (until we could make sense of it), and also ValueState could be
surprising with respect to older values overwriting newer ones. What
was the specific example that was less natural?

On Thu, Apr 25, 2019 at 3:01 PM Maximilian Michels  wrote:


@Pablo: Thanks for following up with the PR! :)

@Brian: I was wondering about this as well. It makes the Python state
code a bit unnatural. I'd suggest to add a ValueState wrapper around
ListState/CombiningState.

@Robert: Like Reuven pointed out, we can disallow ValueState for merging
windows with state.

@Reza: Great. Let's make sure it has Python examples out of the box.
Either Pablo or me could help there.

Thanks,
Max

On 25.04.19 04:14, Reza Ardeshir Rokni wrote:

Pablo, Kenneth and I have a new blog ready for publication which covers
how to create a "looping timer" it allows for default values to be
created in a window when no incoming elements exists. We just need to
clear a few bits before publication, but would be great to have that
also include a python example, I wrote it in java...

Cheers

Reza

On Thu, 25 Apr 2019 at 04:34, Reuven Lax mailto:re...@google.com>> wrote:

 Well state is still not implemented for merging windows even for
 Java (though I believe the idea was to disallow ValueState there).

 On Wed, Apr 24, 2019 at 1:11 PM Robert Bradshaw mailto:rober...@google.com>> wrote:

 It was unclear what the semantics were for ValueState for merging
 windows. (It's also a bit weird as it's inherently a race condition
 wrt element ordering, unlike Bag and CombineState, though you can
 always implement it as a CombineState that always returns the latest
 value which is a bit more explicit about the dangers here.)

 On Wed, Apr 24, 2019 at 10:08 PM Brian Hulette
 mailto:bhule...@google.com>> wrote:
  >
  > That's a great idea! I thought about this too after those
 posts came up on the list recently. I started to look into it,
 but I noticed that there's actually no implementation of
 ValueState in userstate. Is there a reason for that? I started
 to work on a patch to add it but I was just curious if there was
 some reason it was omitted that I should be aware of.
  >
  > We could certainly replicate the example without ValueState
 by using BagState and clearing it before each write, but it
 would be nice if we could draw a direct parallel.
  >
  > Brian
  >
  > On Fri, Apr 12, 2019 at 7:05 AM Maximilian Michels
 mailto:m...@apache.org>> wrote:
  >>
  >> > It would probably be pretty easy to add the corresponding
 code snippets to the docs as well.
  >>
  >> It's probably a bit more work because there is no section
 dedicated to
  >> state/timer yet in the documentation. Tracked here:
  >> https://jira.apache.org/jira/browse/BEAM-2472
  >>
  >> > I've been going over this topic a bit. I'll add the
 snippets next week, if that's fine by y'all.
  >>
  >> That would be great. The blog posts are a great way to get
 started with
  >> state/timers.
  >>
  >> Thanks,
  >> Max
  >>
  >> On 11.04.19 20:21, Pablo Estrada wrote:
  >> > I've been going over this topic a bit. I'll add the
 snippets next week,
  >> > if that's fine by y'all.
  >> > Best
  >> > -P.
  >> >
  >> > On Thu, Apr 11, 2019 at 5:27 AM Robert Bradshaw
 mailto:rober...@google.com>
  >> > <mailto:rober...@google.com <

Re: [docs] Python State & Timers

2019-04-25 Thread Maximilian Michels
Completely agree that CombiningState is nicer in this example. Users may 
still want to use ValueState when there is nothing to combine. Also, 
users already know ValueState from the Java SDK.
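
A minimal sketch of what such a wrapper could look like on the user side,
built on top of the existing BagState handle; the class below is
illustrative and not an actual Beam API:

# Illustrative only: emulate read/write/clear ValueState semantics on top of
# a BagState runtime handle, as suggested above. Not an actual Beam API.
class ValueStateView(object):

    def __init__(self, bag_state):
        self._bag = bag_state  # e.g. the handle injected via DoFn.StateParam

    def read(self, default=None):
        values = list(self._bag.read())
        return values[0] if values else default

    def write(self, value):
        # Clearing before adding keeps at most one value in the bag.
        self._bag.clear()
        self._bag.add(value)

A DoFn could wrap its DoFn.StateParam handle in such a view to keep the
code close to the Java ValueState example.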


On 25.04.19 17:12, Robert Bradshaw wrote:

On Thu, Apr 25, 2019 at 4:58 PM Maximilian Michels  wrote:


I forgot to give an example, just to clarify for others:


What was the specific example that was less natural?


Basically every time we use ListState to express ValueState, e.g.

next_index, = list(state.read()) or [0]

Taken from:
https://github.com/apache/beam/pull/8363/files#diff-ba1a2aed98079ccce869cd660ca9d97dR301


Yes, ListState is much less natural here. I think generally
CombiningValue is often a better replacement. E.g. the Java example
reads


public void processElement(
    ProcessContext context, @StateId("index") ValueState<Integer> index) {
  int current = firstNonNull(index.read(), 0);
  context.output(KV.of(current, context.element()));
  index.write(current + 1);
}


which is replaced with bag state


def process(self, element, state=DoFn.StateParam(INDEX_STATE)):
 next_index, = list(state.read()) or [0]
 yield (element, next_index)
 state.clear()
 state.add(next_index + 1)


whereas CombiningState would be more natural (than ListState, and
arguably than even ValueState), giving


def process(self, element, index=DoFn.StateParam(INDEX_STATE)):
 yield element, index.read()
 index.add(1)






-Max

On 25.04.19 16:40, Robert Bradshaw wrote:

https://github.com/apache/beam/pull/8402

On Thu, Apr 25, 2019 at 4:26 PM Robert Bradshaw  wrote:


Oh, this is for the indexing example.

I actually think using CombiningState is cleaner than ValueState.

https://github.com/apache/beam/blob/release-2.12.0/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py#L262

(The fact that one must specify the accumulator coder is, however,
unfortunate. We should probably infer that if we can.)

On Thu, Apr 25, 2019 at 4:19 PM Robert Bradshaw  wrote:


The desire was to avoid the implicit disallowed combination wart in
Python (until we could make sense of it), and also ValueState could be
surprising with respect to older values overwriting newer ones. What
was the specific example that was less natural?

On Thu, Apr 25, 2019 at 3:01 PM Maximilian Michels  wrote:


@Pablo: Thanks for following up with the PR! :)

@Brian: I was wondering about this as well. It makes the Python state
code a bit unnatural. I'd suggest to add a ValueState wrapper around
ListState/CombiningState.

@Robert: Like Reuven pointed out, we can disallow ValueState for merging
windows with state.

@Reza: Great. Let's make sure it has Python examples out of the box.
Either Pablo or me could help there.

Thanks,
Max

On 25.04.19 04:14, Reza Ardeshir Rokni wrote:

Pablo, Kenneth and I have a new blog ready for publication which covers
how to create a "looping timer" it allows for default values to be
created in a window when no incoming elements exists. We just need to
clear a few bits before publication, but would be great to have that
also include a python example, I wrote it in java...

Cheers

Reza

On Thu, 25 Apr 2019 at 04:34, Reuven Lax mailto:re...@google.com>> wrote:

  Well state is still not implemented for merging windows even for
  Java (though I believe the idea was to disallow ValueState there).

  On Wed, Apr 24, 2019 at 1:11 PM Robert Bradshaw mailto:rober...@google.com>> wrote:

  It was unclear what the semantics were for ValueState for merging
  windows. (It's also a bit weird as it's inherently a race condition
  wrt element ordering, unlike Bag and CombineState, though you can
  always implement it as a CombineState that always returns the latest
  value which is a bit more explicit about the dangers here.)

  On Wed, Apr 24, 2019 at 10:08 PM Brian Hulette
  mailto:bhule...@google.com>> wrote:
   >
   > That's a great idea! I thought about this too after those
  posts came up on the list recently. I started to look into it,
  but I noticed that there's actually no implementation of
  ValueState in userstate. Is there a reason for that? I started
  to work on a patch to add it but I was just curious if there was
  some reason it was omitted that I should be aware of.
   >
   > We could certainly replicate the example without ValueState
  by using BagState and clearing it before each write, but it
  would be nice if we could draw a direct parallel.
   >
   > Brian
   >
   > On Fri, Apr 12, 2019 at 7:05 AM Maximilian Michels
  mailto:m...@apache.org>> wrote:
   >>
   >> > It would probably be pretty easy to add the corresponding
  code snippets to the docs a

Re: [PROPOSAL] Prepare for LTS bugfix release 2.7.1

2019-04-26 Thread Maximilian Michels
Sounds good Kenn. Thanks for starting the preparation for the LTS patch 
release.


I assembled a list of Flink issues a while ago which I will have to 
revisit because there have been more critical fixes since.


Thanks,
Max

On 26.04.19 08:02, Kenneth Knowles wrote:
Since it is all trivially reversible if there is some other feeling 
about this thread, I have gone ahead and started the work:


  - I made release-2.7.1 branch point to the same commit as 
release-2.7.0 so there is something to target PRs
  - I have opened the first PR, cherry-picking the set_version script 
and using it to set the version on the branch to 2.7.1: 
https://github.com/apache/beam/pull/8407 (found bug in the new script 
right away :-)


Here is the release with list of issues: 
https://issues.apache.org/jira/projects/BEAM/versions/12344458. So 
anyone can grab a ticket and volunteer to open a backport PR to the 
release-2.7.1 branch.


I don't have a strong opinion about how long we should support the 2.7.x 
line. I am curious about different perspectives on user / vendor needs. 
I have two very basic thoughts: (1) we surely need to keep it going 
until some time after we have another LTS designated, to make sure there 
is a clear path for anyone only using LTS releases and (2) if we decide 
to end support of 2.7.x but then someone volunteers to backport and 
release, of course I would not expect anyone to block them, so it has no 
maximum lifetime, but we just need consensus on a minimum. And of course 
that consensus cannot force anyone to do the work, but is just a 
resolution of the community.


Kenn

On Thu, Apr 25, 2019 at 10:29 PM Jean-Baptiste Onofré wrote:


+1 it sounds good to me.

Thanks !

Regards
JB

On 26/04/2019 02:42, Kenneth Knowles wrote:
 > Hi all,
 >
 > Since the release of 2.7.0 we have identified some serious bugs:
 >
 >  - There are 8 (non-dupe) issues* tagged with Fix Version 2.7.1
 >  - 2 are rated "Blocker" (aka P0) but I think the others may be
underrated
 >  - If you know of a critical bug that is not on that list, please
file
 > an LTS backport ticket for it
 >
 > If a user is on an old version and wants to move to the LTS,
there are
 > some real blockers. I propose that we perform a 2.7.1 release
starting now.
 >
 > I volunteer to manage the release. What do you think?
 >
 > Kenn
 >
 > *Some are "resolved" but this is not accurate as the LTS 2.7.1
branch is
 > not created yet. I suggest filing a ticket to track just the LTS
 > backport when you hit a bug that merits it.
 >



Re: [PROPOSAL] Preparing for Beam 2.13.0 release

2019-04-26 Thread Maximilian Michels

Hi Ankur,

Sounds good. This will ensure that we stay on track regarding the 
release cycle.


Thanks,
Max

On 26.04.19 02:59, Ankur Goenka wrote:

Correction: the planned cut date is May 8th.

On Thu, Apr 25, 2019 at 4:24 PM Ankur Goenka > wrote:


Hello Beam community!

Beam 2.13 release branch cut date is April 8th according to the
release calendar [1]. I would like to volunteer myself to do this
release. I intend to cut the branch as planned on April 8th and
cherrypick fixes if needed.

If you have releasing blocking issues for 2.13 please mark their
"Fix Version" as 2.13.0. Please use 2.14.0 release in JIRA in case
you would like to move any non-blocking issues to that version.

Does this sound reasonable?

Thanks,
Ankur

[1]

https://calendar.google.com/calendar/embed?src=0p73sl034k80oob7seouanigd0%40group.calendar.google.com&ctz=America%2FLos_Angeles



Re: Hello from Hannah Jiang

2019-04-26 Thread Maximilian Michels

Awesome. Welcome Hannah!

Cheers,
Max

On 26.04.19 05:08, Yifan Zou wrote:

Welcome!

On Thu, Apr 25, 2019 at 7:34 PM Connell O'Callaghan > wrote:


Welcome Hannah!!!

On Thu, Apr 25, 2019, 5:42 PM Reza Rokni mailto:r...@google.com>> wrote:

Welcome!

On Fri, 26 Apr 2019 at 04:36, Hannah Jiang
mailto:hannahji...@google.com>> wrote:

Thanks Cyrus!

On Thu, Apr 25, 2019 at 1:34 PM Cyrus Maden
mailto:cma...@google.com>> wrote:

Welcome!!

On Thu, Apr 25, 2019 at 4:30 PM Hannah Jiang
mailto:hannahji...@google.com>>
wrote:

Thank you Robin!

On Thu, Apr 25, 2019 at 1:27 PM Robin Qiu
mailto:robi...@google.com>> wrote:

Welcome Hannah!

On Thu, Apr 25, 2019 at 1:26 PM Hannah Jiang
mailto:hannahji...@google.com>> wrote:

Thanks Kenneth!

On Thu, Apr 25, 2019 at 1:24 PM Kenneth
Knowles mailto:k...@google.com>> wrote:

Welcome!

On Thu, Apr 25, 2019 at 12:38 PM
Matthias Baetens
mailto:baetensmatth...@gmail.com>> wrote:

Welcome to the community!

On Thu, Apr 25, 2019, 18:55 Griselda
Cuevas mailto:g...@google.com>> wrote:

Welcome Hannah! - Very excited
to see you in the Beam community :)

On Tue, 23 Apr 2019 at 12:59,
Hannah Jiang
mailto:hannahji...@google.com>>
wrote:

Hi everyone

I joined Google recently and
would work on Python
portability part. I am happy
to be part of the community.
Looking forward to working
with all of you together.

I have a minor request, can
admin please give me access
to JIRA?

Thanks,
Hannah







Re: Removing Java Reference Runner code

2019-04-26 Thread Maximilian Michels
Thanks for following up with this. I have mixed feelings about seeing the 
portable Java DirectRunner go, but I'm in favor of this change because 
it removes a lot of code that we do not really make use of.


-Max

On 26.04.19 02:58, Kenneth Knowles wrote:
Thanks for providing all this background on the PR. It is very easy to 
see where it came from. Definitely nice to have less code and fewer 
things that can break. Perhaps lazy consensus is enough.


Kenn

On Thu, Apr 25, 2019 at 4:01 PM Daniel Oliveira > wrote:


Hey everyone,

I made a preliminary PR for removing all the Java Reference Runner
code (PR-8380) since I
wanted to see if it could be done easily. It seems to be working
fine, so I wanted to open up this discussion to make sure people are
still in agreement on getting rid of this code and that people don't
have any concerns.

For those who need additional context about this, this previous
thread is where we discussed deprecating the Java Reference Runner (in some
places it's called the ULR or Universal Local Runner, but it's the
same thing). Then there's this thread where we discussed removing the
code from the repo since it's been deprecated.

If no one has any objections to trying to remove the code I'll have
someone review the PR I wrote and start a vote to have it merged.

Thanks,
Daniel Oliveira



Re: :beam-sdks-java-io-hadoop-input-format:test is extremely flaky

2019-04-29 Thread Maximilian Michels

I don't know what's going on with it, but I agree it's annoying.

Came across https://jira.apache.org/jira/browse/BEAM-6247, maybe it is 
time to remove this module for the next release?


-Max

On 26.04.19 20:10, Reuven Lax wrote:
I find I usually have to rerun Presubmit multiple times to get a green 
run, and this test is one of the biggest culprits (though it's not the 
only culprit). Does anyone know what's going on with it?


Reuven


Re: [DISCUSS] Performance of Beam compare to "Bare Runner"

2019-04-29 Thread Maximilian Michels

Hi Jozef,

Yes, there is potential for overhead when running Beam pipelines on 
different Runners. The Beam model has an execution framework which each 
Runner utilizes in a slightly different way.


Timers in Flink, for example, are uniquely identified by a namespace and 
a timestamp. In Beam, they are only identified by a namespace and any 
pending timers with the same namespace will get overwritten in case a 
new timer with the same namespace is set. To implement this on top of 
Flink, we have to maintain a table of timers by namespace; though it 
seems this did not cause a slowdown in your case.
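
To illustrate the bookkeeping mentioned above, here is a rough sketch; the
timer service below is a stand-in, not Flink's actual API, and the real
Flink runner implementation differs in detail:

# Sketch only: Beam timers are identified by (key, namespace), so setting a
# timer again replaces the pending one, while the underlying timer service
# identifies timers by timestamp. The table tracks what was last registered.
class DedupingTimerTable(object):

    def __init__(self, timer_service):
        self._timer_service = timer_service  # stand-in object
        self._pending = {}                   # (key, namespace) -> timestamp

    def set_timer(self, key, namespace, timestamp):
        previous = self._pending.get((key, namespace))
        if previous is not None and previous != timestamp:
            self._timer_service.delete(previous)   # hypothetical method
        self._timer_service.register(timestamp)    # hypothetical method
        self._pending[(key, namespace)] = timestamp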


I think it would be very helpful to compile a list of issues that could 
slow down pipelines. How about filing JIRA issues for what you 
discovered during profiling? We could use a "performance" tag for 
discoverability. I'd be eager to investigate some of those.


Thanks,
Max

PS: We have performance regression tests: 
https://apache-beam-testing.appspot.com/explore?dashboard=5699257587728384


On 29.04.19 12:47, Jozef Vilcek wrote:

Hello,

I am interested in any knowledge or thoughts on what should be / is an 
overhead of running Beam pipelines instead of pipelines written on "bare 
runner". Is this something which is being tested or investigated by 
community? Is there a consensus in what bounds should the overhead 
typically be? I realise this is very runner specific, but certain things 
are imposed also by SDK model itself.


I tested a simple streaming pipeline on Flink vs Beam-Flink and found very 
noticeable differences. I want to stress that it was not a performance 
test. The job does the following:


Read Kafka -> Deserialize to Proto -> Filter deserialisation errors -> 
Reshuffle -> Report counter.inc() to metrics for throughput


Both jobs had same configuration and same state backed with same 
checkpointing strategy. What I noticed from few simple test runs:


* In a first run on Flink 1.5.0, from CPU profiles on one worker I found 
that ~50% of the time was spent either on removing timers 
from HeapInternalTimerService or in java.io.ByteArrayOutputStream from 
CoderUtils.clone()


* The problem with timer deletes was addressed by FLINK-9423. I have retested 
on Flink 1.7.2 and not much time is spent in timer deletes now, 
but the root cause was not removed. It still remains that timers are 
frequently registered and removed (I believe 
from ReduceFnRunner.scheduleGarbageCollectionTimer(), in which case it is 
called per processed element?), which is noticeable in GC activity, 
heap and state ...


* In Flink I use the FileSystem state backend, which keeps state in an 
in-memory CopyOnWriteStateTable that after some time is full of PaneInfo 
objects. Maybe they come from PaneInfoTracker activity


* Coder clone is painful. A pure Flink job does a copy between operators 
too, in my case via Kryo.copy(), but this is not noticeable in the CPU 
profile. Kryo.copy() does the copy at the object level, not object -> bytes -> 
object, which is cheaper


Overall, my observation is that pure Flink can be roughly 3x faster.

I do not know what I am trying to achieve here :) Probably just start a 
discussion and collect thoughts and other experiences on the cost of 
running some data processing on Beam and particular runner.


Re: [DISCUSS] Performance of Beam compare to "Bare Runner"

2019-05-02 Thread Maximilian Michels

Thanks for the JIRA issues Jozef!


So the feature in Flink is operator chaining, and Flink by default initiates a 
copy of input elements. In the Beam case, the coder copy seems to be more noticeable 
than in native Flink.


Copying between chained operators can be turned off in the 
FlinkPipelineOptions (if you know what you're doing). Beam coders should 
not be slower than Flink's. They are simply wrapped. It seems Kryo is 
simply slower, which we could fix by providing more type hints to Flink.


-Max

On 02.05.19 13:15, Robert Bradshaw wrote:

Thanks for filing those.

As for how not doing a copy is "safe," it's not really. Beam simply
asserts that you MUST NOT mutate your inputs (and direct runners,
which are used during testing, do perform extra copies and checks to
catch violations of this requirement).

On Thu, May 2, 2019 at 1:02 PM Jozef Vilcek  wrote:


I have created
https://issues.apache.org/jira/browse/BEAM-7204
https://issues.apache.org/jira/browse/BEAM-7206

to track these topics further

On Wed, May 1, 2019 at 1:24 PM Jozef Vilcek  wrote:




On Tue, Apr 30, 2019 at 5:42 PM Kenneth Knowles  wrote:




On Tue, Apr 30, 2019, 07:05 Reuven Lax  wrote:


In that case, Robert's point is quite valid. The old Flink runner I believe had 
no knowledge of fusion, which was known to make it extremely slow. A lot of 
work went into making the portable runner fusion aware, so we don't need to 
round trip through coders on every ParDo.



The old Flink runner got fusion for free, since Flink does it. The new fusion 
in portability is because fusing the runner side of portability steps does not 
achieve real fusion



Aha, I see. So the feature in Flink is operator chaining, and Flink by default 
initiates a copy of input elements. In the Beam case, the coder copy seems to be 
more noticeable than in native Flink.
So do I get it right that in portable runner scenario, you do similar chaining via this 
"fusion of stages"? Curious here... how is it different from chaining so runner can be 
sure that not doing copy is "safe" with respect to user defined functions and their 
behaviour over inputs?




Reuven

On Tue, Apr 30, 2019 at 6:58 AM Jozef Vilcek  wrote:


It was not a portable Flink runner.

Thanks all for the thoughts, I will create JIRAs, as suggested, with my 
findings and send them out

On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax  wrote:


Jozef did you use the portable Flink runner or the old one?

Reuven

On Tue, Apr 30, 2019 at 1:03 AM Robert Bradshaw  wrote:


Thanks for starting this investigation. As mentioned, most of the work
to date has been on feature parity, not performance parity, but we're
at the point that the latter should be tackled as well. Even if there
is a slight overhead (and there's talk about integrating more deeply
with the Flume DAG that could elide even that) I'd expect it should be
nowhere near the 3x that you're seeing. Aside from the timer issue,
sounds like the cloning via coders is is a huge drag that needs to be
addressed. I wonder if this is one of those cases where using the
portability framework could be a performance win (specifically, no
cloning would happen between operators of fused stages, and the
cloning between operators could be on the raw bytes[] (if needed at
all, because we know they wouldn't be mutated).

On Tue, Apr 30, 2019 at 12:31 AM Kenneth Knowles  wrote:


Specifically, a lot of shared code assumes that repeatedly setting a timer is 
nearly free / the same cost as determining whether or not to set the timer. 
ReduceFnRunner has been refactored in a way so it would be very easy to set the 
GC timer once per window that occurs in a bundle, but there's probably some 
underlying inefficiency around why this isn't cheap that would be a bigger win.

Kenn

On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax  wrote:


I think the short answer is that folks working on the BeamFlink runner have 
mostly been focused on getting everything working, and so have not dug into 
this performance too deeply. I suspect that there is low-hanging fruit to 
optimize as a result.

You're right that ReduceFnRunner schedules a timer for each element. I think 
this code dates back to before Beam; on Dataflow timers are identified by tag, 
so this simply overwrites the existing timer which is very cheap in Dataflow. 
If it is not cheap on Flink, this might be something to optimize.

Reuven

On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek  wrote:


Hello,

I am interested in any knowledge or thoughts on what the overhead of running Beam 
pipelines should be / is, compared to pipelines written on a "bare runner". Is this 
something which is being tested or investigated by the community? Is there a consensus on 
what bounds the overhead should typically stay within? I realise this is very runner specific, but 
certain things are also imposed by the SDK model itself.

I tested a simple streaming pipeline on Flink vs Beam-on-Flink and found very 
noticeable differences. I want to stress that it was not a performance test. 
Job does foll

Re: Fwd: Your application for Season of Docs 2019 was unsuccessful

2019-05-02 Thread Maximilian Michels
Aw, too bad. Next time. I hope we can extend the docs for portability 
before next year :)


On 02.05.19 00:30, Pablo Estrada wrote:

Hello all,
as you may already know, unfortunately our application for Season of 
Docs was not successful. That's too bad : ) - but it's good that we were 
able to produce a couple of work items that can still be picked up by the 
community at some point. Thanks to everyone who helped here.

Best
-P.

-- Forwarded message -
From: *Andrew Chen* mailto:cheno...@google.com>>
Date: Tue, Apr 30, 2019 at 5:31 AM
Subject: Your application for Season of Docs 2019 was unsuccessful
To: >



Thank you for your interest and enthusiasm for Season of Docs.


We’re sorry to say that your organization’s application for Season of 
Docs was not successful this year. Because 2019 is the program’s pilot 
year, we were only able to accept 50 organizations out of the almost 200 
applications submitted. There were many high quality applications, so 
the selection of organizations was not easy.



Please do stay in touch with the progress of Season of Docs, and let us 
know if you have any questions or feedback by emailing 
season-of-docs-supp...@googlegroups.com 
. We are hoping to grow 
the program's capacity in subsequent years, so please apply again next year.



Sincerely,


The Season of Docs team




Re: [DISCUSS] (Forked thread) Beam issue triage & assignees

2019-05-02 Thread Maximilian Michels
wners
4) Automatically unassign stale issues that
are just sitting on an assignee
5) Look into SLOs per issue priority and see
how we can surface SLO violations (reports
and pings)

Kenn

On Thu, Jan 10, 2019 at 11:41 AM Scott
Wegner mailto:sweg...@google.com>> wrote:

+1

 > 3) Ensure that each component's
unresolved issues get looked at regularly

This is ideal, but I also don't know how
to get to this state. Starting with
clear component ownership and
expectations will help. If the triaging
process is well-defined, then members of
the community can help for any
components which need additional support.

On Thu, Jan 10, 2019 at 12:21 AM Mikhail
Gryzykhin mailto:gryzykhin.mikh...@gmail.com>> wrote:

+1 to keep issues unassigned and
reevaluate backlog from time to time.

We can also auto-unassign if there
was no activity on ticket for N
days. Or we can have auto-mailed
report that highlights stale
assigned issues.

On Thu, Jan 10, 2019 at 12:10 AM
Robert Bradshaw mailto:rober...@google.com>> wrote:

On Thu, Jan 10, 2019 at 3:20 AM
Ahmet Altay mailto:al...@google.com>> wrote:
 >
 > I agree with the proposals
here. Initial state of "Needs
Review" and blocking releases on
untriaged issues will ensure
that we will at least look at
every new issue once.

+1.

I'm more ambivalent about
closing stale issues. Unlike
PRs, issues can
be filed as "we should (not
    forget to) do this" much sooner than
they're actively worked on.

 > On Wed, Jan 9, 2019 at 10:30
AM Maximilian Michels
mailto:m...@apache.org>> wrote:
 >>
 >> Hi Kenn,
 >>
 >> As your data shows,
default-assigning issues to a
single person does not
 >> automatically solve triaging
issues. Quite the contrary, it
hides the triage
 >> status of an issue.
 >>
 >>  From the perspective of the
Flink Runner, we used to
auto-assign but we got rid
 >> of this. Instead, we monitor
the newly coming issues and take
actions. We also
 >> go through the old ones
occasionally. I believe that
works fine for us.
 >>
 >> The Flink project itself
also does not default-assign,
newly created issues are
 >> unassigned. There are
component leads ove

Re: Artifact staging in cross-language pipelines

2019-05-02 Thread Maximilian Michels

BTW what are the next steps here ? Heejong or Max, will one of you be able to 
come up with a detailed proposal around this ?


Thank you for all the additional comments and ideas. I will try to 
capture them in a document and share it here. Of course we can continue 
the discussion in the meantime.


-Max

On 30.04.19 19:02, Chamikara Jayalath wrote:


On Fri, Apr 26, 2019 at 4:14 PM Lukasz Cwik <mailto:lc...@google.com>> wrote:


We should stick with URN + payload + artifact metadata[1] where the
only mandatory one that all SDKs and expansion services understand
is the "bytes" artifact type. This allows us to add optional URNs
for file://, http://, Maven, PyPi, ... in the future. I would make
the artifact staging service use the same URN + payload mechanism to
get compatibility of artifacts across the different services and
also have the artifact staging service be able to be queried for the
list of artifact types it supports. Finally, we would need to have
environments enumerate the artifact types that they support.

Having everyone have the same "artifact" representation would be
beneficial since:
a) Python environments could install dependencies from a
requirements.txt file (something that the Google Cloud Dataflow
Python docker container allows for today)
b) It provides an extensible and versioned mechanism for SDKs,
environments, and artifact staging/retrieval services to support
additional artifact types
c) Allow for expressing a canonical representation of an artifact
like a Maven package so a runner could merge environments that the
runner deems compatible.

The flow I could see is:
1) (optional) query artifact staging service for supported artifact
types
2) SDK request expansion service to expand transform passing in a
list of artifact types the SDK and artifact staging service support,
the expansion service returns a list of artifact types limited to
those supported types + any supported by the environment
3) SDK converts any artifact types that the artifact staging service
or environment doesn't understand, e.g. pulls down Maven
dependencies and converts them to "bytes" artifacts
4) SDK sends artifacts to artifact staging service
5) Artifact staging service converts any artifacts to types that the
environment understands
6) Environment is started and gets artifacts from the artifact
retrieval service.
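
To make the URN + payload + metadata idea concrete, here is a rough sketch. All names below are invented for illustration and are not the actual Beam protos or APIs:

import java.util.Map;
import java.util.Set;

// One artifact, described by a type URN plus a URN-specific payload and free-form metadata.
final class ArtifactSpec {
  final String typeUrn;               // e.g. a mandatory "bytes" type, plus optional file/http/Maven/PyPI types
  final byte[] typePayload;           // interpretation depends entirely on the URN
  final Map<String, String> metadata; // e.g. name, sha256

  ArtifactSpec(String typeUrn, byte[] typePayload, Map<String, String> metadata) {
    this.typeUrn = typeUrn;
    this.typePayload = typePayload;
    this.metadata = metadata;
  }
}

// Staging services and environments would advertise which artifact type URNs they
// understand; an SDK converts anything unsupported down to the mandatory "bytes" type.
interface ArtifactTypeSupport {
  Set<String> supportedArtifactTypeUrns();
}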


This is a very interesting proposal. I would add:
(5.5) artifact staging service resolves conflicts/duplicates for 
artifacts needed by different transforms of the same pipeline


BTW what are the next steps here ? Heejong or Max, will one of you be 
able to come up with a detailed proposal around this ?


In the meantime I suggest we add temporary pipeline options for staging 
Java dependencies from Python (and vice versa) to unblock development 
and testing of rest of the cross-language transforms stack. For example, 
https://github.com/apache/beam/pull/8340


Thanks,
Cham


On Wed, Apr 24, 2019 at 4:44 AM Robert Bradshaw mailto:rober...@google.com>> wrote:

    On Wed, Apr 24, 2019 at 12:21 PM Maximilian Michels
mailto:m...@apache.org>> wrote:
 >
 > Good idea to let the client expose an artifact staging
service that the
 > ExpansionService could use to stage artifacts. This solves
two problems:
 >
 > (1) The Expansion Service not being able to access the Job Server
 > artifact staging service
 > (2) The client not having access to the dependencies returned
by the
 > Expansion Server
 >
 > The downside is that it adds an additional indirection. The
alternative
 > to let the client handle staging the artifacts returned by
the Expansion
 > Server is more transparent and easier to implement.

The other downside is that it may not always be possible for the
expansion service to connect to the artifact staging service (e.g.
when constructing a pipeline locally against a remote expansion
service).


Just to make sure, you're saying the expansion service would return
all the artifacts (bytes, urls, ...) as part of the response since
the expansion service wouldn't be able to connect to the SDK that is
running locally either.


 > Ideally, the Expansion Service won't return any dependencies
because the
 > environment already contains the required dependencies. We
could make it
 > a requirement for the expansion to be performed inside an
environment.
 > Then we would already ensure during expansion time that the
runtime
 > dependencies are available.

Yes, it's cleanes

Re: [DISCUSS] Performance of Beam compare to "Bare Runner"

2019-05-02 Thread Maximilian Michels

I am not sure what you are referring to here. What do you mean, Kryo is simply 
slower ... Beam Kryo or Flink Kryo?


Flink uses Kryo as a fallback serializer when its own type serialization 
system can't analyze the type. I'm just guessing here that this could be 
slower.
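
For the native-Flink side of the comparison, "more type hints" roughly means supplying TypeInformation explicitly so Flink's own serializers are used instead of the Kryo fallback. A minimal sketch (plain Flink, not Beam; names are illustrative):

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TypeHintExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Fail fast instead of silently falling back to Kryo when a type cannot be analyzed.
    env.getConfig().disableGenericTypes();

    env.fromElements("a", "b", "c")
        .map(s -> Tuple2.of(s, 1))
        // Lambdas lose generic type information, so tell Flink the exact output type.
        .returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}))
        .print();

    env.execute("type-hint-example");
  }
}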


On 02.05.19 16:51, Jozef Vilcek wrote:



On Thu, May 2, 2019 at 3:41 PM Maximilian Michels <mailto:m...@apache.org>> wrote:


Thanks for the JIRA issues Jozef!

 > So the feature in Flink is operator chaining and Flink per
default initiate copy of input elements. In case of Beam coders copy
seems to be more noticable than native Flink.

Copying between chained operators can be turned off in the
FlinkPipelineOptions (if you know what you're doing).


Yes, I know that it can be instructed to reuse objects (if you are 
referring to this). I am just not sure I want to open this door in 
general :)
But it is interesting to learn that, with portability, this will be 
turned on by default. Quite an important finding imho.


Beam coders should
not be slower than Flink's. They are simply wrapped. It seems Kryo is 
simply slower, which we could fix by providing more type hints to Flink.


I am not sure what are you referring to here. What do you mean Kryo is 
simply slower ... Beam Kryo or Flink Kryo or?


-Max

On 02.05.19 13:15, Robert Bradshaw wrote:
 > Thanks for filing those.
 >
 > As for how not doing a copy is "safe," it's not really. Beam simply
 > asserts that you MUST NOT mutate your inputs (and direct runners,
 > which are used during testing, do perform extra copies and checks to
 > catch violations of this requirement).
 >
 > On Thu, May 2, 2019 at 1:02 PM Jozef Vilcek
mailto:jozo.vil...@gmail.com>> wrote:
 >>
 >> I have created
 >> https://issues.apache.org/jira/browse/BEAM-7204
 >> https://issues.apache.org/jira/browse/BEAM-7206
 >>
 >> to track these topics further
 >>
 >> On Wed, May 1, 2019 at 1:24 PM Jozef Vilcek
mailto:jozo.vil...@gmail.com>> wrote:
 >>>
 >>>
 >>>
 >>> On Tue, Apr 30, 2019 at 5:42 PM Kenneth Knowles
mailto:k...@apache.org>> wrote:
 >>>>
 >>>>
 >>>>
 >>>> On Tue, Apr 30, 2019, 07:05 Reuven Lax mailto:re...@google.com>> wrote:
 >>>>>
 >>>>> In that case, Robert's point is quite valid. The old Flink
runner I believe had no knowledge of fusion, which was known to make
it extremely slow. A lot of work went into making the portable
runner fusion aware, so we don't need to round trip through coders
on every ParDo.
 >>>>
 >>>>
 >>>> The old Flink runner got fusion for free, since Flink does it.
The new fusion in portability is because fusing the runner side of
portability steps does not achieve real fusion
 >>>
 >>>
 >>> Aha, I see. So the feature in Flink is operator chaining and
Flink per default initiate copy of input elements. In case of Beam
coders copy seems to be more noticable than native Flink.
 >>> So do I get it right that in portable runner scenario, you do
similar chaining via this "fusion of stages"? Curious here... how is
it different from chaining so runner can be sure that not doing copy
is "safe" with respect to user defined functions and their behaviour
over inputs?
 >>>
 >>>>>
 >>>>>
 >>>>> Reuven
 >>>>>
 >>>>> On Tue, Apr 30, 2019 at 6:58 AM Jozef Vilcek
mailto:jozo.vil...@gmail.com>> wrote:
 >>>>>>
 >>>>>> It was not a portable Flink runner.
 >>>>>>
 >>>>>> Thanks all for the thoughts, I will create JIRAs, as
suggested, with my findings and send them out
 >>>>>>
 >>>>>> On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax
mailto:re...@google.com>> wrote:
 >>>>>>>
 >>>>>>> Jozef did you use the portable Flink runner or the old one?
 >>>>>>>
 >>>>>>> Reuven
 >>>>>>>
 >>>>>>> On Tue, Apr 30, 2019 at 1:03 AM Robert Bradshaw
mailto:rober...@google.com>> wrote:
 >>>>>>>>
 >>>>>>>> Thanks for starting this investigation. As mentioned, most
of the work
 >>>>>>>> to date has been on feature parity, not performance
parit

Re: [DISCUSS] Performance of Beam compare to "Bare Runner"

2019-05-03 Thread Maximilian Michels
Misread your post. You're saying that Kryo is more efficient than a 
roundtrip obj->bytes->obj_copy. Still, most types use Flink's 
serializers, which also do the above roundtrip. So I'm not sure this 
performance advantage holds true for other Flink jobs.


On 02.05.19 20:01, Maximilian Michels wrote:
I am not sure what are you referring to here. What do you mean Kryo is 
simply slower ... Beam Kryo or Flink Kryo or?


Flink uses Kryo as a fallback serializer when its own type serialization 
system can't analyze the type. I'm just guessing here that this could be 
slower.


On 02.05.19 16:51, Jozef Vilcek wrote:



On Thu, May 2, 2019 at 3:41 PM Maximilian Michels <mailto:m...@apache.org>> wrote:


    Thanks for the JIRA issues Jozef!

 > So the feature in Flink is operator chaining and Flink per
    default initiate copy of input elements. In case of Beam coders copy
    seems to be more noticable than native Flink.

    Copying between chained operators can be turned off in the
    FlinkPipelineOptions (if you know what you're doing).


Yes, I know that it can be instructed to reuse objects (if you are 
referring to this). I am just not sure I want to open this door in 
general :)
But it is interesting to learn that, with portability, this will be 
turned on by default. Quite an important finding imho.


    Beam coders should
    not be slower than Flink's. They are simply wrapped. It seems Kryo is
    simply slower, which we could fix by providing more type hints to 
Flink.



I am not sure what are you referring to here. What do you mean Kryo is 
simply slower ... Beam Kryo or Flink Kryo or?


    -Max

    On 02.05.19 13:15, Robert Bradshaw wrote:
 > Thanks for filing those.
 >
 > As for how not doing a copy is "safe," it's not really. Beam 
simply

 > asserts that you MUST NOT mutate your inputs (and direct runners,
 > which are used during testing, do perform extra copies and 
checks to

 > catch violations of this requirement).
 >
 > On Thu, May 2, 2019 at 1:02 PM Jozef Vilcek
    mailto:jozo.vil...@gmail.com>> wrote:
 >>
 >> I have created
 >> https://issues.apache.org/jira/browse/BEAM-7204
 >> https://issues.apache.org/jira/browse/BEAM-7206
 >>
 >> to track these topics further
 >>
 >> On Wed, May 1, 2019 at 1:24 PM Jozef Vilcek
    mailto:jozo.vil...@gmail.com>> wrote:
 >>>
 >>>
 >>>
 >>> On Tue, Apr 30, 2019 at 5:42 PM Kenneth Knowles
    mailto:k...@apache.org>> wrote:
 >>>>
 >>>>
 >>>>
 >>>> On Tue, Apr 30, 2019, 07:05 Reuven Lax mailto:re...@google.com>> wrote:
 >>>>>
 >>>>> In that case, Robert's point is quite valid. The old Flink
    runner I believe had no knowledge of fusion, which was known to make
    it extremely slow. A lot of work went into making the portable
    runner fusion aware, so we don't need to round trip through coders
    on every ParDo.
 >>>>
 >>>>
 >>>> The old Flink runner got fusion for free, since Flink does it.
    The new fusion in portability is because fusing the runner side of
    portability steps does not achieve real fusion
 >>>
 >>>
 >>> Aha, I see. So the feature in Flink is operator chaining and
    Flink per default initiate copy of input elements. In case of Beam
    coders copy seems to be more noticable than native Flink.
 >>> So do I get it right that in portable runner scenario, you do
    similar chaining via this "fusion of stages"? Curious here... how is
    it different from chaining so runner can be sure that not doing copy
    is "safe" with respect to user defined functions and their behaviour
    over inputs?
 >>>
 >>>>>
 >>>>>
 >>>>> Reuven
 >>>>>
 >>>>> On Tue, Apr 30, 2019 at 6:58 AM Jozef Vilcek
    mailto:jozo.vil...@gmail.com>> wrote:
 >>>>>>
 >>>>>> It was not a portable Flink runner.
 >>>>>>
 >>>>>> Thanks all for the thoughts, I will create JIRAs, as
    suggested, with my findings and send them out
 >>>>>>
 >>>>>> On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax
    mailto:re...@google.com>> wrote:
 >>>>>>>
 >>>>>>> Jozef did you use the portable Flink runner or the old one?
 >>>>>>>
 >>>>>>> Reuven
 >>>>>>>
 >>>>>>> On Tue

Re: [ANNOUNCE] New committer announcement: Udi Meiri

2019-05-06 Thread Maximilian Michels

Congrats and welcome to the committers-club!

On 06.05.19 14:39, Gleb Kanterov wrote:

Congratulations!

On Mon, May 6, 2019 at 2:34 PM Valentyn Tymofieiev > wrote:


Congrats, Udi!

*From: *Thomas Weise mailto:t...@apache.org>>
*Date: *Mon, May 6, 2019 at 7:50 AM
*To: * mailto:dev@beam.apache.org>>

Congrats!


On Mon, May 6, 2019 at 2:25 AM Łukasz Gajowy mailto:lgaj...@apache.org>> wrote:

Congrats! :)

pon., 6 maj 2019 o 10:45 Alexey Romanenko
mailto:aromanenko@gmail.com>>
napisał(a):

Congratulations, Udi! Thanks for your work!


On 4 May 2019, at 01:24, Heejong Lee
mailto:heej...@google.com>> wrote:

Congratulations!

On Fri, May 3, 2019 at 3:53 PM Reza Rokni
mailto:r...@google.com>> wrote:

Congratulations !

*From: *Reuven Lax mailto:re...@google.com>>
*Date: *Sat, 4 May 2019, 06:42
*To: *dev

Thank you!

On Fri, May 3, 2019 at 3:15 PM Ankur Goenka
mailto:goe...@google.com>>
wrote:

Congratulations Udi!

On Fri, May 3, 2019 at 3:00 PM Connell
O'Callaghan mailto:conne...@google.com>> wrote:

Well done Udi!!! Congratulations and
thank you for your contributions!!!

Kenn thank you for sharing!!!

On Fri, May 3, 2019 at 2:49 PM Yifan
Zou mailto:yifan...@google.com>> wrote:

Thanks Udi and congratulations!

On Fri, May 3, 2019 at 2:47 PM
Robin Qiu mailto:robi...@google.com>> wrote:

Congratulations Udi!!!

*From: *Ruoyun Huang
mailto:ruo...@google.com>>
*Date: *Fri, May 3, 2019 at
2:39 PM
*To: * mailto:dev@beam.apache.org>>

Congratulations Udi!

On Fri, May 3, 2019 at
2:30 PM Ahmet Altay
mailto:al...@google.com>>
wrote:

Congratulations, Udi!

*From: *Kyle Weaver
mailto:kcwea...@google.com>>
*Date: *Fri, May 3,
2019 at 2:11 PM
*To: *
mailto:dev@beam.apache.org>>

Congratulations
Udi! I look
forward to sending
you all my reviews for
the next month
(just kidding :)

Kyle Weaver |
Software Engineer
| github.com/ibzib

|
kcwea...@google.com 

| +1650203

On Fri, May 3,
2019 at 1:52 PM
Charles Chen
mailto:c...@google.com>>
wrote:
>
> Thank you Udi!
>
> On Fri, May 3,
2019, 1:51 PM
Aizhamal Nurmamat
kyzy
mailto:aizha...@goog

Re: Better naming for runner specific options

2019-05-06 Thread Maximilian Michels
Good points. As already mentioned, there is no namespacing between the 
different pipeline option classes. In particular, there is no separate 
namespace for system and user options, which is most concerning.


I'm in favor of an optional namespace using the class name of the 
defining pipeline options class. That way we would at least be able to 
resolve duplicate option names. For example, if there were an "optionX" 
in both class A and class B, we could use "A#optionX" to refer to it from class A.
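
A rough sketch of the collision and of the proposed (hypothetical, not implemented) class-qualified form. The two option interfaces are made up; only PipelineOptions and PipelineOptionsFactory are existing Beam APIs:

import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class OptionNamespaceSketch {

  /** Hypothetical options for runner A. */
  public interface RunnerAOptions extends PipelineOptions {
    @Description("Worker machine type as understood by runner A")
    String getMachineType();
    void setMachineType(String value);
  }

  /** Hypothetical options for runner B, declaring the same option name. */
  public interface RunnerBOptions extends PipelineOptions {
    @Description("Worker machine type as understood by runner B")
    String getMachineType();
    void setMachineType(String value);
  }

  public static void main(String[] args) {
    PipelineOptionsFactory.register(RunnerAOptions.class);
    PipelineOptionsFactory.register(RunnerBOptions.class);

    // Today there is a single global "--machineType" property: whatever value is
    // passed applies to both views, even if the runners interpret it differently.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    System.out.println(options.as(RunnerAOptions.class).getMachineType());

    // Proposed (not implemented): an optional class-qualified form such as
    //   --RunnerAOptions#machineType=n1-standard-4
    // to disambiguate, while keeping the plain form for names that are unique.
  }
}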


-Max

On 04.05.19 02:23, Reza Rokni wrote:

Great point Lukasz, worker machine type could be relevant to multiple runners.

Perhaps for parameters that could have multiple runner relevance, the 
doc could be rephrased to reflect its potential multiple uses. For 
example change the help information to start with a generic reference " 
worker type on the runner" followed by runner specific behavior expected 
for RunnerA, RunnerB etc...


But I do worry that without a prefix even generic options could cause 
confusion. For example, if the use of --network is substantially 
different between RunnerA and RunnerB, then the user will only have this 
information by reading the help. It will also mean that a pipeline which 
is expected to work both on-premise on RunnerA and in the cloud on 
RunnerB could fail because the format of the value to pass to 
--network is different.


Cheers

Reza

*From: *Kenneth Knowles mailto:k...@apache.org>>
*Date: *Sat, 4 May 2019 at 03:54
*To: *dev

Even though they are in classes named for specific runners, they are
not namespaced. All PipelineOptions exist in a global namespace so
they need to be careful to be very precise.

It is a good point that even though they may be multiple uses for
"machine type" they are probably not going to both happen at the
same time.

If it becomes an issue, another thing we could do would be to add
namespacing support so options have less spooky action, or at least
have a way to resolve it when it happens on accident.

Kenn

On Fri, May 3, 2019 at 10:43 AM Chamikara Jayalath
mailto:chamik...@google.com>> wrote:

Also, we do have runner specific options classes where truly
runner specific options can go.


https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java

https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java

On Fri, May 3, 2019 at 9:50 AM Ahmet Altay mailto:al...@google.com>> wrote:

I agree, that is a good point.

*From: *Lukasz Cwik mailto:lc...@google.com>>
*Date: *Fri, May 3, 2019 at 9:37 AM
*To: *dev

The concept of a machine type isn't necessarily limited
to Dataflow. If it made sense for a runner, they could
use AWS/Azure machine types as well.

On Fri, May 3, 2019 at 9:32 AM Ahmet Altay
mailto:al...@google.com>> wrote:

This idea was discussed in a PR a few months ago,
and JIRA was filed as a follow up [1]. IMO, it makes
sense to use a namespace prefix. The primary issue
here is that, such a change will very likely be a
backward incompatible change and would be hard to do
before the next major version.

[1] https://issues.apache.org/jira/browse/BEAM-6531

*From: *Reza Rokni mailto:r...@google.com>>
*Date: *Thu, May 2, 2019 at 8:00 PM
*To: * mailto:dev@beam.apache.org>>

Hi,

Was reading this SO question:


https://stackoverflow.com/questions/53833171/googlecloudoptions-doesnt-have-all-options-that-pipeline-options-has

And noticed that in


https://beam.apache.org/releases/pydoc/2.12.0/_modules/apache_beam/options/pipeline_options.html#WorkerOptions

The option is called --worker_machine_type.

I wonder if runner specific options should have
the runner in the prefix? Something like
--dataflow_worker_machine_type?

Cheers
Reza

-- 


This email may be confidential and privileged.
If you received this communication by mistake,
please don't forward it to anyone else, please
erase all copies and attachments, and please let
me know that it has gone to the wrong person.

The above terms reflect a potential business
arrangement, ar

Re: [Discuss] Publishing pre-release artifacts to repositories

2019-05-06 Thread Maximilian Michels
42332c1e180f57d60285bebe614ffa77bb53c4f74c4cbc049096@%3Cdev.airflow.apache.org%3E
 >> >>> >> >>
 >> >>> >> >> On Fri, Apr 26, 2019 at 3:38 PM Ahmet Altay
mailto:al...@google.com>> wrote:
 >> >>> >> >>>
 >> >>> >> >>> The incremental value of publishing python artifacts
to a separate place but not to actual pypi listing will be low.
Users can already download RC artifacts, or even pip install from
http location directly. I think the incremental value will be low,
because for a user or a downstream library to test with Beam RCs
using their usual ways will still require them to get other
dependencies from the regular pypi listing. That would mean they
need to change their setup to test with beam rcs, which is the same
state as today. There will be some incremental value of putting them
in more obvious places (e.g. pypi test repository). I would rather
not complicate the release process for doing this.
 >> >>> >> >>>
 >> >>> >> >>>
 >> >>> >> >>>
 >> >>> >> >>> On Thu, Apr 25, 2019 at 2:25 PM Kenneth Knowles
mailto:k...@apache.org>> wrote:
 >> >>> >> >>>>
 >> >>> >> >>>> Pip is also able to be pointed at any raw hosted
directory for the install, right? So we could publish RCs or
snapshots somewhere with more obvious caveats and not interfere with
the pypi list of actual releases. Much like the Java snapshots are
stored in a separate opt-in repository.
 >> >>> >> >>>>
 >> >>> >> >>>> Kenn
 >> >>> >> >>>>
 >> >>> >> >>>> On Thu, Apr 25, 2019 at 5:39 AM Maximilian Michels
mailto:m...@apache.org>> wrote:
 >> >>> >> >>>>>
 >> >>> >> >>>>> > wouldn't that be in conflict with Apache release
policy [1] ?
 >> >>> >> >>>>> > [1] http://www.apache.org/legal/release-policy.html
 >> >>> >> >>>>>
 >> >>> >> >>>>> Indeed, advertising pre-release artifacts is
against ASF rules. For
 >> >>> >> >>>>> example, Flink was asked to remove a link to the
Maven snapshot
 >> >>> >> >>>>> repository from their download page.
 >> >>> >> >>>>>
 >> >>> >> >>>>> However, that does not mean we cannot publish
Python artifacts. We just
 >> >>> >> >>>>> have to clearly mark them for developers only and
not advertise them
 >> >>> >> >>>>> alongside with the official releases.
 >> >>> >> >>>>>
 >> >>> >> >>>>> -Max
 >> >>> >> >>>>>
 >> >>> >> >>>>> On 25.04.19 10:23, Robert Bradshaw wrote:
 >> >>> >> >>>>> > Don't we push java artifacts to maven
repositories as part of the RC
 >> >>> >> >>>>> > process? And completely unvetted snapshots? (Or
is this OK because
 >> >>> >> >>>>> > they are special opt-in apache-only ones?)
 >> >>> >> >>>>> >
 >> >>> >> >>>>> > I am generally in favor of the idea, but would
like to avoid increased
 >> >>> >> >>>>> > toil on the release manager.
 >> >>> >> >>>>> >
 >> >>> >> >>>>> > One potential hitch I see is that current release
process updates the
 >> >>> >> >>>>> > versions to x.y.z (no RC or other pre-release
indicator in the version
 >> >>> >> >>>>> > number) whereas pypi (and other systems)
typically expect distinct
 >> >>> >> >>>>> > (recognizable) version numbers for each attempt,
and only the actual
 >> >>> >> >>>>> > final result has the actual final release version.
 >> >>> >> >>>>> >
 

Re: Artifact staging in cross-language pipelines

2019-05-07 Thread Maximilian Michels
Thinking about this some more, general artifact conversion is 
unlikely to be practical because how to interpret an artifact is environment 
dependent. For example, a requirements.txt used to install pip packages for a 
Python docker container depends on the filesystem layout of that specific 
docker container. One could simulate doing a pip install on the same 
filesystem, see the diff, and then derive the full set of packages from requirements.txt, but 
this quickly becomes impractical.




4) SDK sends artifacts to artifact staging service
5) Artifact staging service converts any artifacts to types that the 
environment understands
6) Environment is started and gets artifacts from the artifact retrieval 
service.

On Wed, Apr 24, 2019 at 4:44 AM Robert Bradshaw  wrote:


On Wed, Apr 24, 2019 at 12:21 PM Maximilian Michels  wrote:


Good idea to let the client expose an artifact staging service that the
ExpansionService could use to stage artifacts. This solves two problems:

(1) The Expansion Service not being able to access the Job Server
artifact staging service
(2) The client not having access to the dependencies returned by the
Expansion Server

The downside is that it adds an additional indirection. The alternative
to let the client handle staging the artifacts returned by the Expansion
Server is more transparent and easier to implement.


The other downside is that it may not always be possible for the
expansion service to connect to the artifact staging service (e.g.
when constructing a pipeline locally against a remote expansion
service).


Just to make sure, you're saying the expansion service would return all the 
artifacts (bytes, urls, ...) as part of the response since the expansion 
service wouldn't be able to connect to the SDK that is running locally either.


Yes. Well, more I'm asking how the expansion service would return any
artifacts.

What we have is

Runner <--- SDK ---> Expansion service.

Where the unidirectional arrow means "instantiates a connection with"
and the other direction (and missing arrows) may not be possible.



I believe the ExpansionService Expand request should become a unidirectional 
stream back to the caller so that artifacts could be sent back to the SDK 
(effectively mirroring the artifact staging service API). So the expansion 
response would stream back a bunch of artifact data messages and also the 
expansion response containing PTransform information.


+1.


Ideally, the Expansion Service won't return any dependencies because the
environment already contains the required dependencies. We could make it
a requirement for the expansion to be performed inside an environment.
Then we would already ensure during expansion time that the runtime
dependencies are available.


Yes, it's cleanest if the expansion service provides an environment
without all the dependencies provided. Interesting idea to make this a
property of the expansion service itself.


I had thought this too but an opaque docker container that was built on top of 
a base Beam docker container would be very difficult for a runner to introspect 
and check to see if it's compatible to allow for fusion across PTransforms. I 
think artifacts need to be communicated in their canonical representation.


It's clean (from the specification point of view), but doesn't allow
for good introspection/fusion (aside from one being a base of another,
perhaps).


In this case, the runner would (as
requested by its configuration) be free to merge environments it
deemed compatible, including swapping out beam-java-X for
beam-java-embedded if it considers itself compatible with the
dependency list.


Could you explain how that would work in practice?


Say one has a pipeline with environments

A: beam-java-sdk-2.12-docker
B: beam-java-sdk-2.12-docker + dep1
C: beam-java-sdk-2.12-docker + dep2
D: beam-java-sdk-2.12-docker + dep3

A runner could (conceivably) be intelligent enough to know that dep1
and dep2 are indeed compatible, and run A, B, and C in a single
beam-java-sdk-2.12-docker + dep1 + dep2 environment (with the
corresponding fusion and lower overhead benefits). If a certain
pipeline option is set, it might further note that dep1 and dep2 are
compatible with its own workers, which are built against sdk-2.12, and
choose to run these in embedded + dep1 + dep2 environment.


We have been talking about the expansion service and cross-language transforms a lot 
lately, but I believe they will initially come at the cost of poor fusion of transforms, 
since "merging" compatible environments is a difficult problem: 
it brings up many of the classic dependency management issues (e.g. diamond dependencies).


I agree. I think expansion services offering "kitchen-sink"
containers, when possible, can go far here. If we could at least
recognize when one environment/set of deps is a superset of another,
that could be an easy case that would yield a lot of benefit as well.
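
A trivial sketch of that superset check, assuming each environment could be reduced to a comparable set of dependency coordinates (which is itself the hard, unsolved part); all names are illustrative:

import java.util.Map;
import java.util.Optional;
import java.util.Set;

final class EnvMerge {

  /** Returns an environment whose dependencies contain those of every other environment, if any. */
  static Optional<String> findKitchenSink(Map<String, Set<String>> envDeps) {
    return envDeps.entrySet().stream()
        .filter(candidate -> envDeps.values().stream()
            .allMatch(deps -> candidate.getValue().containsAll(deps)))
        .map(Map.Entry::getKey)
        .findFirst();
  }

  public static void main(String[] args) {
    Map<String, Set<String>> envs = Map.of(
        "A", Set.<String>of(),
        "B", Set.of("dep1"),
        "C", Set.of("dep1", "dep2"));
    // Prints "C": environments A and B could be merged into C for fusion.
    System.out.println(findKitchenSink(envs).orElse("none"));
  }
}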



+1


Re: Kotlin iterator error

2019-05-08 Thread Maximilian Michels

Hi Ankur,

I've left a comment. Looking at the stack trace [1], this looks like a 
problem with our Reflection analysis.


-Max

[1] 
https://gist.github.com/marcoslin/e1e19afdbacac9757f6974592cfd8d7f#file-stack-trace-txt


On 04.05.19 00:56, Ankur Goenka wrote:

Hi,

A beam user on stackoverflow has posted issue while using kotlin sdk.
https://stackoverflow.com/questions/55908999/kotlin-iterable-not-supported-in-apache-beam/55911859#55911859
I am not very familiar with kotlin so can someone please take a look.

Thanks,
Ankur


Re: request for beam minor release

2019-05-08 Thread Maximilian Michels

Hi Richard,

Would it be an option to use the upcoming 2.13.0 release? The commit 
will be part of that release.


Thanks,
Max

On 08.05.19 15:43, Jean-Baptiste Onofré wrote:

Hi,

Any release is tagged; we create a branch based on a master commit.

Are you requesting 2.10.1 maintenance release ?

Regards
JB

On 08/05/2019 15:10, Moorhead,Richard wrote:

Is there a process for tagging a commit in master for a minor release?

I am trying to get this

 commit
released into 2.10.1
  


CONFIDENTIALITY NOTICE This message and any included attachments are
from Cerner Corporation and are intended only for the addressee. The
information contained in this message is confidential and may constitute
inside or non-public information under international, federal, or state
securities laws. Unauthorized forwarding, printing, copying,
distribution, or use of such information is strictly prohibited and may
be unlawful. If you are not the addressee, please promptly delete this
message and notify the sender of the delivery error by e-mail or you may
call Cerner's corporate offices in Kansas City, Missouri, U.S.A at (+1)
(816)221-1024.





Coder Evolution

2019-05-08 Thread Maximilian Michels

Hi,

I'm looking into updating the Flink Runner to Flink version 1.8. Since 
version 1.7 Flink has a new optional interface for Coder evolution*.


When a Flink pipeline is checkpointed, CoderSnapshots are written out 
alongside the checkpointed data. When the pipeline is restored from 
that checkpoint, the CoderSnapshots are restored and used to 
reinstantiate the Coders.


Furthermore, there is a compatibility and migration check between the 
old and the new Coder. This allows us to determine whether


 - The serializer did not change or is compatible (ok)
 - The serialization format of the coder changed (ok after migration)
 - The coder needs to be reconfigured and we know how to do that based on
   the old version (ok after reconfiguration)
 - The coder is incompatible (error)
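
Purely as a strawman, a Beam-side equivalent mirroring those four outcomes could look roughly like the interface below. Nothing like this exists in Beam today; only Coder is a real class:

import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.Coder;

interface CoderSnapshot<T> {

  /** Outcome of checking a restored snapshot against the coder used by the new pipeline. */
  enum Compatibility {
    COMPATIBLE_AS_IS,           // serializer unchanged or compatible
    COMPATIBLE_AFTER_MIGRATION, // format changed; state must be re-encoded with the new coder
    NEEDS_RECONFIGURATION,      // new coder must first be configured from the old snapshot
    INCOMPATIBLE                // error: state cannot be restored
  }

  /** Written alongside checkpointed data so the old coder can be reinstantiated later. */
  void writeSnapshot(OutputStream out) throws Exception;

  /** Reinstantiates the coder exactly as it was when the checkpoint was taken. */
  Coder<T> restoreCoder(InputStream in) throws Exception;

  /** Decides how the coder found in the new pipeline relates to this snapshot. */
  Compatibility resolveCompatibility(Coder<T> newCoder);
}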

I was wondering about the Coder evolution story in Beam. The current 
state is that checkpointed Beam pipelines are only guaranteed to run 
with the same Beam version and pipeline version. A newer version of 
either might break the checkpoint format without any way to migrate the 
state.


Should we start thinking about supporting Coder evolution in Beam?

Thanks,
Max


* Coders are called TypeSerializers in Flink land. The interface is 
TypeSerializerSnapshot.


Re: Streaming pipelines in all SDKs!

2019-05-09 Thread Maximilian Michels

Thanks for sharing your ideas for load testing!

According to other contributors knowledge/experience: I noticed that streaming with KafkaIO is currently supported by wrapping the ExternalTransform in Python SDK. Do you think that streaming pipelines will "just work" with the current state of portability if I do the same for UnboundedSyntheticSource or is there something else missing? 


Basically yes, but it requires a bit more effort than just wrapping 
it in ExternalTransform. You need to provide an ExternalTransformBuilder 
for the transform to be configured externally.
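
For example (very rough sketch): assuming the Java SDK's ExternalTransformBuilder interface of this era (exact package and signature may differ by version), a made-up synthetic source could be wired up along these lines, with GenerateSequence standing in for the actual source:

import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class SyntheticSourceExternal {

  // Made-up URN under which an ExternalTransformRegistrar would expose the builder.
  public static final String URN = "beam:external:java:synthetic_unbounded_source:v1";

  /** Configuration object the expansion service populates from the external request. */
  public static class Configuration {
    Long elementsPerSecond;

    public void setElementsPerSecond(Long elementsPerSecond) {
      this.elementsPerSecond = elementsPerSecond;
    }
  }

  /** Builder invoked by the expansion service to turn a Configuration into a PTransform. */
  public static class Builder
      implements ExternalTransformBuilder<Configuration, PBegin, PCollection<Long>> {
    @Override
    public PTransform<PBegin, PCollection<Long>> buildExternal(Configuration config) {
      // Stand-in body: GenerateSequence plays the role of the unbounded synthetic source.
      return GenerateSequence.from(0)
          .withRate(config.elementsPerSecond, Duration.standardSeconds(1));
    }
  }
}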


In portability UnboundedSources can only be supported via SDF. To still 
be able to use legacy IO which uses UnboundedSource the Runner has to 
supply this capability (which the Flink Runner supports). This will 
likely go away if we have an UnboundedSource SDF Wrapper :)


As for the stability of the expansion protocol, I think it's relatively 
stable and won't change fundamentally.


Cheers,
Max

On 09.05.19 16:21, Łukasz Gajowy wrote:
My recommendation is that we make sure the protocol is stable and 
implemented on the Python SDK side before starting the Go SDK side, 
since that work is already in progress.


+1 This is exactly the roadmap that I had in mind - start with 
externalizing and using the synthetic sources in Python SDK and then 
proceed with Go. Still worth knowing what's going on there so that's why 
I asked. :)


Thanks,
Łukasz

czw., 9 maj 2019 o 16:03 Robert Burke > napisał(a):


Currently the Go SDK doesn't have cross Language support
implemented. My recommendation is that we make sure the protocol is
stable and implemented on the Python SDK side before starting the Go
SDK side, since that work is already in progress.

  The relevant state of the Go SDK:
* beam.External exists, for specifying go transforms. (See the Go
SDK's PubSubIO for an example)
* the generated go code for the protos, including the Expansions
service API was refreshed last week. Meaning the work isn't blocked
on that.

In principle, the work would be to
* ensure that the SDK side of job submission connects and looks up
relevant transforms against the Expansion service if needed, and
does the appropriate pipeline graph surgery.
   *This may be something that's handled as some kind of hook and
registration process for generally handling external transforms SDK
side.
* Have some kind of external transform to specify and configure on
the Go side.

Most of this can be ported from the Python implementation once it's
stabilized.

As with all my recommendations on how to do things with the Go SDK,
feel free to ignore it and forge ahead. I look forward to someone
tackling this, whenever it happens!

Your friendly neighborhood distributed gopher wrangler,
Robert Burke

Related:
PR 8531 [1] begins adding automates testing of the Go SDK against
Flink, which should assist with ensuring this eventual work keeps
working.

[1]: https://github.com/apache/beam/pull/8531

On Thu, May 9, 2019, 6:32 AM Łukasz Gajowy mailto:lgaj...@apache.org>> wrote:

Hi,

part of our work that needs to be done to create tests for Core
Apache Beam operations is to enable both batch and streaming
testing scenarios in all SDKs (not only Java, so lot's of
portability usage here). I gathered some thoughts on how (I
think) this could be done at the current state of Beam:

https://s.apache.org/portable-load-tests

I added some questions at the end of the doc but I will paste
them here too for visibility:

  * What is the status of Cross Language Support for Go SDK? Is
it non-trivial to enable such support (if it doesn't exist yet)?
  * According to other contributors knowledge/experience: I
noticed that streaming with KafkaIO is currently supported
by wrapping the ExternalTransform in Python SDK. Do you
think that streaming pipelines will "just work" with the
current state of portability if I do the same for
UnboundedSyntheticSource or is there something else missing? 


BTW: great to see Cross Language Support happening. Thanks for
doing this! :)

Thanks,
Łukasz






Re: request for beam minor release

2019-05-10 Thread Maximilian Michels

Assuming 2.13 will include or otherwise be supported by flink-runner-1.7 then 
this should not be an issue.


Yes, we will keep supporting Flink 1.7 for Beam 2.13.

-Max

On 08.05.19 19:54, Kenneth Knowles wrote:
For the benefit of the thread, I will also call out our incubating LTS 
(Long-Term Support) policy. For critical fixes, we will issue patch 
releases on the 2.7 branch. We are currently gathering proposals for 
cherry-picks to 2.7.1 to do that release. Other than that, releases are 
a lot of work so we focus on a steady rate of minor version releases 
instead of patching non-LTS versions.


You can read at https://beam.apache.org/community/policies/.

Kenn

On Wed, May 8, 2019 at 9:22 AM Moorhead,Richard 
mailto:richard.moorhe...@cerner.com>> wrote:


Assuming 2.13 will include or otherwise be supported by
flink-runner-1.7 then this should not be an issue.


*From:* Jean-Baptiste Onofré mailto:j...@nanthrax.net>>
*Sent:* Wednesday, May 8, 2019 10:09 AM
*To:* dev@beam.apache.org <mailto:dev@beam.apache.org>
*Subject:* Re: request for beam minor release
I second Max here. If you are just looking for this specific commit, you
can take a next release that will include it.

Regards
JB

On 08/05/2019 16:27, Maximilian Michels wrote:
> Hi Richard,
> 
> Would it be an option to use the upcoming 2.13.0 release? The commit

> will be part of that release.
> 
> Thanks,

> Max
> 
> On 08.05.19 15:43, Jean-Baptiste Onofré wrote:

>> Hi,
>>
>> Any release are tagging. We create a branch based on a master commit.
>>
>> Are you requesting 2.10.1 maintenance release ?
>>
>> Regards
>> JB
>>
>> On 08/05/2019 15:10, Moorhead,Richard wrote:
>>> Is there a process for tagging a commit in master for a minor release?
>>>
>>> I am trying to get this
>>> 
<https://github.com/apache/beam/pull/8503/commits/ffa5632bca8c7264993702c39c6ca013a9f6ecdb>
 commit
>>>
>>> released into 2.10.1
>>>  
>>> CONFIDENTIALITY NOTICE This message and any included attachments are

>>> from Cerner Corporation and are intended only for the addressee. The
>>> information contained in this message is confidential and may constitute
>>> inside or non-public information under international, federal, or state
>>> securities laws. Unauthorized forwarding, printing, copying,
>>> distribution, or use of such information is strictly prohibited and may
>>> be unlawful. If you are not the addressee, please promptly delete this
>>> message and notify the sender of the delivery error by e-mail or you may
>>> call Cerner's corporate offices in Kansas City, Missouri, U.S.A at (+1)
>>> (816)221-1024.
>>>
>>

-- 
Jean-Baptiste Onofré

jbono...@apache.org <mailto:jbono...@apache.org>
http://blog.nanthrax.net
Talend - http://www.talend.com

CONFIDENTIALITY NOTICE This message and any included attachments are
from Cerner Corporation and are intended only for the addressee. The
information contained in this message is confidential and may
constitute inside or non-public information under international,
federal, or state securities laws. Unauthorized forwarding,
printing, copying, distribution, or use of such information is
strictly prohibited and may be unlawful. If you are not the
addressee, please promptly delete this message and notify the sender
of the delivery error by e-mail or you may call Cerner's corporate
offices in Kansas City, Missouri, U.S.A at (+1) (816)221-1024.



Re: Streaming pipelines in all SDKs!

2019-05-10 Thread Maximilian Michels

So, FlinkRunner has some sort of special support for executing UnboundedSource 
via the runner in the portable world ? I see a transform override for bounded 
sources in PortableRunner [1] but nothing for unbounded sources.


It's in the translation code: 
https://github.com/apache/beam/blob/6679b00138a5b82a6a55e7bc94c453957cea501c/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L216


For migration I think that's a valid path, especially because Runners 
already have the translation code in place. We can later swap-out the 
UnboundedSource translation with the SDF wrapper.


-Max

On 09.05.19 22:46, Robert Bradshaw wrote:

From: Chamikara Jayalath 
Date: Thu, May 9, 2019 at 7:49 PM
To: dev


From: Maximilian Michels 
Date: Thu, May 9, 2019 at 9:21 AM
To: 


Thanks for sharing your ideas for load testing!


According to other contributors knowledge/experience: I noticed that streaming with 
KafkaIO is currently supported by wrapping the ExternalTransform in Python SDK. Do you 
think that streaming pipelines will "just work" with the current state of 
portability if I do the same for UnboundedSyntheticSource or is there something else 
missing?


Basically yes, but it requires a bit more effort than just wrapping
about ExternalTransform. You need to provide an ExternalTransformBuilder
for the transform to be configured externally.

In portability UnboundedSources can only be supported via SDF. To still
be able to use legacy IO which uses UnboundedSource the Runner has to
supply this capability (which the Flink Runner supports). This will
likely go away if we have an UnboundedSource SDF Wrapper :)



So, FlinkRunner has some sort of special support for executing UnboundedSource 
via the runner in the portable world ? I see a transform override for bounded 
sources in PortableRunner [1] but nothing for unbounded sources.

Agree, that we cannot properly support cross-language unbounded sources till we 
have SDF and a proper unbounded source to SDF wrapper.


That is correct. Go will need SDF support as well.

As for waiting on implementing the expansion service: except for the
vending of extra artifacts (which will be an extension), we discussed
this earlier and it's considered stable and ready to build on now.



Re: Coder Evolution

2019-05-10 Thread Maximilian Michels
Thanks for the references Luke! I thought that there may have been prior 
discussions, so this thread could be a good place to consolidate.



Dataflow also has an update feature, but it's limited by the fact that Beam 
does not have a good concept of Coder evolution. As a result we try very hard 
never to change import Coders,


Trying not to break Coders is a fair approach and could work fine for 
Beam itself, if the Coders were designed really carefully. But what 
about custom Coders users may have written? AvroCoder or ProtoCoder 
would be good candidates for forwards-compatibility, but even these do 
not have migration functionality built in.


Is schema evolution already part of SchemaCoder? It's definitely a good 
candidate for evolution because a schema provides an inside view of what a 
Coder encodes, but as for how to actually perform the evolution, it looks like 
this is still an open question.


-Max

On 09.05.19 18:56, Reuven Lax wrote:
Dataflow also has an update feature, but it's limited by the fact that 
Beam does not have a good concept of Coder evolution. As a result we try 
very hard never to change important Coders, which sometimes makes 
development of parts of Beam much more difficult. I think Beam would 
benefit greatly by having a first-class concept of Coder evolution.


BTW for schemas there is a natural way of defining evolution that can be 
handled by SchemaCoder.


On Wed, May 8, 2019 at 12:58 PM Lukasz Cwik <mailto:lc...@google.com>> wrote:


There was a thread about coder update in the past here[1]. Also,
Reuven sent out a doc[2] about pipeline drain and update which was
discussed in this thread[3]. I believe there have been more
references to pipeline update in other threads when people tried to
change coder encodings in the past as well.

Reuven/Dan are the best contacts about this on how this works inside
of Google, the limitations and other ideas that had been proposed.

1:

https://lists.apache.org/thread.html/f3b2daa740075cc39dc04f08d1eaacfcc2d550cecc04e27be024cf52@%3Cdev.beam.apache.org%3E
2:

https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8/edit#
3:

https://lists.apache.org/thread.html/37c9aa4aa5011801a7f060bf3f53e687e539fa6154cc9f1c544d4f7a@%3Cdev.beam.apache.org%3E

On Wed, May 8, 2019 at 11:45 AM Maximilian Michels mailto:m...@apache.org>> wrote:

Hi,

I'm looking into updating the Flink Runner to Flink version 1.8.
Since
version 1.7 Flink has a new optional interface for Coder evolution*.

When a Flink pipeline is checkpointed, CoderSnapshots are
written out
alongside with the checkpointed data. When the pipeline is
restored from
that checkpoint, the CoderSnapshots are restored and used to
reinstantiate the Coders.

Furthermore, there is a compatibility and migration check
between the
old and the new Coder. This allows to determine whether

   - The serializer did not change or is compatible (ok)
   - The serialization format of the coder changed (ok after
migration)
   - The coder needs to be reconfigured and we know how to that
based on
     the old version (ok after reconfiguration)
   - The coder is incompatible (error)

I was wondering about the Coder evolution story in Beam. The
current
state is that checkpointed Beam pipelines are only guaranteed to
run
with the same Beam version and pipeline version. A newer version of
either might break the checkpoint format without any way to
migrate the
state.

Should we start thinking about supporting Coder evolution in Beam?

Thanks,
Max


* Coders are called TypeSerializers in Flink land. The interface is
TypeSerializerSnapshot.



Re: Developing a new beam runner for Twister2

2019-05-14 Thread Maximilian Michels

Hi Pulasthi,

Great to hear you're planning to implement a Twister2 Runner.

If you have limited time, you probably want to decide whether to build a 
"legacy" Java Runner or a portable one. They are not fundamentally 
different but there are some tricky implementation details for the 
portable Runner related to the asynchronous communication with the SDK 
Harness.


If you have enough time, first implementing a "legacy" Runner might be a 
good way to learn the Beam model, and subsequently creating a portable 
Runner should then not be hard.


To get an idea of the differences, check out the Flink source code:
- FlinkStreamingTransformTranslators (Java "legacy")
- FlinkStreamingPortablePipelineTranslator (portable)

Feel free to ask questions here or on Slack.

Cheers,
Max

On 14.05.19 05:11, Kenneth Knowles wrote:

Welcome! This is very cool to hear about.

A major caveat about https://beam.apache.org/contribute/runner-guide/ is 
that it was written when Beam's portability framework was more of a 
sketch. The conceptual descriptions are mostly fine, but the pointers to 
Java helper code will lead you to build a "legacy" runner when it is 
better to build a portable runner from the start*.


We now have four portable runners in various levels of completeness: 
Spark, Flink, Samza, and Dataflow. I have added some relevant people to 
the CC for emphasis. You might also join 
https://the-asf.slack.com/#beam-portability though I prefer the dev list 
since it gives visibility to a much greater portion of the community.


Kenn

*volunteers welcome to update the guide to emphasize portability first

*From: *Pulasthi Supun Wickramasinghe >

*Date: *Mon, May 13, 2019 at 11:03 AM
*To: * mailto:dev@beam.apache.org>>

Hi All,

I am Pulasthi a Ph.D. student at Indiana University. We are planning
to develop a beam runner for our project Twister2 [1] [2]. Twister2
is a big data framework which supports both batch and stream
processing. If you are interested you can find more information on
[2] or read some of our publications [3]

I wanted to share our intent and get some guidance from the beam
developer community before starting on the project. I was planning
on going through the code for Apache Spark and Apache Flink runners
to get a better understanding of what I need to do. It would be
great if I can get any pointers on how I should approach this
project. I am currently reading through the runner-guide
.

Finally, I assume that I need to create a JIRA issue to track the
progress of this project, right?. I can create the issue but from
what I read from the contribute section I would need some permission
to assign it to my self, I hope someone would be able to help me
with that. Looking forward to working with the Beam community.

[1] https://github.com/DSC-SPIDAL/twister2
[2] https://twister2.gitbook.io/twister2/
[3] https://twister2.gitbook.io/twister2/publications

Best Regards,
Pulasthi
-- 
Pulasthi S. Wickramasinghe

PhD Candidate  | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
cell: 224-386-9035



Re: [ANNOUNCE] New PMC Member: Pablo Estrada

2019-05-15 Thread Maximilian Michels

Congrats Pablo! Thank you for your help to grow the Beam community!

On 15.05.19 10:33, Tim Robertson wrote:

Congratulations Pablo

On Wed, May 15, 2019 at 10:22 AM Ismaël Mejía > wrote:


Congrats Pablo, well deserved, nice to see your work recognized!

On Wed, May 15, 2019 at 9:59 AM Pei HE mailto:pei...@gmail.com>> wrote:
 >
 > Congrats, Pablo!
 >
 > On Tue, May 14, 2019 at 11:41 PM Tanay Tummalapalli
 > mailto:ttanay.apa...@gmail.com>> wrote:
 > >
 > > Congratulations Pablo!
 > >
 > > On Wed, May 15, 2019, 12:08 Michael Luckey mailto:adude3...@gmail.com>> wrote:
 > >>
 > >> Congrats, Pablo!
 > >>
 > >> On Wed, May 15, 2019 at 8:21 AM Connell O'Callaghan
mailto:conne...@google.com>> wrote:
 > >>>
 > >>> Awesome well done Pablo!!!
 > >>>
 > >>> Kenn thank you for sharing this great news with us!!!
 > >>>
 > >>> On Tue, May 14, 2019 at 11:01 PM Ahmet Altay
mailto:al...@google.com>> wrote:
 > 
 >  Congratulations!
 > 
 >  On Tue, May 14, 2019 at 9:11 PM Robert Burke
mailto:rob...@frantil.com>> wrote:
 > >
 > > Woohoo! Well deserved.
 > >
 > > On Tue, May 14, 2019, 8:34 PM Reuven Lax mailto:re...@google.com>> wrote:
 > >>
 > >> Congratulations!
 > >>
 > >> From: Mikhail Gryzykhin mailto:gryzykhin.mikh...@gmail.com>>
 > >> Date: Tue, May 14, 2019 at 8:32 PM
 > >> To: mailto:dev@beam.apache.org>>
 > >>
 > >>> Congratulations Pablo!
 > >>>
 > >>> On Tue, May 14, 2019, 20:25 Kenneth Knowles
mailto:k...@apache.org>> wrote:
 > 
 >  Hi all,
 > 
 >  Please join me and the rest of the Beam PMC in welcoming
Pablo Estrada to join the PMC.
 > 
 >  Pablo first picked up BEAM-722 in October of 2016 and
has been a steady part of the Beam community since then. In addition
to technical work on Beam Python & Java & runners, I would highlight
how Pablo grows Beam's community by helping users, working on GSoC,
giving talks at Beam Summits and other OSS conferences including
Flink Forward, and holding training workshops. I cannot do justice
to Pablo's contributions in a single paragraph.
 > 
 >  Thanks Pablo, for being a part of Beam.
 > 
 >  Kenn



Re: [VOTE] Remove deprecated Java Reference Runner code from repository.

2019-05-15 Thread Maximilian Michels

+1

On 15.05.19 13:19, Robert Bradshaw wrote:

+1 for removing the code given the current state of things.

On Wed, May 15, 2019 at 12:32 AM Ruoyun Huang  wrote:


+1

From: Daniel Oliveira 
Date: Tue, May 14, 2019 at 2:19 PM
To: dev


Hello everyone,

I'm calling for a vote on removing the deprecated Java Reference Runner code. 
The PR for the change has already been tested and reviewed: 
https://github.com/apache/beam/pull/8380

[ ] +1, Approve merging the removal PR in it's current state
[ ] -1, Veto the removal PR (please provide specific comments)

The vote will be open for at least 72 hours. Since this a vote on 
code-modification, it is adopted if there are at least 3 PMC affirmative votes 
and no vetoes.

For those who would like context on why the Java Reference Runner is being 
deprecated, the discussions took place in the following email threads:

(8 Feb. 2019) Thoughts on a reference runner to invest in? - Decision to 
deprecate the Java Reference Runner and use the Python FnApiRunner for those 
use cases instead.
(14 Mar. 2019) Python PVR Reference post-commit tests failing - Removal of 
Reference Runner Post-Commits from Jenkins, and discussion on removal of code.
(25 Apr. 2019) Removing Java Reference Runner code - Discussion thread before 
this formal vote.




--

Ruoyun  Huang



Re: Developing a new beam runner for Twister2

2019-05-15 Thread Maximilian Michels
+1 Portability is the way forward. If you have to choose between the 
two, go for the portable one. For educational purposes, I'd still 
suggest checking out the "legacy" Runners. Actually, a new Runner could 
implement both Runner styles with most of the code shared between the two.


-Max

On 15.05.19 11:47, Robert Bradshaw wrote:

I would strongly suggest new runners adopt the portability framework from
the start, which will be more forward compatible and more flexible
(e.g. supporting other languages). The primary difference is that
rather than wrapping individual DoFns, one wraps a "fused" bundle of
DoFns (called an ExecutableStage). As it looks like Twister2 is
written in Java, you can take advantage of many of the existing Java
libraries that already do this and that are shared among the other Java
runners.

On Tue, May 14, 2019 at 7:55 PM Pulasthi Supun Wickramasinghe
 wrote:


Hi,

Thanks Kenn and Max for the information. Will read up a little more and discuss 
with the Twister2 team before deciding on which route to take. I also created 
an issue in the Beam JIRA [1], but I cannot assign it to myself; would someone be 
able to assign the issue to me? Thanks in advance.

[1] https://issues.apache.org/jira/browse/BEAM-7304

Best Regards
Pulasthi

On Tue, May 14, 2019 at 6:19 AM Maximilian Michels  wrote:


Hi Pulasthi,

Great to hear you're planning to implement a Twister2 Runner.

If you have limited time, you probably want to decide whether to build a
"legacy" Java Runner or a portable one. They are not fundamentally
different but there are some tricky implementation details for the
portable Runner related to the asynchronous communication with the SDK
Harness.

If you have enough time, first implementing a "legacy" Runner might be a
good way to learn the Beam model and subsequently creating a portable
Runner should not be hard then.

To get an idea of the differences, check out the Flink source code:
- FlinkStreamingTransformTranslators (Java "legacy")
- FlinkStreamingPortablePipelineTranslator (portable)

Feel free to ask questions here or on Slack.

Cheers,
Max

On 14.05.19 05:11, Kenneth Knowles wrote:

Welcome! This is very cool to hear about.

A major caveat about https://beam.apache.org/contribute/runner-guide/ is
that it was written when Beam's portability framework was more of a
sketch. The conceptual descriptions are mostly fine, but the pointers to
Java helper code will lead you to build a "legacy" runner when it is
better to build a portable runner from the start*.

We now have four portable runners in various levels of completeness:
Spark, Flink, Samza, and Dataflow. I have added some relevant people to
the CC for emphasis. You might also join
https://the-asf.slack.com/#beam-portability though I prefer the dev list
since it gives visibility to a much greater portion of the community.

Kenn

*volunteers welcome to update the guide to emphasize portability first

*From:* Pulasthi Supun Wickramasinghe <pulasthi...@gmail.com>
*Date:* Mon, May 13, 2019 at 11:03 AM
*To:* <dev@beam.apache.org>

 Hi All,

 I am Pulasthi a Ph.D. student at Indiana University. We are planning
 to develop a beam runner for our project Twister2 [1] [2]. Twister2
 is a big data framework which supports both batch and stream
 processing. If you are interested you can find more information on
 [2] or read some of our publications [3]

 I wanted to share our intent and get some guidance from the beam
 developer community before starting on the project. I was planning
 on going through the code for Apache Spark and Apache Flink runners
 to get a better understanding of what I need to do. It would be
 great if I can get any pointers on how I should approach this
 project. I am currently reading through the runner-guide
 <https://beam.apache.org/contribute/runner-guide/>.

 Finally, I assume that I need to create a JIRA issue to track the
 progress of this project, right? I can create the issue, but from
 what I read in the contribute section I would need some permission
 to assign it to myself. I hope someone will be able to help me
 with that. Looking forward to working with the Beam community.

 [1] https://github.com/DSC-SPIDAL/twister2
 [2] https://twister2.gitbook.io/twister2/
 [3] https://twister2.gitbook.io/twister2/publications

 Best Regards,
 Pulasthi
 --
 Pulasthi S. Wickramasinghe
 PhD Candidate  | Research Assistant
 School of Informatics and Computing | Digital Science Center
 Indiana University, Bloomington
 cell: 224-386-9035





--
Pulasthi S. Wickramasinghe
PhD Candidate  | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
cell: 224-386-9035


Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-16 Thread Maximilian Michels

Hi Jan,

Thanks for the discussion. Aljoscha already gave great answers. Just a 
couple of remarks:


 a) streaming semantics (i.e. what I can express using Transforms) are subset of batch semantics 


I think you mean streaming is a superset of batch, or batch is a subset 
of streaming. This is the ideal. In practice, the two execution modes 
are sometimes accomplished by two different execution engines. Even in 
Flink, we have independent APIs for batch and streaming and the 
execution semantics are slightly different. For example, there are no 
watermarks in the batch API. Thus, batch rarely is simply an execution 
mode of streaming. However, I still think the unified Beam model works 
in both cases.



 batch semantics and streaming semantics differs only in that I can have 
GlobalWindow with default trigger on batch and cannot on stream


You can have a GlobalWindow in streaming with a default trigger. You 
could define additional triggers that do early firings. And you could 
even trigger the global window by advancing the watermark to +inf.
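
For illustration, here is a minimal Java sketch of such a windowing 
configuration. The input PCollection, the one-minute processing-time delay 
and the accumulation mode are placeholders for this example, not something 
prescribed in this thread:

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Global window on an unbounded PCollection with early (speculative) firings,
// so panes are emitted before the watermark ever reaches +inf.
static PCollection<String> withEarlyFirings(PCollection<String> input) {
  return input.apply(
      Window.<String>into(new GlobalWindows())
          .triggering(
              Repeatedly.forever(
                  AfterProcessingTime.pastFirstElementInPane()
                      .plusDelayOf(Duration.standardMinutes(1))))
          // Beam requires allowed lateness and an accumulation mode once a
          // non-default trigger is set.
          .withAllowedLateness(Duration.ZERO)
          .accumulatingFiredPanes());
}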


On batch engines, this is generally not an issue, because the buffering is eliminated by sorting - when a Group by operation occurs, batch runners sort elements with the same key to be together and therefore eliminate the need for potentially infinite cache. 


The batch engines you normally use might do that. However, I do not see 
how sorting is an inherent property of the streaming model. We do not 
guarantee a deterministic order of events in streaming with respect to 
event time. In that regard, batch is a true subset of streaming because 
we make no guarantees on the order. Actually, because we only advance 
the watermark from -inf to +inf once we have read all data, this nicely 
aligns with the streaming model.


-Max

On 16.05.19 15:20, Aljoscha Krettek wrote:

Hi,

I think it’s helpful to consider that events never truly arrive in order in the 
real world (you mentioned as much yourself). For streaming use cases, there 
might be some out-of-orderness (or a lot of it, depending on the use case) so 
your implementation has to be able to deal with that. On the other end of the 
spectrum we have batch use cases, where out-of-orderness is potentially even 
bigger because it allows for more efficient parallel execution. If your 
implementation can deal with out-of-orderness that also shouldn’t be a problem.

Another angle is completeness vs. latency: you usually cannot have both in a 
streaming world. If you want 100 % completeness, i.e. you want to ensure that 
you process all events and never drop anything, you can never advance the  
watermark from its initial -Inf if you want to also never have watermark 
violations. In typical use cases I would expect any sorting guarantees to be 
constantly violated, unless you are willing to drop late data.

I think these are some reasons why there is no mention of ordering by timestamp 
anywhere (unless I’m mistaken and there is somewhere).

You are right, of course, that batch-style runners can use grouping/sorting for 
a GroupByKey operation. Flink does that and even allows sorting by secondary 
key, so you could manually sort by timestamp as a secondary key with hardly any 
additional cost. However, exposing that in the model would make implementing 
Runners quite hard, or they would be prohibitively slow.

You’re also right that user functions that do arbitrary stateful operations can 
be quite dangerous and lead to unexpected behaviour. You example of reacting to 
changes in 0 and 1 would produce wrong results if events are not 100% sorted by 
timestamp. In general, state changes that rely on processing order are 
problematic, while operations that move monotonically through some space are fine. 
Examples of such operations are adding elements to a set or summing numbers. If 
you “see” a given set of events you can apply them to state in any order and as 
long as you see the same set of events on different executions the result will 
be the same.
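
To make that concrete, here is a minimal Java sketch; the key/value types and 
the 0-to-1 "rising edge" detection are made up for illustration. The first 
state cell is only correct if elements arrive in timestamp order, while the 
second yields the same result for any arrival order:

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class OrderSensitivityFn extends DoFn<KV<String, Integer>, String> {
  @StateId("last")
  private final StateSpec<ValueState<Integer>> lastSpec = StateSpecs.value();

  @StateId("sum")
  private final StateSpec<ValueState<Integer>> sumSpec = StateSpecs.value();

  @ProcessElement
  public void process(ProcessContext ctx,
                      @StateId("last") ValueState<Integer> last,
                      @StateId("sum") ValueState<Integer> sum) {
    int value = ctx.element().getValue();

    // Order-sensitive: detecting a 0 -> 1 transition is only correct if
    // elements are observed in timestamp order.
    Integer previous = last.read();
    if (previous != null && previous == 0 && value == 1) {
      ctx.output("rising edge for key " + ctx.element().getKey());
    }
    last.write(value);

    // Order-insensitive: the final sum is the same for any arrival order.
    Integer current = sum.read();
    sum.write((current == null ? 0 : current) + value);
  }
}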

As for the Beam execution model in relation to processing and time, I think the 
only “guarantees” are:
  - you will eventually see all events
  - the timestamp of those events is usually not less than the watermark (but 
not always)
  - the watermark will advance when the system thinks you won’t see events with 
a smaller timestamp in the future (but you sometimes might)

Those seem quite “poor”, but I think you can’t get better guarantees for 
general cases for the reasons mentioned above. Also, this is just off the top of 
my head and I might be wrong in my understanding of the Beam model. :-O

Best,
Aljoscha


On 16. May 2019, at 13:53, Jan Lukavský  wrote:

Hi,

this is starting to be really exciting. It seems to me that there is either something 
wrong with my definition of "Unified model" or with how it is implemented 
inside (at least) Direct and Flink Runners.

So, first what I see as properties of Unified model:

  a) streaming semantics (i.e. what I can express

Re: FlinkRunner CheckPoint Failed - Couldn't materialized/TypeSerialization

2019-05-20 Thread Maximilian Michels

Hi,

Since you get AbstractMethodError, there is likely a version mismatch 
with Beam/Flink.


Could you please provide:

- Beam version used
- Flink Runner artifact used
- Flink version used

Thanks,
Max

On 20.05.19 01:16, cm...@godaddy.com wrote:

Hello Beam Dev,

I am having a hard time getting checkpoints to work with the FlinkRunner. I set up 
two simple pipelines, one reading from the unbounded Kinesis source, and the other 
reading from a text file (tested with/without the `--streaming=true` config). 
But both fail to save a checkpoint. The checkpoints are configured 
to save to the file system. I wonder if I am missing something. Below 
are my pipelines and the stack trace for your reference.


I'd appreciate it if you can give me some pointers!

*Pipeline #1: *

pipeline.apply("ReadingFromKinesis123", KinesisIO.read()
    .withStreamName("test-stream")
    .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
    .withAWSClientsProvider("foo", "bar", Regions.US_WEST_2,
        "http://localhost:30002"));


*Pipeline#2:*

Pipeline p = Pipeline.create(options);
p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
p.run().waitUntilFinish();

*One of my commands:*

bin/flink run -c org.apache.beam.examples.WordCount 
../../word-count-beam/target/word-count-beam-bundled-0.1.jar 
--runner=FlinkRunner --checkpointingInterval=5000 
--externalizedCheckpointsEnabled=true --streamName=test-stream 
--retainExternalizedCheckpointsOnCancellation=true --awsRegion=us-west-2


*flink-conf.yaml:*

state.backend: filesystem

state.checkpoints.dir: file:///tmp/flink-checkpoint/flink_app/

state.savepoints.dir: file:///tmp/flink-checkpoint/flink_app/savepoints/

*Stack trace:*

AsynchronousException{java.lang.Exception: Could not materialize 
checkpoint 7 for operator Source: ReadLines/Read -> 
DropInputs/ParMultiDo(NoOp) (1/1).}


at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)


at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)


at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)


at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)


at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)


at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.Exception: Could not materialize checkpoint 7 for 
operator Source: ReadLines/Read -> DropInputs/ParMultiDo(NoOp) (1/1).


at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)


... 6 more

Caused by: java.util.concurrent.ExecutionException: 
java.lang.AbstractMethodError: 
org.apache.flink.api.common.typeutils.TypeSerializer.snapshotConfiguration()Lorg/apache/flink/api/common/typeutils/TypeSerializerSnapshot;


at java.util.concurrent.FutureTask.report(FutureTask.java:122)

at java.util.concurrent.FutureTask.get(FutureTask.java:192)

at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)


at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)


at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)


... 5 more

Caused by: java.lang.AbstractMethodError: 
org.apache.flink.api.common.typeutils.TypeSerializer.snapshotConfiguration()Lorg/apache/flink/api/common/typeutils/TypeSerializerSnapshot;


at 
org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.computeSnapshot(RegisteredOperatorStateBackendMetaInfo.java:170)


at 
org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.snapshot(RegisteredOperatorStateBackendMetaInfo.java:103)


at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:123)


at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)


at 
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)


at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)




Re: Contributor permissions for Beam JIRA

2019-05-20 Thread Maximilian Michels

Hi Kamil,

That sounds great. Could you send me your JIRA username? I couldn't find 
your account on JIRA.


Thanks,
Max

On 20.05.19 11:27, Kamil Wasilewski wrote:

Hi,
I am Kamil Wasilewski and I would like to start making improvements to 
the Beam Python SDK. I would like to assign JIRA issues to myself. Can 
someone mark me as a contributor?


Thanks,
Kamil


Re: Contributor permissions for Beam JIRA

2019-05-20 Thread Maximilian Michels

You should have the contributor permission now.

Cheers,
Max

On 20.05.19 11:59, Kamil Wasilewski wrote:

Here's my username: kamilwu

Kamil

On Mon, May 20, 2019 at 11:47 AM Maximilian Michels <mailto:m...@apache.org>> wrote:


Hi Kamil,

That sounds great. Could you send me your JIRA username? I couldn't
find
your account on JIRA.

Thanks,
Max

On 20.05.19 11:27, Kamil Wasilewski wrote:
 > Hi,
 > I am Kamil Wasilewski and I would like to start making
improvements to
 > the Beam Python SDK. I would like to assign JIRA issues to
myself. Can
 > someone mark me as a contributor?
 >
 > Thanks,
 > Kamil



Re: [BEAM-6673] pre-commit checks failing due to test failure in unrelated Docker module

2019-05-20 Thread Maximilian Michels

I've opened a PR: https://github.com/apache/beam/pull/8625

Due to the provided scope, we have to add it as an explicit dependency 
to the container task.


Thanks,
Max

On 20.05.19 15:16, Robert Bradshaw wrote:

I created https://issues.apache.org/jira/browse/BEAM-7367

On Mon, May 20, 2019 at 3:11 PM Michael Luckey  wrote:


This is most likely caused by Merge of 
https://issues.apache.org/jira/browse/BEAM-7349, which was done lately.

Best,

michel

On Mon, May 20, 2019 at 2:49 PM Charith Ellawala  
wrote:


Hello,

I am trying to create a PR for BEAM-6673 which adds schema support for BigQuery 
reads (https://github.com/apache/beam/pull/8620). However, one of the 
pre-commit tests is failing in the (unrelated) Docker module:

Task :sdks:java:container:docker FAILED
ADD failed: stat 
/var/lib/docker/tmp/docker-builder145327764/target/kafka-clients.jar: no such 
file or directory


I have not touched this code so the test failure is not caused by any of my changes. I 
have tried re-running the test suite but the failure still persists. From a quick glance, 
it looks like the kafka-clients dependency is designated as "provided" so 
perhaps the failure is caused by the Jenkins machine not having the required jar file? I 
would really appreciate some help with resolving this issue so that I can submit my PR 
for review.

Thank you.

Best regards,
Charith


Re: [Discuss] Ideas for Apache Beam presence in social media

2019-05-21 Thread Maximilian Michels

Hi Aizhamal,

This is a great idea. I think it would help Beam to be more prominent on 
social media.


We need to discuss this also on the private@ mailing list but I don't 
see anything standing in the way if the PMC always gets to approve the 
proposed social media postings.


I could even imagine that the PMC gives rights to a Beam community 
member to post in their name.


Thanks,
Max

On 21.05.19 03:09, Austin Bennett wrote:
Is PMC definitely in charge of this (approving, communication channel, 
etc)?


There could even be a more concrete pull-request-like function for 
things like tweets (to minimize cut/paste operations)?


I remember a bit of a mechanism having been proposed some time ago (in 
another circumstance), though doesn't look like it made it terribly far: 
http://www.redhenlab.org/home/the-cognitive-core-research-topics-in-red-hen/the-barnyard/-slick-tweeting 
(I haven't otherwise seen such functionality).




On Mon, May 20, 2019 at 4:54 PM Robert Burke wrote:


+1
As a twitter user, I like this idea.

On Mon, 20 May 2019 at 15:18, Aizhamal Nurmamat kyzy
mailto:aizha...@google.com>> wrote:

Hello everyone,


What does the community think of making Apache Beam’s social
media presence more active and more community driven?


The Slack and StackOverflow for Apache Beam offer pretty nice
support, but we still could utilize Twitter & LinkedIn better to
share more interesting Beam news. For example, we could tweet to
welcome new committers, announce new features consistently,
share and recognize contributions, promote events and meetups,
share other news that are relevant to Beam, big data, etc.


I understand that PMC members may not have time to do curation,
moderation and creation of content; so I was wondering if we
could create a spreadsheet where community members could propose
posts with publishing dates, and let somebody filter,
moderate, and manage it; then send it to a PMC member for publication.


I would love to help where I can in this regard. I’ve had some
experience doing social media elsewhere in the past.


Best

Aizhamal




Re: Definition of Unified model

2019-05-22 Thread Maximilian Michels
f-orderedness.

Pipelines that fail in the "worst case" batch scenario are 
likely to
degrade poorly (possibly catastrophically) when the watermark 
falls

behind in streaming mode as well.

 - another option would be to introduce annotation for 
DoFns (e.g.
@RequiresStableTimeCharacteristics), which would result in the 
sorting
in batch case - but - this extension would have to ensure the 
sorting in
streaming mode also - it would require definition of allowed 
lateness,

and triggger (essentially similar to window)

This might be reasonable, implemented by default by buffering
everything and releasing elements as the watermark (+lateness)
advances, but would likely lead to inefficient (though *maybe* 
easier

to reason about) code. Not sure about the semantics of triggering
here, especially data-driven triggers. Would it be roughly 
equivalent

to GBK + FlatMap(lambda (key, values): [(key, value) for value in
values])?

Or is the underlying desire just to be able to hint to the 
runner that
the code may perform better (e.g. require less resources) as 
skew is

reduced (and hence to order by timestamp iff it's cheap)?

 - last option would be to introduce these "higher order 
guarantees" in
some extension DSL (e.g. Euphoria), but that seems to be the 
worst

option to me

I see the first two options quite equally good, although the 
letter one

is probably more time consuming to implement. But it would bring
additional feature to streaming case as well.

Thanks for any thoughts.

 Jan

On 5/20/19 12:41 PM, Robert Bradshaw wrote:
On Fri, May 17, 2019 at 4:48 PM Jan Lukavský 
 wrote:

Hi Reuven,


How so? AFAIK stateful DoFns work just fine in batch runners.
Stateful ParDo works in batch as far, as the logic inside 
the state works for absolutely unbounded out-of-orderness of 
elements. That basically (practically) can work only for 
cases, where the order of input elements doesn't matter. 
But, "state" can refer to "state machine", and any time you 
have a state machine involved, then the ordering of elements 
would matter.
No guarantees on order are provided in *either* streaming or 
batch

mode by the model. However, it is the case that in order to make
forward progress most streaming runners attempt to limit the 
amount of
out-of-orderedness of elements (in terms of event time vs. 
processing

time) to make forward progress, which in turn could help cap the
amount of state that must be held concurrently, whereas a 
batch runner

may not allow any state to be safely discarded until the whole
timeline from infinite past to infinite future has been 
observed.


Also, as pointed out, state is not preserved "batch to batch" 
in batch mode.



On Thu, May 16, 2019 at 3:59 PM Maximilian Michels 
 wrote:


 batch semantics and streaming semantics differs only 
in that I can have GlobalWindow with default trigger on 
batch and cannot on stream
You can have a GlobalWindow in streaming with a default 
trigger. You
could define additional triggers that do early firings. And 
you could
even trigger the global window by advancing the watermark to 
+inf.
IIRC, as a pragmatic note, we prohibited global window with 
default
trigger on unbounded PCollections in the SDK because this is 
more
likely to be user error than an actual desire to have no 
output until

drain. But it's semantically valid in the model.


Re: Better naming for runner specific options

2019-05-22 Thread Maximilian Michels

+1

On 22.05.19 04:28, Reza Rokni wrote:

Hi,

Coming back to this, is the general consensus that this can be addressed 
via https://issues.apache.org/jira/browse/BEAM-6531 in Beam 3.0?


Cheers
Reza

On Tue, 7 May 2019 at 23:15, Valentyn Tymofieiev <mailto:valen...@google.com>> wrote:


I think using RunnerOptions was an idea at some point, but in
Python, we ended up parsing options from the runner api without
populating RunnerOptions, and  RunnerOptions was eventually removed [1].

If we decide to rename options, a path forward may be to have
runners recognize both old and new names until Beam 3.0, but update
codebase, examples and documentation to use new names.

[1]

https://github.com/apache/beam/commit/f3623e8ba2257f7659ccb312dc2574f862ef41b5#diff-525d5d65bedd7ea5e6fce6e4cd57e153L815

*From:*Ahmet Altay mailto:al...@google.com>>
*Date:*Mon, May 6, 2019, 6:01 PM
*To:*dev

There is RunnerOptions already. Its options are populated by
querying the job service. Any portable runner is able to provide
a list of options that is runner specific through that mechanism.

*From: *Reza Rokni mailto:r...@google.com>>
*Date: *Mon, May 6, 2019 at 2:57 PM
*To: * mailto:dev@beam.apache.org>>

So the options here would be moved to runner options?

https://beam.apache.org/releases/pydoc/2.12.0/_modules/apache_beam/options/pipeline_options.html#WorkerOptions

In Java they are in DataflowPipelineWorkerPoolOptions and of
course we have FlinkPipelineOptions etc...

*From: *Chamikara Jayalath mailto:chamik...@google.com>>
*Date: *Tue, 7 May 2019 at 05:29
*To: *dev


On Mon, May 6, 2019 at 2:13 PM Lukasz Cwik
mailto:lc...@google.com>> wrote:

There were also discussions[1] in the past about
scoping PipelineOptions to specific PTransforms.
Would scoping PipelineOptions to PTransforms make
this a more general solution?

1:

https://lists.apache.org/thread.html/05f849d39788cb0af840cb9e86ca631586783947eb4e5a1774b647d1@%3Cdev.beam.apache.org%3E


Is this just for pipeline construction time or also for
runtime ? Trying to scope options for transforms at
runtime might complicate things in the presence of
optimizations such as fusion.


On Mon, May 6, 2019 at 12:02 PM Ankur Goenka
mailto:goe...@google.com>> wrote:

Having namespaces for option makes sense.
I think, along with a help command to print all
the options given the runner name will be useful.
As for the scope of name spacing, I think that
assigning a logical name space gives more
flexibility around how and where we declare
options. It also make future refactoring possible.


    On Mon, May 6, 2019 at 7:50 AM Maximilian
Michels mailto:m...@apache.org>>
wrote:

Good points. As already mentioned there is
no namespacing between the
different pipeline option classes. In
particular, there is no separate
namespace for system and user options which
is most concerning.

I'm in favor of an optional namespace using
the class name of the
defining pipeline option class. That way we
would at least be able to
resolve duplicate option names. For example,
if there was an "optionX"
in class A and B, we could use "A#optionX"
to refer to it from class A.


I think this solves the original problem. Runner
specific options will have unique names that includes
the runner (in options class). I guess to be complete we
also have to include the package (module for Python) ?
If an option is globally unique, users should be able to
specify it without qualifying (at least for backwards
compatibility).


-Max

On 04.05.19 02:23, Reza Rokni wrote:
 > Great point Lukasz, worker machine could
be relevant to multiple runners.
 

Environments for External Transforms

2019-05-22 Thread Maximilian Michels

Hi,

Robert and I were discussing the subject of user-specified 
environments for external transforms [1]. We couldn't decide whether 
users should have direct control over the environment when they use an 
external transform in their pipeline.


In my mind, it is quite natural that the Expansion Service is a 
long-running service that gets started with a list of available 
environments. Such a list can be outdated and users may write transforms 
for a new environment they want to use in their pipeline. The easiest 
way would be to allow passing the environment with the transform. Note 
that we already give users control over the "main" environment via the 
PortablePipelineOptions, so this wouldn't be an entirely new concept.
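
As a reminder of what that control looks like today, here is a minimal sketch, 
assuming the defaultEnvironmentType/defaultEnvironmentConfig accessors on 
PortablePipelineOptions; the container image name is a placeholder, not an 
officially published one:

import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;

public class SetMainEnvironment {
  public static void main(String[] args) {
    PortablePipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(PortablePipelineOptions.class);
    // How the "main" SDK harness environment is brought up, e.g. DOCKER or PROCESS.
    options.setDefaultEnvironmentType("DOCKER");
    // Environment-specific configuration, here the container image (placeholder name).
    options.setDefaultEnvironmentConfig("my-registry/beam-java-sdk:latest");
    // ... build and run the pipeline with these options ...
  }
}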


The contrary position is that the Expansion Service should have full 
control over which environment is chosen. Going back to the discussion 
about artifact staging [2], this could enable more 
optimizations, such as merging environments or detecting conflicts. 
However, this only works if this information has been provided upfront 
to the Expansion Service. It wouldn't be impossible to provide these 
hints alongside with the environment like suggested in the previous 
paragraph.


Any opinions? Should we allow users to optionally specify an environment 
for external transforms?


Thanks,
Max

[1] https://github.com/apache/beam/pull/8639
[2] 
https://lists.apache.org/thread.html/6fcee7047f53cf1c0636fb65367ef70842016d57effe2e5795c4137d@%3Cdev.beam.apache.org%3E


Re: Environments for External Transforms

2019-05-23 Thread Maximilian Michels
My motivation was to get rid of the Docker dependency for the Python VR 
tests. Similarly to how we use Python's LOOPBACK environment for 
executing all non-cross-language tests, I wanted to use Java's EMBEDDED 
environment to run the cross-language transforms.


I suppose we could also go with an override of the default environment 
during startup of the Expansion Service. This would be less intrusive 
and still allow us to use the embedded environment in some of the tests.



2(c) can also be "hacked" inside an SDK as an explicit environment override by the "user" 
where the expansion service isn't involved and the user/SDK manipulates the expansion service response. As 
Chamikara pointed out, I believe the response from the expansion service should be "safe" instead 
of allowing it to return broken combinations.


The responses are not safe today because we do not have a check whether 
an expanded transform is compatible with the default environment. I 
agree though that it is better to develop these checks first before 
allowing overrides by the user.


I'll need to follow up on the artifact staging doc, then we can agree 
on the first steps for implementation.


Thanks,
Max

On 22.05.19 20:13, Lukasz Cwik wrote:
2(c) can also be "hacked" inside an SDK as an explicit environment 
override by the "user" where the expansion service isn't involved and 
the user/SDK manipulates the expansion service response. As Chamikara 
pointed out, I believe the response from the expansion service should be 
"safe" instead of allowing it to return broken combinations.


On Wed, May 22, 2019 at 11:08 AM Chamikara Jayalath 
mailto:chamik...@google.com>> wrote:




On Wed, May 22, 2019 at 9:17 AM Maximilian Michels mailto:m...@apache.org>> wrote:

Hi,

Robert and me were discussing on the subject of user-specified
environments for external transforms [1]. We couldn't decide
whether
users should have direct control over the environment when they
use an
external transform in their pipeline.

In my mind, it is quite natural that the Expansion Service is a
long-running service that gets started with a list of available
environments. Such a list can be outdated and users may write
transforms
for a new environment they want to use in their pipeline. The
easiest
way would be to allow to pass the environment with the
transform. Note
that we already give users control over the "main" environment
via the
PortablePipelineOptions, so this wouldn't be an entirely new
concept.



I think we are trying to generalize the expansion service along
multiple axes.
(1) dependencies
(a) dependencies embedded in an environment (b) dependencies
specific to an transform (c) dependencies specified by the user
expanding the transform

(2) environments
(a)default environment (b) environments specified a startup of the
expansion service (c) environments specified by the user expanding
the transform (this proposal)

It's great if we can implement the most generic solution along all
these exes but I think we run into risk of resulting in broken
combinations by trying to implement this before we have other
necessary pieces to support a long running expansion service. For
example, support for dynamically registering transforms and support
for discovering transforms.

What is the need for implementing 2 (c) now ? If there's no real
need now I suggest we settle with 2(a) or 2(b) for now till we can
truly support a long running expansion service. Also we'll have a
better idea of how this kind if features should evolve when we have
at least two runners supporting cross-language transforms (we are in
the process of updating Dataflow to support this). Just my 2 cents
though :)


The contrary position is that the Expansion Service should have
full
control over which environment is chosen. Going back to the
discussion
about artifact staging [2], this could enable to perform more
optimizations, such as merging environments or detecting conflicts.
However, this only works if this information has been provided
upfront
to the Expansion Service. It wouldn't be impossible to provide
these
hints alongside with the environment like suggested in the previous
paragraph.

Any opinions? Should we allow users to optionally specify an
environment
for external transforms?

Thanks,
Max

[1] https://github.com/apache/beam/pull/8639
[2]

https://lists.apache.org/thread.html/6fcee7047f53cf1c0636fb65367ef70842016d57effe2e5795c4137d@%3Cdev.beam.apache.org%3E



Re: Environments for External Transforms

2019-05-23 Thread Maximilian Michels
 Writing a new transform involves updating the expansion service to include their new transform. 


Would it be conceivable that the expansion is performed via the 
environment? That would solve the problem of updating the expansion 
service, although it adds additional complexity for bringing up the 
environment.


On 23.05.19 11:31, Robert Bradshaw wrote:
On Wed, May 22, 2019 at 6:17 PM Maximilian Michels <mailto:m...@apache.org>> wrote:


Hi,

Robert and me were discussing on the subject of user-specified
environments for external transforms [1]. We couldn't decide whether
users should have direct control over the environment when they use an
external transform in their pipeline.

In my mind, it is quite natural that the Expansion Service is a
long-running service that gets started with a list of available
environments. 



+1.

IMHO, the expansion service should be expected to provide valid 
environments for the transforms it vendors. Removing this expectation 
seems wrong. Making it cheap to specify non-default dependencies without 
building (publishing, etc.) a docker image is probably key to making 
this work well (and also allowing more powerful environment introspection).


Such a list can be outdated and users may write transforms
for a new environment they want to use in their pipeline. 



This is the part that I'm having trouble following. Writing a new 
transform involves updating the expansion service to include their new 
transform. The author of a transform (in other words, the one who 
defines its expansion and implementation) is in the position to name its 
dependencies, etc. and the user of the transform (the one invoking it) 
is not in a generally good position to know what environments would be 
valid.


The easiest
way would be to allow to pass the environment with the transform. 



What this allows is using existing transforms in new environments. There 
are possibly some use cases for this, e.g. expansion of a given transform 
may be compatible with either version X or version Y of a library, left 
up to the discretion of the caller, but I think that this is really just 
a deficiency in our environment specifications (e.g. one should be 
able to express this flexibility in the returned environment).


Note
that we already give users control over the "main" environment via the
PortablePipelineOptions, so this wouldn't be an entirely new concept.


Yes, the author of a pipeline/transform chooses the environment in which 
those transforms execute.


The contrary position is that the Expansion Service should have full
control over which environment is chosen. Going back to the discussion
about artifact staging [2], this could enable to perform more
optimizations, such as merging environments or detecting conflicts.
However, this only works if this information has been provided upfront
to the Expansion Service. It wouldn't be impossible to provide these
hints alongside with the environment like suggested in the previous
paragraph.

Any opinions? Should we allow users to optionally specify an
environment
for external transforms?

Thanks,
Max

[1] https://github.com/apache/beam/pull/8639
[2]

https://lists.apache.org/thread.html/6fcee7047f53cf1c0636fb65367ef70842016d57effe2e5795c4137d@%3Cdev.beam.apache.org%3E



Re: Environments for External Transforms

2019-05-27 Thread Maximilian Michels

Which environment would be used to perform the expansion? I think this is an 
interesting option, as long as it does not introduce a hard dependency on 
docker.


The same environment that the to-be-expanded transform requires during 
runtime.



Dataflow has been doing something similar in this route where it is trying to 
get rid of the driver program running on the users machine. If you can get the 
expansion service to launch and run an environment to perform the expansion, 
you could also get it to create and submit a job as well returning data around 
the running job.


Portability already runs without a driver on the user machine, apart 
from expansion and staging. For anything runtime-related the job server 
kicks in. It's worth thinking about delegating expansion and staging to 
the job server.


On 24.05.19 23:48, Lukasz Cwik wrote:
Dataflow has been doing something similar in this route where it is 
trying to get rid of the driver program running on the users machine. If 
you can get the expansion service to launch and run an environment to 
perform the expansion, you could also get it to create and submit a job 
as well returning data around the running job.


On Thu, May 23, 2019 at 7:47 AM Thomas Weise <mailto:t...@apache.org>> wrote:




On Thu, May 23, 2019 at 3:46 AM Maximilian Michels mailto:m...@apache.org>> wrote:

 >  Writing a new transform involves updating the expansion
service to include their new transform.

Would it be conceivable that the expansion is performed via the
environment? That would solve the problem of updating the expansion
service, although it adds additional complexity for bringing up the
environment.


Which environment would be used to perform the expansion? I think
this is an interesting option, as long as it does not introduce a
hard dependency on docker.

On 23.05.19 11:31, Robert Bradshaw wrote:
 > On Wed, May 22, 2019 at 6:17 PM Maximilian Michels
mailto:m...@apache.org>
 > <mailto:m...@apache.org <mailto:m...@apache.org>>> wrote:
 >
 >     Hi,
 >
 >     Robert and me were discussing on the subject of
user-specified
 >     environments for external transforms [1]. We couldn't
decide whether
 >     users should have direct control over the environment
when they use an
 >     external transform in their pipeline.
 >
 >     In my mind, it is quite natural that the Expansion
Service is a
 >     long-running service that gets started with a list of
available
 >     environments.
 >
 >
 > +1.
 >
 > IMHO, the expansion service should be expected to provide valid
 > environments for the transforms it vendors. Removing this
expectation
 > seems wrong. Making it cheap to specify non-default
dependencies without
 > building (publishing, etc.) a docker image is probably key to
making
 > this work well (and also allowing more powerful environment
introspection).
 >
 >     Such a list can be outdated and users may write transforms
 >     for a new environment they want to use in their pipeline.
 >
 >
 > This is the part that I'm having trouble following. Writing a
new
 > transform involves updating the expansion service to include
their new
 > transform. The author of a transform (in other words, the one
who
 > defines its expansion and implementation) is in the position
to name its
 > dependencies, etc. and the user of the transform (the one
invoking it)
 > is not in a generally good position to know what environments
would be
 > valid.
 >
 >     The easiest
 >     way would be to allow to pass the environment with the
transform.
 >
 >
 > What this allows is using existing transforms in new
environments. There
 > are possibly some usecases for this, e.g. expansion of a
given transform
 > may be compatible with ether version X or version Y of a
library, left
 > up to the discretion of the caller, but I think that this is
really just
 > a deficiency in our environment specifications (e.g. it one
should be
 > able to express this flexibility in the returned environment).
 >
 >     Note
 >     that we already give users control over the "main"
environment via the
 >     PortablePipelineOptions, so this wouldn't be an entire

Re: Beam Summit Europe: speakers and schedule online!

2019-05-27 Thread Maximilian Michels

Thanks Matthias! This is going to be a very special event for all attendees.

-Max

On 24.05.19 20:12, Joana Filipa Bernardo Carrasqueira wrote:

Great! Thanks for spotting that. We'll update it in our page!


On Fri, May 24, 2019 at 4:56 AM Aljoscha Krettek wrote:


You’re both right. The Kulturbrauerei Area is between Knaackstraße
and Schönhauser Allee and there’s entrances on multiple sides.
Schönhauser Alee is the more prominent street, though.

Btw, I live on Schönhauser Allee. :-)


On 24. May 2019, at 13:48, Suneel Marthi mailto:smar...@apache.org>> wrote:

Kulturbraurei is on Schönhauser Allee - u have the address wrong
on the event page.

On Thu, May 23, 2019 at 4:58 PM Joana Filipa Bernardo Carrasqueira
mailto:joanafil...@google.com>> wrote:

Hi all!

Looking forward to the conversations about Beam and to meeting
new people in the community!

Please help us spread the word about the Beam Summit within
your networks and register for the event.


See you all soon!
Joana


On Thu, May 23, 2019 at 6:24 AM Matthias Baetens
mailto:baetensmatth...@gmail.com>>
wrote:

Hi everyone,

Happy to share that the speakers and schedule are now online on the
website.

Make sure you register on Eventbrite if you want
to attend and follow our Twitter channel for announcements
regarding the speakers over the next few weeks!

Best regards,
Matthias







--

*Joana Carrasqueira*

Cloud Developer Relations Events Manager

+1 415-602-2507

1160 N Mathilda Ave, Sunnyvale, CA 94089




Re: Environments for External Transforms

2019-05-27 Thread Maximilian Michels

I think it makes a lot of sense for job servers to also act as
expansion services, but one can't of course defer expansion until job
submission.


One could defer the expansion until job submission but it would be a 
semantic change to how expansion works currently. In particular with 
respect to providing feedback to the user during expansion and with 
regard to immutability of pipelines, this would not be a good choice.


As for the Flink job server, it already hosts an expansion server. It 
would make sense to let them share the same gRPC server, which would 
avoid having to know the port of the expansion server.


On 27.05.19 13:33, Robert Bradshaw wrote:

On Mon, May 27, 2019 at 12:38 PM Maximilian Michels  wrote:



Which environment would be used to perform the expansion? I think this is an 
interesting option, as long as it does not introduce a hard dependency on 
docker.


The same environment that the to-be-expanded transform requires during
runtime.


Dataflow has been doing something similar in this route where it is trying to 
get rid of the driver program running on the users machine. If you can get the 
expansion service to launch and run an environment to perform the expansion, 
you could also get it to create and submit a job as well returning data around 
the running job.


Portability already runs without a driver on the user machine, apart
from expansion and staging. For anything runtime-related the job server
kicks in. It's worth to think about delegating expansion and staging to
the Job server.


I think it makes a lot of sense for job servers to also act as
expansion services, but one can't of course defer expansion until job
submission.


On 24.05.19 23:48, Lukasz Cwik wrote:

Dataflow has been doing something similar in this route where it is
trying to get rid of the driver program running on the users machine. If
you can get the expansion service to launch and run an environment to
perform the expansion, you could also get it to create and submit a job
as well returning data around the running job.

On Thu, May 23, 2019 at 7:47 AM Thomas Weise mailto:t...@apache.org>> wrote:



 On Thu, May 23, 2019 at 3:46 AM Maximilian Michels mailto:m...@apache.org>> wrote:

  >  Writing a new transform involves updating the expansion
 service to include their new transform.

 Would it be conceivable that the expansion is performed via the
 environment? That would solve the problem of updating the expansion
 service, although it adds additional complexity for bringing up the
 environment.


 Which environment would be used to perform the expansion? I think
 this is an interesting option, as long as it does not introduce a
 hard dependency on docker.

 On 23.05.19 11:31, Robert Bradshaw wrote:
  > On Wed, May 22, 2019 at 6:17 PM Maximilian Michels
 mailto:m...@apache.org>
  > <mailto:m...@apache.org <mailto:m...@apache.org>>> wrote:
  >
  > Hi,
  >
  > Robert and me were discussing on the subject of
 user-specified
  > environments for external transforms [1]. We couldn't
 decide whether
  > users should have direct control over the environment
 when they use an
  > external transform in their pipeline.
  >
  > In my mind, it is quite natural that the Expansion
 Service is a
  > long-running service that gets started with a list of
 available
  > environments.
  >
  >
  > +1.
  >
  > IMHO, the expansion service should be expected to provide valid
  > environments for the transforms it vendors. Removing this
 expectation
  > seems wrong. Making it cheap to specify non-default
 dependencies without
  > building (publishing, etc.) a docker image is probably key to
 making
  > this work well (and also allowing more powerful environment
 introspection).
  >
  > Such a list can be outdated and users may write transforms
  > for a new environment they want to use in their pipeline.
  >
  >
  > This is the part that I'm having trouble following. Writing a
 new
  > transform involves updating the expansion service to include
 their new
  > transform. The author of a transform (in other words, the one
 who
  > defines its expansion and implementation) is in the position
 to name its
  > dependencies, etc. and the user of the transform (the one
 invoking it)
  > is not in a generally good position to know what environments
 

Re: Proposal: Portability SDKHarness Docker Image Release with Beam Version Release.

2019-05-27 Thread Maximilian Michels

+1

On 27.05.19 14:04, Robert Bradshaw wrote:

Sounds like everyone's onboard with the plan. Any chance we could
publish these for the upcoming 2.13 release?

On Wed, Feb 6, 2019 at 6:29 PM Łukasz Gajowy  wrote:


+1 to have a registry for images accessible to anyone. For snapshot images, I 
agree that gcr + apache-beam-testing project seems a good and easy way to start 
with.

Łukasz

wt., 22 sty 2019 o 19:43 Mark Liu  napisał(a):


+1 to have an official Beam released container image.

Also, I would propose adding a verification step to (or after) the release 
process to do a smoke check. Python has a ValidatesContainer test that runs a basic 
pipeline using the newly built container for verification. Other SDK languages can 
do a similar thing or add a common framework.

Mark

On Thu, Jan 17, 2019 at 5:56 AM Alan Myrvold  wrote:


+1 This would be great. gcr.io seems like a good option for snapshots due to 
the permissions from jenkins to upload and ability to keep snapshots around.

On Wed, Jan 16, 2019 at 6:51 PM Ruoyun Huang  wrote:


+1 This would be a great thing to have.

On Wed, Jan 16, 2019 at 6:11 PM Ankur Goenka  wrote:


gcr.io seems to be a good option. Given that we don't need the hosting server 
name in the image name, it is easily changeable later.

The Docker container for Apache Flink is named "flink" and they have different tags for 
different releases and configurations: https://hub.docker.com/_/flink . We can follow a similar model 
and name the image "beam" (beam doesn't seem to be taken on Docker Hub) and use 
tags to distinguish Java/Python/Go, versions, etc.

Tags will look like:
java-SNAPSHOT
java-2.10.1
python2-SNAPSHOT
python2-2.10.1
go-SNAPSHOT
go-2.10.1


On Wed, Jan 16, 2019 at 5:56 PM Ahmet Altay  wrote:


For snapshots, we could use gcr.io. Permission would not be a problem since 
Jenkins is already correctly setup. The cost will be covered under 
apache-beam-testing project. And since this is only for snapshots, it will be 
only for temporary artifacts not for release artifacts.

On Wed, Jan 16, 2019 at 5:50 PM Valentyn Tymofieiev  wrote:


+1, releasing containers is a useful process that we need to build in Beam and 
it is required for FnApi users. Among other reasons, having officially-released 
Beam SDK harness container images will make it easier for users to do simple 
customizations to  container images, as they will be able to use container 
image released by Beam as a base image.

Good point about potential storage limitations on Bintray. With Beam Release 
cadence we may quickly exceed the 10 GB quota. It may also affect our decisions 
as to which images we want to release, for example: do we want to only release 
one container image with Python 3 interpreter, or do we want to release a 
container image for each Python 3 minor version that Beam is compatible with.



Probably worth a separate discussion. I would favor first releasing a python 3 
compatible version before figuring out how we would target multiple python 3 
versions.





On Wed, Jan 16, 2019 at 5:48 PM Ankur Goenka  wrote:




On Wed, Jan 16, 2019 at 5:37 PM Ahmet Altay  wrote:




On Wed, Jan 16, 2019 at 5:28 PM Ankur Goenka  wrote:


- Could we start from snapshots first and then do it for releases?
+1, releasing snapshots first makes sense to me.
- For snapshots, do we need to clean old containers after a while? Otherwise I 
guess we will accumulate lots of containers.
For snapshots we can maintain a single snapshot image from git HEAD daily. 
Docker has an internal image ID which changes every time an image is 
changed, and it pulls new images as needed.



There is a potential use case this may not work with: if a user picks up a snapshot 
build and wants to use it until the next release arrives. I guess in that case 
the user can copy the snapshotted container image and rely on that.



Yes, that should be reasonable.


- Do we also need additional code changes for snapshots and releases to default 
to these specific containers? There could be a version based mechanism to 
resolve the correct container to use.
The current image defaults have a username in them. We should be OK by just 
updating the default image URL to the published image URL.

We should also check for pricing and details about Apache-Bintray agreement 
before pushing images and changing defaults.



There is information on bintray's pricing page about open source projects [1]. 
I do not know if there is a special apache-bintray agreement or not. If there 
is no special agreement there is a 10GB storage limit for using bintray.


As each image can easily run into gigabytes, 10GB might not be sufficient for 
future-proofing.
We can also register the image with the Docker image registry and not have Bintray 
in the name, so we can later host images with a different vendor for future-proofing.



[1] https://bintray.com/account/pricing?tab=account&type=pricing




On Wed, Jan 16, 2019 at 5:11 PM Ahmet Altay  wrote:


This sounds like a good idea. Some quest

Re: Timer support in Flink

2019-05-29 Thread Maximilian Michels

Hi Reza,

The detailed view of the capability matrix states: "The Flink Runner 
supports timers in non-merging windows."


That is still the case. Other than that, timers should be working fine.


It makes very heavy use of Event.Time timers and has to do some manual DoFn 
cache work to get around some O(heavy) issues.


If you are running on Flink 1.5, timer deletion suffers from O(n) 
complexity which has been fixed in newer versions.
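
For reference, here is a minimal sketch of the kind of event-time timer this 
refers to; it should work on the Flink Runner as long as the windowing strategy 
is non-merging. The one-minute gap and the output string are placeholders for 
illustration, not taken from Reza's utility class:

import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Event-time timer in a keyed DoFn; supported by the Flink Runner for
// non-merging windows (e.g. fixed windows or the global window).
class EmitAfterGapFn extends DoFn<KV<String, Long>, String> {
  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(ProcessContext ctx, @TimerId("flush") Timer flush) {
    // (Re-)arm the timer one minute past the element's timestamp.
    flush.set(ctx.timestamp().plus(Duration.standardMinutes(1)));
  }

  @OnTimer("flush")
  public void onFlush(OnTimerContext ctx) {
    ctx.output("timer fired at " + ctx.timestamp());
  }
}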


Cheers,
Max

On 29.05.19 03:27, Reza Rokni wrote:

Hi Flink experts,

I am getting ready to push a PR around a utility class for time-series joins:

each left.timestamp is matched to the closest right.timestamp where right.timestamp <= 
left.timestamp.


It makes very heavy use of Event.Time timers and has to do some manual 
DoFn cache work to get around some O(heavy) issues. Wanted to test 
things against Flink: In the capability matrix we have "~" for Timer 
support in Flink:


https://beam.apache.org/documentation/runners/capability-matrix/

Is that page outdated? If not, what are the areas that still need to be 
addressed?


Cheers

Reza


--

This email may be confidential and privileged. If you received this 
communication by mistake, please don't forward it to anyone else, please 
erase all copies and attachments, and please let me know that it has 
gone to the wrong person.


The above terms reflect a potential business arrangement, are provided 
solely as a basis for further discussion, and are not intended to be and 
do not constitute a legally binding obligation. No legally binding 
obligations will be created, implied, or inferred until an agreement in 
final form is executed in writing by all parties involved.




Re: [DISCUSS] Autoformat python code with Black

2019-05-29 Thread Maximilian Michels
I think the question is if it can be configured in a way to fit our 
current linter's style. I don't think it is feasible to reformat the 
entire Python SDK.


Reformatted lines don't allow quick access to the Git history. This 
effect is still visible in the Java SDK. However, I have the feeling 
that this might be less of a problem with Python because the linter has 
more rules than Checkstyle had.


-Max

On 29.05.19 10:16, Ismaël Mejía wrote:

My concerns are:
- The product is clearly marked as beta with a big warning.
- It looks like mostly a single person project. For the same reason I also 
strongly prefer not using a fork for a specific setting. Fork will only have 
less people looking at it.


I suppose the project is marked as beta because it is recent, it was
presented in 2018’s pycon, and because some things can change since
auto-formatters are pretty tricky beasts, I think beta in that case is
like our own ‘@Experimental’. If you look at the contribution page [1]
you can notice that it is less and less a single person project, there
have been 93 independent contributions since the project became
public, and the fact that it is hosted in the python organization
github [2] gives some confidence on the project continuity.

You are right however about the fact that the main author seems to be
the ‘benevolent’ dictator, and in the 2-spaces issue he can seem
arbitrary, but he is just following pep8 style guide recommendations
[3]. I am curious why we (Beam) do not follow the 4-space
recommendation of PEP-8 or even Google's own Python style guide [4].
So, it should probably be up to us to reconsider the current policy to
adapt to the standards (and the tool).

I did a quick run of black with python 2.7 compatibility on
sdks/python and got only 4 parsing errors which is positive given the
size of our code base.

415 files reformatted, 45 files left unchanged, 4 files failed to reformat.

error: cannot format
/home/ismael/upstream/beam/sdks/python/apache_beam/runners/interactive/display/display_manager.py:
Cannot parse: 47:22:   _display_progress = print
error: cannot format
/home/ismael/upstream/beam/sdks/python/apache_beam/runners/worker/log_handler.py:
Cannot parse: 151:18:   file=sys.stderr)
error: cannot format
/home/ismael/upstream/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py:
Cannot parse: 160:34:   print(traceback_string, file=sys.stderr)
error: cannot format
/home/ismael/upstream/beam/sdks/python/apache_beam/typehints/trivial_inference.py:
Cannot parse: 335:51:   print('-->' if pc == last_pc else '',
end=' ')

I still think this can be positive for the project but well I am
barely a contributor to the python code base so I let you the python
maintainers to reconsider this, in any case it seems like a good
improvement for the project.

[1] https://github.com/python/black/graphs/contributors
[2] https://github.com/python
[3] https://www.python.org/dev/peps/pep-0008/#indentation
[4] https://github.com/google/styleguide/blob/gh-pages/pyguide.md#34-indentation

On Tue, May 28, 2019 at 11:15 PM Ahmet Altay  wrote:


I am in the same boat with Robert, I am in favor of autoformatters but I am not 
familiar with this one. My concerns are:
- The product is clearly marked as beta with a big warning.
- It looks like mostly a single person project. For the same reason I also 
strongly prefer not using a fork for a specific setting. A fork will only have 
fewer people looking at it.

IMO, this is in an early stage for us. That said lint issues are real as 
pointed in the thread. If someone would like to give it a try and see how it 
would look like for us that would be interesting.

On Tue, May 28, 2019 at 4:44 AM Katarzyna Kucharczyk  
wrote:


This sounds really good. A lot of Jenkins job failures are caused by lint 
problems.
I think it would be great to have something similar to Spotless in Java SDK (I 
heard there is problem with configuring Black with IntelliJ).

On Mon, May 27, 2019 at 10:52 PM Robert Bradshaw  wrote:


I'm generally in favor of autoformatters, though I haven't looked at
how well this particular one works. We might have to go with
https://github.com/desbma/black-2spaces given
https://github.com/python/black/issues/378 .

On Mon, May 27, 2019 at 10:43 PM Pablo Estrada  wrote:


This looks pretty good:) I know at least a couple people (myself included) 
who've been annoyed by having to take care of lint issues that maybe a code 
formatter could save us.
Thanks for sharing Ismael.
-P.


On Mon, May 27, 2019, 12:24 PM Ismaël Mejía  wrote:


I stumbled by chance into Black [1] a python code auto formatter that
is becoming the 'de-facto' auto-formatter for python, and wanted to
bring to the ML Is there interest from the python people to get this
into the build?

The introduction of spotless for Java has been a good improvement and
maybe the python code base may benefit of this too.

WDYT?

[1] https://github.com/python/black


Re: 1 Million Lines of Code (1 MLOC)

2019-06-03 Thread Maximilian Michels
Interesting stats :) This metric does not take into account Beam's 
dependencies, e.g. libraries and execution backends. That would increase 
the LOC count to many millions.


On 01.06.19 01:46, Alex Amato wrote:
Interesting, so if we play with https://github.com/cgag/loc we could 
break it down further? I.e. test files vs code files? Which folders, 
etc. That could be interesting as well.


On Fri, May 31, 2019 at 4:20 PM Brian Hulette wrote:


Dennis Nedry needed 2 million lines of code to control Jurassic
Park, and he only had to manage eight computers! I think we may
actually need to pick up the pace.

On Fri, May 31, 2019 at 4:11 PM Anton Kedin mailto:ke...@google.com>> wrote:

And to reduce the effort of future rewrites we should start
doing it on a schedule. I propose we start over once a week :)

On Fri, May 31, 2019 at 4:02 PM Lukasz Cwik mailto:lc...@google.com>> wrote:

1 million lines is too much, time to delete the entire
project and start over again, :-)

On Fri, May 31, 2019 at 3:12 PM Ankur Goenka
mailto:goe...@google.com>> wrote:

Thanks for sharing.
These are really interesting metrics.
One use I can see is to track LOC vs Comments to make
sure that we keep up with the practice of writing
maintainable code.

On Fri, May 31, 2019 at 3:04 PM Ismaël Mejía
mailto:ieme...@gmail.com>> wrote:

I was checking some metrics in our codebase and
found by chance that
we have passed the 1 million lines of code (MLOC).
Of course lines of
code may not matter much but anyway it is
interesting to see the size
of our project at this moment.

This is the detailed information returned by loc [1]:



 -------------------------------------------------------------------------------
  Language             Files        Lines        Blank      Comment         Code
 -------------------------------------------------------------------------------
  Java                  3681       673007        78265       140753       453989
  Python                 497       131082        22560        13378        95144
  Go                     333       105775        13681        11073        81021
  Markdown               205        31989         6526            0        25463
  Plain Text              11        21979         6359            0        15620
  Sass                    92         9867         1434         1900         6533
  JavaScript              19         5157         1197          467         3493
  YAML                    14         4601          454         1104         3043
  Bourne Shell            30         3874          470         1028         2376
  Protobuf                17         4258          677         1373         2208
  XML                     17         2789          296          559         1934
  Kotlin                  19         3501          347         1370         1784
  HTML                    60         2447          148          914         1385
  Batch                    3          249           57            0          192
  INI                      1          206           21           16          169
  C++                      2           72            4           36           32
  Autoconf                 1           21            1           16            4
 -------------------------------------------------------------------------------
  Total                 5002      1000874       132497       173987       694390
 -------------------------------------------------------------------------------




[1] https://github.com/cgag/loc



Re: Timer support in Flink

2019-06-03 Thread Maximilian Michels
Good point. I think I discovered the detailed view when I made changes 
to the source code. Classic tunnel-vision problem :)
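
For readers who have not used them, here is a minimal sketch of the event-time
timers discussed below, written with the Python SDK (illustrative only; this is
not Reza's Java utility, and it assumes a keyed input PCollection of
(key, int) pairs):

  # Minimal illustration of an event-time ("watermark") timer plus user state.
  import apache_beam as beam
  from apache_beam.coders import VarIntCoder
  from apache_beam.transforms.timeutil import TimeDomain
  from apache_beam.transforms.userstate import BagStateSpec, TimerSpec, on_timer

  class BufferUntilWatermark(beam.DoFn):
      BUFFER = BagStateSpec('buffer', VarIntCoder())
      FLUSH = TimerSpec('flush', TimeDomain.WATERMARK)

      def process(self,
                  element,
                  timestamp=beam.DoFn.TimestampParam,
                  buffer=beam.DoFn.StateParam(BUFFER),
                  flush=beam.DoFn.TimerParam(FLUSH)):
          _, value = element
          buffer.add(value)
          # Fire once the watermark passes this element's timestamp.
          flush.set(timestamp)

      @on_timer(FLUSH)
      def flush_buffer(self, buffer=beam.DoFn.StateParam(BUFFER)):
          yield list(buffer.read())
          buffer.clear()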


On 30.05.19 12:57, Reza Rokni wrote:

:-)

https://issues.apache.org/jira/browse/BEAM-7456

On Thu, 30 May 2019 at 18:41, Alex Van Boxel <mailto:a...@vanboxel.be>> wrote:


Oh... you can expand the matrix. Never saw that, this could indeed
be better. So it isn't you.

  _/
_/ Alex Van Boxel


On Thu, May 30, 2019 at 12:24 PM Reza Rokni mailto:r...@google.com>> wrote:

PS, until it was just pointed out to me by Max, I had missed the
(expand details) clickable link in the capability matrix.

Probably just me, but do others think it's also easy to miss? If
yes, I will raise a Jira for it.

On Wed, 29 May 2019 at 19:52, Reza Rokni mailto:r...@google.com>> wrote:

Thanx Max!

Reza

On Wed, 29 May 2019, 16:38 Maximilian Michels,
mailto:m...@apache.org>> wrote:

Hi Reza,

The detailed view of the capability matrix states: "The
Flink Runner
supports timers in non-merging windows."

That is still the case. Other than that, timers should
be working fine.

 > It makes very heavy use of Event.Time timers and has
to do some manual DoFn cache work to get around some
O(heavy) issues.

If you are running on Flink 1.5, timer deletion suffers
from O(n)
complexity which has been fixed in newer versions.

Cheers,
Max

On 29.05.19 03:27, Reza Rokni wrote:
 > Hi Flink experts,
 >
 > I am getting ready to push a PR around a utility
class for timeseries join
 >
 > left.timestamp match to closest right.timestamp where
right.timestamp <=
 > left.timestamp.
 >
 > It makes very heavy use of Event.Time timers and has
to do some manual
 > DoFn cache work to get around some O(heavy) issues.
Wanted to test
 > things against Flink: In the capability matrix we
have "~" for Timer
 > support in Flink:
 >
 >
https://beam.apache.org/documentation/runners/capability-matrix/
 >
 > Is that page outdated, if not what are the areas that
still need to be
 > addressed please?
 >
 > Cheers
 >
 > Reza
 >
 >
 > --
 >




Re: [VOTE] Release 2.13.0, release candidate #2

2019-06-03 Thread Maximilian Michels

+1 (binding)

Tested Flink Runner local/cluster execution with the included examples 
and all supported Flink versions.


There is an issue with the staging for remote execution but it is not a 
blocker since an alternative way exists: 
https://jira.apache.org/jira/browse/BEAM-7478



Reminder: The voting closes on 2nd June so please validate and vote by then.


We generally do not include weekends in the minimum voting period of 
72 hours. I'd propose to leave the vote open at least until Wednesday 
04:52 CEST which would be 72 hours excluding the weekend. Btw, thank you 
for all your work on preparing the RC!


-Max

On 03.06.19 09:33, Robert Bradshaw wrote:

+1

I validated the artifacts and Python 3.

On Sat, Jun 1, 2019 at 7:45 PM Ankur Goenka  wrote:


Thanks Ahmet and Luke for validation.

If no one has objections then I am planning to move ahead without Gearpump 
validation, as it seems to have been broken for the past several releases.

Reminder: The voting closes on 2nd June so please validate and vote by then.

On Fri, May 31, 2019 at 10:43 AM Ahmet Altay  wrote:


+1

I validated python 2 quickstarts.

On Fri, May 31, 2019 at 10:22 AM Lukasz Cwik  wrote:


I did the Java local quickstart for all the runners in the release validation 
sheet and gearpump failed for me due to a missing dependency. Even after I 
fixed up the dependency, the pipeline then got stuck. I filed BEAM-7467 with 
all the details.

Note that I tried the quickstart for 2.8.0 through 2.12.0
2.8.0 and 2.9.0 failed due to a timeout (maybe I was using the wrong command 
but this test[1] suggests that I was using a correct one)
2.10.0 and higher fail due to the missing gs-collections dependency.

Manu, could you help figure out what is going on?

1: 
https://github.com/apache/beam/blob/2d3bcdc542536037c3e657a8b00ebc222487476b/release/src/main/groovy/quickstart-java-gearpump.groovy#L33

On Thu, May 30, 2019 at 7:53 PM Ankur Goenka  wrote:


Hi everyone,

Please review and vote on the release candidate #2 for the version 2.13.0, as 
follows:

[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

The complete staging area is available for your review, which includes:
* JIRA release notes [1],
* the official Apache source release to be deployed to dist.apache.org [2], 
which is signed with the key with fingerprint 
6356C1A9F089B0FA3DE8753688934A6699985948 [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag "v2.13.0-RC2" [5],
* website pull request listing the release [6] and publishing the API reference 
manual [7].
* Python artifacts are deployed along with the source release to the 
dist.apache.org [2].
* Validation sheet with a tab for 2.13.0 release to help with validation [8].

The vote will be open for at least 72 hours. It is adopted by majority 
approval, with at least 3 PMC affirmative votes.

Thanks,
Ankur

[1] 
https://jira.apache.org/jira/secure/ReleaseNote.jspa?projectId=12319527&version=12345166
[2] https://dist.apache.org/repos/dist/dev/beam/2.13.0/
[3] https://dist.apache.org/repos/dist/release/beam/KEYS
[4] https://repository.apache.org/content/repositories/orgapachebeam-1070/
[5] https://github.com/apache/beam/tree/v2.13.0-RC2
[6] https://github.com/apache/beam/pull/8645
[7] https://github.com/apache/beam-site/pull/589
[8] 
https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=1031196952


Re: [VOTE] Release 2.13.0, release candidate #2

2019-06-04 Thread Maximilian Michels

The summary is not correct. Binding votes (in order):

Ahmet Altay
Robert Bradshaw
Maximilian Michels
Jean-Baptiste Onofré
Lukasz Cwik

A total of 5 binding votes.

On 04.06.19 02:37, Ankur Goenka wrote:

+1
Thanks for validating the release and voting.
With 0(-1), 6(+1) and 3(+1 binding) votes, I am concluding the voting 
process.
I am going ahead with the release and will keep the community posted 
with the updates.


On Mon, Jun 3, 2019 at 1:57 PM Andrew Pilloud <mailto:apill...@google.com>> wrote:


+1 Reviewed the Nexmark java and SQL perfkit graphs, no obvious
regressions over the previous release.

On Mon, Jun 3, 2019 at 1:15 PM Lukasz Cwik mailto:lc...@google.com>> wrote:

Thanks for the clarification.

On Mon, Jun 3, 2019 at 11:40 AM Ankur Goenka mailto:goe...@google.com>> wrote:

Yes, I meant I will close the voting at 5pm and start the
release process.

On Mon, Jun 3, 2019, 10:59 AM Lukasz Cwik mailto:lc...@google.com>> wrote:

Ankur, did you mean to say you're going to close the vote
today at 5pm? (and then complete the release afterwards)

On Mon, Jun 3, 2019 at 10:54 AM Ankur Goenka
mailto:goe...@google.com>> wrote:

Thanks for validating and voting.

We have 4 binding votes.
I will complete the release today 5PM. Please raise
any concerns before that.

Thanks,
Ankur

On Mon, Jun 3, 2019 at 8:36 AM Lukasz Cwik
mailto:lc...@google.com>> wrote:

Since the gearpump issue has been ongoing since
2.10, I can't consider it a blocker for this
release and am voting +1.

On Mon, Jun 3, 2019 at 7:13 AM Jean-Baptiste
Onofré mailto:j...@nanthrax.net>> wrote:

+1 (binding)

Quickly tested on beam-samples.

Regards
JB

On 31/05/2019 04:52, Ankur Goenka wrote:
 > Hi everyone,
 >
 > Please review and vote on the release
candidate #2 for the version
 > 2.13.0, as follows:
 >
 > [ ] +1, Approve the release
 > [ ] -1, Do not approve the release
(please provide specific comments)
 >
 > The complete staging area is available
for your review, which includes:
 > * JIRA release notes [1],
 > * the official Apache source release to
be deployed to dist.apache.org
<http://dist.apache.org>
 > <http://dist.apache.org> [2], which is
signed with the key with
 > fingerprint
6356C1A9F089B0FA3DE8753688934A6699985948 [3],
 > * all artifacts to be deployed to the
Maven Central Repository [4],
 > * source code tag "v2.13.0-RC2" [5],
 > * website pull request listing the
release [6] and publishing the API
 > reference manual [7].
 > * Python artifacts are deployed along
with the source release to the
 > dist.apache.org <http://dist.apache.org>
<http://dist.apache.org> [2].
 > * Validation sheet with a tab for 2.13.0
release to help with validation
 > [8].
 >
 > The vote will be open for at least 72
hours. It is adopted by majority
 > approval, with at least 3 PMC affirmative
votes.
 >
 > Thanks,
 > Ankur
 >
 > [1]
 >

https://jira.apache.org/jira/secure/ReleaseNote.jspa?projectId=12319527&version=12345166
 > [2]
  

Re: [PROPOSAL] Prepare for LTS bugfix release 2.7.1

2019-06-07 Thread Maximilian Michels

Created an up-to-date version of the Flink backports for 2.7.1: 
https://github.com/apache/beam/pull/8787

Some of the Gradle task names have changed which makes testing via Jenkins 
hard. Will have to run them manually before merging.

-Max

On 06.06.19 17:41, Kenneth Knowles wrote:

Hi all,

Re-raising this thread. I got busy for the last month, and also did not
want to overlap the 2.13.0 release process. Now I want to pick up 2.7.1
again.

Can everyone check on any bug they have targeted to 2.7.1 [1] and get
the backports merged to release-2.7.1 and the tickets resolved?

Kenn

[1]
https://issues.apache.org/jira/issues/?jql=project%20%3D%20BEAM%20AND%20resolution%20%3D%20Unresolved%20AND%20fixVersion%20%3D%202.7.1%20ORDER%20BY%20priority%20DESC%2C%20updated%20DESC

On Fri, Apr 26, 2019 at 11:19 AM Ahmet Altay mailto:al...@google.com>> wrote:

    I agree with both keeping 2.7.x going until a new LTS is declared
    and declaring LTS post-release after some use. 2.12 might actually
    be a good candidate, with multiple RCs/validations it presumably is
    well tested. We can consider that after it gets some real world use.

    On Fri, Apr 26, 2019 at 6:29 AM Robert Bradshaw mailto:rober...@google.com>> wrote:

    IIRC, there was some talk on making 2.12 the next LTS, but the
    consensus is to decide on a LTS after having had some experience
    with
    it, not at or before the release itself.


    On Fri, Apr 26, 2019 at 3:04 PM Alexey Romanenko
    mailto:aromanenko@gmail.com>> wrote:
 >
 > Thanks for working on this, Kenn.
 >
 > Perhaps, I missed this but has it been already
    discussed/decided what will be the next LTS release?
 >
 > On 26 Apr 2019, at 08:02, Kenneth Knowles mailto:k...@apache.org>> wrote:
 >
 > Since it is all trivially reversible if there is some other
    feeling about this thread, I have gone ahead and started the work:
 >
 >  - I made release-2.7.1 branch point to the same commit as
    release-2.7.0 so there is something to target PRs
 >  - I have opened the first PR, cherry-picking the set_version
    script and using it to set the version on the branch to 2.7.1:
    https://github.com/apache/beam/pull/8407 (found bug in the new
    script right away :-)
 >
 > Here is the release with list of issues:
    https://issues.apache.org/jira/projects/BEAM/versions/12344458.
    So anyone can grab a ticket and volunteer to open a backport PR
    to the release-2.7.1 branch.
 >
 > I don't have a strong opinion about how long we should
    support the 2.7.x line. I am curious about different
    perspectives on user / vendor needs. I have two very basic
    thoughts: (1) we surely need to keep it going until some time
    after we have another LTS designated, to make sure there is a
    clear path for anyone only using LTS releases and (2) if we
    decide to end support of 2.7.x but then someone volunteers to
    backport and release, of course I would not expect anyone to
    block them, so it has no maximum lifetime, but we just need
    consensus on a minimum. And of course that consensus cannot
    force anyone to do the work, but is just a resolution of the
    community.
 >
 > Kenn
 >
 > On Thu, Apr 25, 2019 at 10:29 PM Jean-Baptiste Onofré
    mailto:j...@nanthrax.net>> wrote:
 >>
 >> +1 it sounds good to me.
 >>
 >> Thanks !
 >>
 >> Regards
 >> JB
 >>
 >> On 26/04/2019 02:42, Kenneth Knowles wrote:
 >> > Hi all,
 >> >
 >> > Since the release of 2.7.0 we have identified some serious
    bugs:
 >> >
 >> >  - There are 8 (non-dupe) issues* tagged with Fix Version
    2.7.1
 >> >  - 2 are rated "Blocker" (aka P0) but I think the others
    may be underrated
 >> >  - If you know of a critical bug that is not on that list,
    please file
 >> > an LTS backport ticket for it
 >> >
 >> > If a user is on an old version and wants to move to the
    LTS, there are
 >> > some real blockers. I propose that we perform a 2.7.1
    release starting now.
 >> >
 >> > I volunteer to manage the release. What do you think?
 >> >
 >> > Kenn
 >> >
 >> > *Some are "resolved" but this is not accurate as the LTS
    2.7.1 branch is
 >> > not created yet. I suggest filing a ticket to track just
    the LTS
 >> > backport when you hit a bug that merits it.
 >> >
 >
 >





Re: [DISCUSS] Cookbooks for users with knowledge in other frameworks

2019-06-07 Thread Maximilian Michels

Sounds like a good idea. I think the same can be done for Flink; Flink's and 
Spark's APIs are similar to a large degree.

Here also a link to the transforms: 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/
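
To make that concrete, here is the kind of side-by-side entry such a cookbook
page could contain: the same per-key sum written with PySpark and with the
Beam Python SDK (illustrative only, assuming both packages are installed):

  # PySpark
  from pyspark import SparkContext
  sc = SparkContext("local", "per-key-sum")
  spark_result = (sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
                  .reduceByKey(lambda x, y: x + y)
                  .collect())
  print(spark_result)

  # Beam (Python SDK)
  import apache_beam as beam
  with beam.Pipeline() as p:
      _ = (p
           | beam.Create([("a", 1), ("b", 1), ("a", 2)])
           | beam.CombinePerKey(sum)
           | beam.Map(print))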

-Max

On 04.06.19 03:20, Ahmet Altay wrote:

Thank you for the feedback so far. It seems like this will be generally
helpful :)

I guess the next step would be: would anyone be interested in working in
this area? We can potentially break this down into starter tasks.

On Sat, Jun 1, 2019 at 7:00 PM Ankur Goenka mailto:goe...@google.com>> wrote:

    +1 for the proposal.
    The Compatibility Matrix can be a good place to showcase parity
    between different runners.


+1

    Do you think we should write 2 way examples [Spark, Flink, ..]<=>Beam?


Both ways, would be most useful I believe.




    On Sat, Jun 1, 2019 at 4:31 PM Reza Rokni mailto:r...@google.com>> wrote:

    For layer 1, what about working through this link as a starting
    point :
    
https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations?


+1


    On Sat, 1 Jun 2019 at 09:21, Ahmet Altay mailto:al...@google.com>> wrote:

    Thank you Reza. That separation makes sense to me.

    On Wed, May 29, 2019 at 6:26 PM Reza Rokni mailto:r...@google.com>> wrote:

    +1

    I think there will be at least two layers of this;

    Layer 1 - Using primitives : I do join, GBK,
    Aggregation... with system x this way, what is the
    canonical equivalent in Beam.
    Layer 2 - Patterns : I read and join Unbounded and
    Bounded Data in system x this way, what is the canonical
    equivalent in Beam.

     I suspect that, as a first pass, Layer 1 is reasonably well-bounded
     work; there would need to be agreement on the "canonical" version of
     how to do something in Beam, as this could be seen to be opinionated,
     since there are often a multitude of ways of doing x.


    Once we identify a set of layer 1 items, we could crowd
    source the canonical implementations. I believe we can use
    our usual code review process to settle on a version that is
    agreeable. (Examples have the same issue, they are
    probably opinionated today based on the author but it works
    out.)



    On Thu, 30 May 2019 at 08:56, Ahmet Altay
    mailto:al...@google.com>> wrote:

    Hi all,

    Inspired by the user asking about a Spark feature in
    Beam [1] in the release thread, I searched the user@
     list and noticed a few instances of people asking
     questions like "I can do X in Spark, how can I do
     that in Beam?" Would it make sense to add
     documentation explaining how certain tasks can
     be accomplished in Beam, with side-by-side examples
     of doing the same task in Beam/Spark etc.? It could
    help with on-boarding because it will be easier for
    people to leverage their existing knowledge. It
    could also help other frameworks as well, because it
    will serve as a Rosetta stone with two translations.

     Questions I have are:
     - Would such a thing be helpful?
     - Is it feasible? Would a few pages' worth of
     examples cover enough use cases?

    Thank you!
    Ahmet

    [1]
    
https://lists.apache.org/thread.html/b73a54aa1e6e9933628f177b04a8f907c26cac854745fa081c478eff@%3Cdev.beam.apache.org%3E




Re: [ANNOUNCE] Apache Beam 2.13.0 released!

2019-06-10 Thread Maximilian Michels
Thanks for managing the release, Ankur!

@Chad Thanks for the feedback. I agree that we can improve our release notes. 
The particular issue you were looking for was part of the detailed list [1] 
linked in the blog post: https://jira.apache.org/jira/browse/BEAM-7029

Cheers,
Max

[1] 
https://jira.apache.org/jira/secure/ReleaseNote.jspa?projectId=12319527&version=12345166

On 08.06.19 00:11, Chad Dombrova wrote:
> I saw this and was particularly excited about the new support for
> "external" transforms in portable runners like python (i.e. the ability
> to use the Java KafkaIO transforms, with presumably more to come in the
> future).  While the release notes are useful, I will say that it takes a
> lot of time and effort to sift through the release notes to find
> relevant issues.  They're not grouped by sdk/component, and for example,
> not all of the python issues include the word "python" in their title.
> It would be great to have a blurb on the Beam blog explaining the
> highlights.  An example of a project that I think does this very well is
> mypy:  http://mypy-lang.blogspot.com/
>
> thanks!
> chad
>
>
>
>
>
> On Fri, Jun 7, 2019 at 2:58 PM Kyle Weaver  > wrote:
>
>     Awesome! Thanks for leading the release Ankur.
>
>     On Fri, Jun 7, 2019 at 2:57 PM Ankur Goenka      > wrote:
>
>     The Apache Beam team is pleased to announce the release of
>     version 2.13.0!
>
>     Apache Beam is an open source unified programming model to
>     define and
>     execute data processing pipelines, including ETL, batch and stream
>     (continuous) processing. See https://beam.apache.org
>     
>
>     You can download the release here:
>
>         https://beam.apache.org/get-started/downloads/
>
>     This release includes bugfixes, features, and improvements
>     detailed on
>     the Beam
>     blog: https://beam.apache.org/blog/2019/05/22/beam-2.13.0.html
>
>     Thanks to everyone who contributed to this release, and we hope
>     you enjoy
>     using Beam 2.13.0.
>
>     -- Ankur Goenka, on behalf of The Apache Beam team
>
>     --
>     Kyle Weaver | Software Engineer | github.com/ibzib
>      | kcwea...@google.com
>      | +1650203
>



Re: Hazelcast Jet Runner

2019-07-09 Thread Maximilian Michels
We should fork the discussion around removing instances of @Experimental, but 
it was good to mention it here.

As for the Jet runner, I can only second Ismael: The Jet runner is the first 
runner I can think of that came with ValidatesRunner and Nexmark out of the 
box. Of course that doesn't mean the runner is "battle-tested", but we do not 
have other means to test its maturity.

For the future, we could come up with other criteria, e.g. a "probation 
period", but enforcing this now seems arbitrary.

If the authors of the Runners decide that it is experimental, so be it. 
Otherwise I would leave it to the user to decide (it might be helpful to list 
the inception date of each runner). That said, I value your concern, Kenn. I 
can see us establishing a consistent onboarding process for new runners, which 
may involve marking them experimental for a while.

-Max

On 01.07.19 22:20, Kenneth Knowles wrote:
>
>
> On Wed, Jun 12, 2019 at 2:32 AM Ismaël Mejía  > wrote:
>
>     Seems the discussion moved a bit of my original intent that was to
>     make the Jet runner directory to be just called runners/jet in the
>     directory and mark the 'experimental' part of it in documentation as
>     we do for all other things in Beam.
>
>
> Thanks for returning to the one question at hand. We don't have to make
> an overall decision about all "experimental" things.
>  
>
>     Can we do this or is there still any considerable argument to not do it?
>
>
> I think we actually have some competing goals:
>
>     I agree 100% on the arguments, but let’s think in the reverse terms,
>     highlighting lack of maturity can play against the intended goal of
>     use and adoption even if for a noble reason. It is basic priming 101
>     [1].
>
>
> _My_ goal is exactly to highlight lack of maturity so that users are not
> harmed by either (1) necessary breaking changes or (2) permanent low
> quality. Only users who are willing to follow along with the project and
> update their own code regularly should use experimental features.
>
> Evaluating the Jet runner I am convinced by your arguments, because
> looking at the two dangers:
> (1) necessary breaking changes -- runners don't really have their own
> APIs to break, except their own small set of APIs and pipeline options
> (2) permanent low quality -- because there is no API design possible,
> there's no risk of permanent low quality except by fundamental
> mismatches. Plus as you mention the testing is already quite good.
>
> So I am OK to not call it experimental. But I have a slight remaining
> concern that it did not really go through what other runners went
> through. I hope this just means it is more mature. I hope it does not
> indicate that we are reducing rigor.
>
> Kenn
>  
>
>     On Wed, May 29, 2019 at 3:02 PM Reza Rokni      > wrote:
>     >
>     > Hi,
>     >
>     > Over 800 usages under java, might be worth doing a few PR...
>     >
>     > Also suggest we use a very light review process: First round go
>     for low hanging fruit, if anyone does a -1 against a change then we
>     leave that for round two.
>     >
>     > Thoughts?
>     >
>     > Cheers
>     >
>     > Reza
>     >
>     > On Wed, 29 May 2019 at 12:05, Kenneth Knowles      > wrote:
>     >>
>     >>
>     >>
>     >> On Mon, May 27, 2019 at 4:05 PM Reza Rokni      > wrote:
>     >>>
>     >>> "Many APIs that have been in place for years and are used by
>     most Beam users are still marked Experimental."
>     >>>
>     >>> Should there be a formal process in place to start 'graduating'
>     features out of @Experimental? Perhaps even target an up coming
>     release with a PR to remove the annotation from well established API's?
>     >>
>     >>
>     >> Good idea. I think a PR like this would be an opportunity to
>     discuss whether the feature is non-experimental. Probably many of
>     them are ready. It would help to address Ismael's very good point
>     that this new practice could make users think the old Experimental
>     stuff is not experimental. Maybe it is true that it is not really
>     still Experimental.
>     >>
>     >> Kenn
>     >>
>     >>
>     >>>
>     >>> On Tue, 28 May 2019 at 06:44, Reuven Lax      > wrote:
>     
>      We generally use Experimental for two different things, which
>     leads to confusion.
>        1. Features that work stably, but where we think we might
>     still make some changes to the API.
>        2. New features that we think might not yet be stable.
>     
>      This dual usage leads to a lot of confusion IMO. The fact that
>     we tend to forget to remove the @Experimental tag also makes it
>     somewhat useless. Many APIs that have been in place for years and
>     are used by most Beam users are still marked Experimental.
>     
>      Reuven
>     
>      On Mon, May 

Re: [VOTE] Vendored dependencies release process

2019-07-09 Thread Maximilian Michels
+1

On 09.07.19 22:16, Udi Meiri wrote:
> +1 LGTM
>
> On Mon, Jul 8, 2019 at 4:54 PM Lukasz Cwik  > wrote:
>
>     Thanks for taking a look. I followed up on your questions.
>
>     On Mon, Jul 8, 2019 at 3:58 PM Udi Meiri      > wrote:
>
>     I left some comments. Being new to the Beam releasing process,
>     my question might be trivial to someone actually performing the
>     release.
>
>     On Tue, Jul 2, 2019 at 4:49 PM Lukasz Cwik      > wrote:
>
>     Please vote based on the vendored dependencies release
>     process as discussed[1] and documented[2].
>
>     Please vote as follows:
>     +1: Adopt the vendored dependency release process
>     -1: The vendored release process needs to change because ...
>
>     Since many people in the US may be out due to the holiday
>     schedule, I'll try to close the vote and tally the results
>     on July 9th so please vote before then.
>
>     1: 
> https://lists.apache.org/thread.html/e2c49a5efaee2ad416b083fbf3b9b6db60fdb04750208bfc34cecaf0@%3Cdev.beam.apache.org%3E
>     2: https://s.apache.org/beam-release-vendored-artifacts
>



Re: Spotless exclusions

2019-07-09 Thread Maximilian Michels
Thanks for asking here on the mailing list. Just saw the PR.

The PR breaks Spotless for the Flink Runner, although all of its source is 
under `src/`. The Flink Runner uses multiple source directories for supporting 
different Flink versions. Not all of them seem to be recognized anymore:

  runners/flink/src/... <-- Spotless does not work
  runners/flink/1.5/src/... <-- Spotless works

I prefer Anton's changes over the previous solution, but I couldn't get it to 
work with the Flink project layout. I'll experiment but otherwise I would 
propose to revert to the old solution.

Cheers,
Max

On 27.06.19 01:50, Lukasz Cwik wrote:
>
> On Wed, Jun 26, 2019 at 4:22 PM Anton Kedin  > wrote:
>
>     Currently our spotless is configured globally [1] (for java at
>     least) to include all source files by '**/*.java'. And then we
>     exclude things explicitly. Don't know why, but these exclusions are
>     ignored for me sometimes, for example `./gradlew
>     :sdks:java:core:spotlessJavaCheck` always fails when checking the
>     generated files under
>     
> `.../build/generated-src/antlr/main/org/apache/beam/sdk/schemas/parser/generated`.
>
>     Few questions:
>      * can someone point me to a discussion or a jira about this behavior?
>
>
> BEAM-6399 and BEAM-7366 allude to something wonky going on.
>  
>
>      * do we actually have a use case of checking the source files that
>     are not under 'src'?
>
>
> No
>  
>
>      * if not, can we switch the config to only check for sources under
>     'src' [2]?
>
>
> Yes
>  
>
>      * alternatively, would it make sense to introduce project-specific
>     overrides?
>
>
> All src should be under src/ so it is unlikely to be useful.
>  
>
>
>     [1] 
> https://github.com/apache/beam/blob/af9362168606df9ec11319fe706b72466413798c/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy#L819
>     [2] https://github.com/apache/beam/pull/8954
>



Re: Hazelcast Jet Runner

2019-07-11 Thread Maximilian Michels
I believe that is the case. Thanks Kenn.

On 10.07.19 21:35, Ismaël Mejía wrote:
> Yes please!
>
> On Wed, Jul 10, 2019 at 8:38 PM Kenneth Knowles  wrote:
> >
> > Just to make sure we have closed on the Jet runner, my understanding is: I 
> > was the main person asking for "runners-jet-experimental" but I am 
> > convinced to go with plain "runners-jet". It seems everyone else is already 
> > fine with this, so go ahead?
> >
> > On Tue, Jul 9, 2019 at 1:23 PM Maximilian Michels  wrote:
> >>
> >> We should fork the discussion around removing instances of @Experimental, 
> >> but it was good to mention it here.
> >>
> >> As for the Jet runner, I can only second Ismael: The Jet runner is the 
> >> first runner I can think of that came with ValidatesRunner and Nexmark out 
> >> of the box. Of course that doesn't mean the runner is "battled-tested", 
> >> but we do not have other means to test its maturity.
> >>
> >> For the future, we could come up with other criteria, e.g. a "probation 
> >> period", but enforcing this now seems arbitrary.
> >>
> >> If the authors of the Runners decide that it is experimental, so be it. 
> >> Otherwise I would leave it to the user to decide (it might be helpful to 
> >> list the inception date of each runner). That said, I value your concern 
> >> Kenn. I can see that we establish a consistent onboarding of new runners 
> >> which may involve marking them experimental for a while.
> >>
> >> -Max
> >>
> >> On 01.07.19 22:20, Kenneth Knowles wrote:
> >>>
> >>>
> >>> On Wed, Jun 12, 2019 at 2:32 AM Ismaël Mejía  >>> <mailto:ieme...@gmail.com>> wrote:
> >>>
> >>> Seems the discussion moved a bit of my original intent that was to
> >>> make the Jet runner directory to be just called runners/jet in the
> >>> directory and mark the 'experimental' part of it in documentation as
> >>> we do for all other things in Beam.
> >>>
> >>>
> >>> Thanks for returning to the one question at hand. We don't have to make
> >>> an overall decision about all "experimental" things.
> >>>
> >>>
> >>> Can we do this or is there still any considerable argument to not do 
> >>>it?
> >>>
> >>>
> >>> I think we actually have some competing goals:
> >>>
> >>> I agree 100% on the arguments, but let’s think in the reverse terms,
> >>> highlighting lack of maturity can play against the intended goal of
> >>> use and adoption even if for a noble reason. It is basic priming 101
> >>> [1].
> >>>
> >>>
> >>> _My_ goal is exactly to highlight lack of maturity so that users are not
> >>> harmed by either (1) necessary breaking changes or (2) permanent low
> >>> quality. Only users who are willing to follow along with the project and
> >>> update their own code regularly should use experimental features.
> >>>
> >>> Evaluating the Jet runner I am convinced by your arguments, because
> >>> looking at the two dangers:
> >>> (1) necessary breaking changes -- runners don't really have their own
> >>> APIs to break, except their own small set of APIs and pipeline options
> >>> (2) permanent low quality -- because there is no API design possible,
> >>> there's no risk of permanent low quality except by fundamental
> >>> mismatches. Plus as you mention the testing is already quite good.
> >>>
> >>> So I am OK to not call it experimental. But I have a slight remaining
> >>> concern that it did not really go through what other runners went
> >>> through. I hope this just means it is more mature. I hope it does not
> >>> indicate that we are reducing rigor.
> >>>
> >>> Kenn
> >>>
> >>>
> >>> On Wed, May 29, 2019 at 3:02 PM Reza Rokni  >>> <mailto:r...@google.com>> wrote:
> >>> >
> >>> > Hi,
> >>> >
> >>> > Over 800 usages under java, might be worth doing a few PR...
> >>> >
> >>> > Also suggest we use a very light review process: First round go
> >>> for low hanging fruit, if anyone does a -1 against a change then we
> >>>

Re: Spotless exclusions

2019-07-27 Thread Maximilian Michels
Quick update. I ended up removing the Flink-specific SourceSet traversal in 
BeamModulePlugin in favor of an explicit override in the Flink Gradle config: 
https://github.com/apache/beam/pull/9176

Overall, the default Spotless configuration is now much cleaner. Thank you 
Anton!

-Max

On 09.07.19 22:35, Maximilian Michels wrote:
> Thanks for asking here on the mailing list. Just saw the PR.
>
> The PR breaks Spotless for the Flink Runner, although all of its source is 
> under `src/`. The Flink Runner uses multiple source directories for 
> supporting different Flink versions. Not all of them seem to be recognized 
> anymore now:
>
>   runners/flink/src/... <-- Spotless does not work
>   runners/flink/1.5/src/... <-- Spotless works
>
> I prefer Anton's changes over the previous solution, but I couldn't get it to 
> work with the Flink project layout. I'll experiment but otherwise I would 
> propose to revert to the old solution.
>
> Cheers,
> Max
>
> On 27.06.19 01:50, Lukasz Cwik wrote:
> >
> > On Wed, Jun 26, 2019 at 4:22 PM Anton Kedin  > <mailto:ke...@google.com>> wrote:
> >
> >     Currently our spotless is configured globally [1] (for java at
> >     least) to include all source files by '**/*.java'. And then we
> >     exclude things explicitly. Don't know why, but these exclusions are
> >     ignored for me sometimes, for example `./gradlew
> >     :sdks:java:core:spotlessJavaCheck` always fails when checking the
> >     generated files under
> >     
> > `.../build/generated-src/antlr/main/org/apache/beam/sdk/schemas/parser/generated`.
> >
> >     Few questions:
> >      * can someone point me to a discussion or a jira about this behavior?
> >
> >
> > BEAM-6399 and BEAM-7366 allude to something wonky going on.
> >  
> >
> >      * do we actually have a use case of checking the source files that
> >     are not under 'src'?
> >
> >
> > No
> >  
> >
> >      * if not, can we switch the config to only check for sources under
> >     'src' [2]?
> >
> >
> > Yes
> >  
> >
> >      * alternatively, would it make sense to introduce project-specific
> >     overrides?
> >
> >
> > All src should be under src/ so it is unlikely to be useful.
> >  
> >
> >
> >     [1] 
> > https://github.com/apache/beam/blob/af9362168606df9ec11319fe706b72466413798c/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy#L819
> >     [2] https://github.com/apache/beam/pull/8954
> >
>



Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-07-31 Thread Maximilian Michels
Hi Jincheng,

Thanks for getting back to us.

> For the next major release of Flink, we plan to add Python user defined 
> functions(UDF, UDTF, UDAF) support in Flink and I have go over the Beam 
> portability framework and think that it is perfect for our requirements. 
> However we also find some improvements needed for Beam:

That sounds great! The improvement list contains very reasonable
suggestions, some of which are already on our TODO list. I think
Thomas and Robert already provided the answers you were looking for.

> Open questions:
> -
> 1) Which coders should be / can be defined in StandardCoders?

The ones which are present now are:

  BYTES_CODER
  INT64_CODER
  STRING_UTF8
  ITERABLE_CODER
  TIMER_CODER
  KV_CODER
  LENGTH_PREFIX_CODER
  GLOBAL_WINDOW_CODER
  INTERVAL_WINDOW_CODER
  WINDOWED_VALUE_CODER
  DOUBLE_CODER

Note that this is just across SDK borders. If you stay within one SDK,
you can use any coder. If a Runner wants to replace a particular coder
with its own coder implementation, it could do that. Flink may want to
use its own set of coders for the sake of coder migration. Another
option Robert alluded to would be to make use of Schemas where possible,
which have been built with migration in mind.
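
As a small illustration of the length-prefix point raised in 5) below, a
sketch using the Python SDK's coder classes (illustrative only):

  # Wrapping a coder in LengthPrefixCoder prepends a var-int length to every
  # encoded value, which is the per-element overhead discussed in 5) below.
  from apache_beam.coders.coders import LengthPrefixCoder, VarIntCoder

  plain = VarIntCoder()
  prefixed = LengthPrefixCoder(plain)

  value = 42
  print(len(plain.encode(value)))     # payload only
  print(len(prefixed.encode(value)))  # payload plus the length prefix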

Thanks,
Max

> 
> Must Have:
> 
> 1) Currently only BagState is supported in gRPC protocol and I think we 
> should support more kinds of state types, such as MapState, ValueState, 
> ReducingState, CombiningState(AggregatingState in Flink), etc. That's because 
> these kinds of state will be used in both user-defined function or Flink 
> Python DataStream API.
> 
> 2) There are warnings that Python 3 is not fully supported in Beam 
> (beam/sdks/python/setup.py). We should support Python 3.x for the Beam 
> portability framework since Python 2 will no longer be officially supported.
> 
> 3) The configuration "semi_persist_dir" is not set in EnvironmentFactory at 
> the runner side. Why I think it's a must-have is because when the 
> environment type is "PROCESS", the default value "/tmp" may become a big 
> problem.
> 
> 4) The buffer size configure policy should be improved, such as: 
>At runner side, the buffer limit in BeamFnDataBufferingOutboundObserver is 
> size based. We should also support time based especially for the streaming 
> case.
>At Python SDK Harness, the buffer size is not configurable in 
> GrpcDataService. The input queue size of the input buffer in Python SDK 
> Harness is not size limited.
>   The flush threshold of the output buffer in Python SDK Harness is 10 MB by 
> default (_DEFAULT_FLUSH_THRESHOLD=10MB). My suggestion is: make the threshold 
> configurable and support time based threshold.
> 
> Nice To Have:
> ---
> 1) Improves the interfaces of FnDataService, BundleProcessor, ActiveBundle, 
> etc, to change the parameter type from WindowedValue to T. (We have 
> already discussed in the previous mails)
> 
> 2) Refactor the code to avoid unnecessary dependencies pull in. For example, 
> beam-sdks-java-core(11MB) is a package for Java SDK users and it is pull in 
> because there are a few classes in beam-sdks-java-core are used in 
> beam-runners-java-fn-execution, such as:
> PipelineOptions used in DefaultJobBundleFactory FileSystems used in 
> BeamFileSystemArtifactRetrievalService.
> It means maybe we can add a new module such as beam-sdks-java-common to hold 
> the classes used by both runner and SDK.
> 
> 3) State cache is not shared between bundles which is performance critical 
> for streaming jobs.
> 
> 4) The coder of WindowedValue cannot be configured and most of time we don't 
> need to serialize and deserialize the timestamp, window and pane properties 
> in Flink. But currently FullWindowedValueCoder is used by default in 
> WireCoders.addWireCoder, I suggest to make the coder configurable (i.e. 
> allowing to use ValueOnlyWindowedValueCoder)
> 
> 5) Currently if a coder is not defined in StandardCoders, it will be wrapped 
> with LengthPrefixedCoder (WireCoders.addWireCoder -> 
> LengthPrefixUnknownCoders.addLengthPrefixedCoder). However, only a few coders 
> are defined in StandardCoders. It means that for most coder, a length will be 
> added to the serialized bytes which is not necessary in my thoughts. My 
> suggestion is maybe we can add some interfaces or tags for the coder which 
> indicate whether the coder is needed a length prefix or not.
> 
> 6) Set log level according to PipelineOption in Python SDK Harness. Currently 
> the log level is set to INFO by default.
> 
> 7) Allows to start up StatusServer according to PipelineOption in Python SDK 
> Harness. Currently the StatusServer is start up by default.
> 
> Although I put 3) 4) 5) into the "Nice to Have" as they are performance 
> related, I still think they are very critical for Python UDF execution 
> performance.
> 
> Open q

Re: [ANNOUNCE] New committer: Jan Lukavský

2019-08-01 Thread Maximilian Michels
Congrats, Jan! Good to see you become a committer :)

On 01.08.19 12:37, Łukasz Gajowy wrote:
> Congratulations!
>
> czw., 1 sie 2019 o 11:16 Robert Bradshaw  > napisał(a):
>
>     Congratulations!
>
>     On Thu, Aug 1, 2019 at 9:59 AM Jan Lukavský      > wrote:
>
>     Thanks everyone!
>
>     Looking forward to working with this great community! :-)
>
>     Cheers,
>
>      Jan
>
>     On 8/1/19 12:18 AM, Rui Wang wrote:
> > Congratulations!
> >
> >
> > -Rui
> >
> > On Wed, Jul 31, 2019 at 10:51 AM Robin Qiu  > > wrote:
> >
> > Congrats!
> >
> > On Wed, Jul 31, 2019 at 10:31 AM Aizhamal Nurmamat kyzy
> > mailto:aizha...@apache.org>> wrote:
> >
> > Congratulations, Jan! Thank you for your contributions!
> >
> > On Wed, Jul 31, 2019 at 10:04 AM Tanay Tummalapalli
> > mailto:ttanay...@gmail.com>> wrote:
> >
> > Congratulations!
> >
> > On Wed, Jul 31, 2019 at 10:05 PM Ahmet Altay
> > mailto:al...@google.com>> wrote:
> >
> > Congratulations Jan! Thank you for your
> > contributions!
> >
> > On Wed, Jul 31, 2019 at 2:30 AM Ankur Goenka
> > mailto:goe...@google.com>>
> > wrote:
> >
> > Congratulations Jan!
> >
> > On Wed, Jul 31, 2019, 1:23 AM David
> > Morávek  > > wrote:
> >
> > Congratulations Jan, well deserved! ;)
> >
> > D.
> >
> > On Wed, Jul 31, 2019 at 10:17 AM Ryan
> > Skraba  > > wrote:
> >
> > Congratulations Jan!
> >
> > On Wed, Jul 31, 2019 at 10:10 AM
> > Ismaël Mejía  > > wrote:
> > >
> > > Hi,
> > >
> > > Please join me and the rest of
> > the Beam PMC in welcoming a new
> > > committer: Jan Lukavský.
> > >
> > > Jan has been contributing to
> > Beam for a while, he was part of
> > the team
> > > that contributed the Euphoria
> > DSL extension, and he has done
> > > interesting improvements for the
> > Spark and Direct runner. He has also
> > > been active in the community
> > discussions around the Beam model and
> > > other subjects.
> > >
> > > In consideration of Jan's
> > contributions, the Beam PMC trusts
> > him with
> > > the responsibilities of a Beam
> > committer [1].
> > >
> > > Thank you, Jan, for your
> > contributions and looking forward
> > to many more!
> > >
> > > Ismaël, on behalf of the Apache
> > Beam PMC
> > >
> > > [1]
> > 
> >https://beam.apache.org/committer/committer
> >



Re: [ANNOUNCE] New committer: Robert Burke

2019-08-01 Thread Maximilian Michels
Go Robert! ;) Congrats

On 30.07.19 19:38, Mark Liu wrote:
> Congratulations Robert!
>
> On Thu, Jul 18, 2019 at 9:49 AM 송원욱  > wrote:
>
>     Congrats Robert!
>
>
>   Wonook
>
>
>
>     2019년 7월 18일 (목) 오전 6:47, Kyle Weaver      >님이 작성:
>
>     +1 to faster Go SDK iteration! Well-deserved, Rebo
>
>     Kyle Weaver | Software Engineer | github.com/ibzib
>      | kcwea...@google.com
>      | +1650203
>
>
>     On Wed, Jul 17, 2019 at 2:44 PM Robert Burke      > wrote:
>
>     Thanks all! Hopefully this does mean reduced latency to
>     merge when folks send me Go SDK reviews. Let's get Beam GOing!
>
>     On Wed, Jul 17, 2019, 11:22 AM Melissa Pashniak
>     mailto:meliss...@google.com>> wrote:
>
>
>     Congratulations!
>
>
>     On Wed, Jul 17, 2019 at 6:06 AM Alexey Romanenko
>          > wrote:
>
>     Congratulations, Robert!
>
> > On 17 Jul 2019, at 14:49, Tim Robertson
> >  > > wrote:
> >
> > Congratulations Robert!
> >
> > On Wed, Jul 17, 2019 at 2:47 PM Gleb Kanterov
> > mailto:g...@spotify.com>> wrote:
> >
> > Congratulations, Robert!
> >
> > On Wed, Jul 17, 2019 at 1:50 PM Robert
> > Bradshaw  > > wrote:
> >
> > Congratulations!
> >
> > On Wed, Jul 17, 2019, 12:56 PM Katarzyna
> > Kucharczyk  > > wrote:
> >
> > Congratulations! :)
> >
> > On Wed, Jul 17, 2019 at 12:46 PM
> > Michał Walenia
> >  > >
> > wrote:
> >
> > Congratulations, Robert! :)
> >
> > On Wed, Jul 17, 2019 at 12:45 PM
> > Łukasz Gajowy  > > wrote:
> >
> > Congratulations! :)
> >
> > śr., 17 lip 2019 o
> > 04:30 Rakesh Kumar
> >  > >
> > napisał(a):
> >
> > Congrats Rob!!!
> >
> > On Tue, Jul 16, 2019 at
> > 10:24 AM Ahmet Altay
> >  > >
> > wrote:
> >
> > Hi,
> >
> > Please join me and the
> > rest of the Beam PMC
> > in welcoming a
> > new committer: Robert
> > Burke.
> >
> > Robert has been
> > contributing to Beam
> > and actively involved
> > in the community for
> > over a year. He has
> > been actively working
> > on Go SDK, helping
> > users, and making it
> > easier for others to
> > contribute [1].
> >
> > In consideration of
> > Robert's
> > contributions, the
> > Beam PMC trusts him
> > with the
> > responsibilities of a
> > Beam committer [2].
> >
> >

Re: Write-through-cache in State logic

2019-08-12 Thread Maximilian Michels
Thanks for starting this discussion Rakesh. An efficient cache layer is
one of the missing pieces for good performance in stateful pipelines.
The good news are that there is a level of caching already present in
Python which batches append requests until the bundle is finished.

Thomas, in your example indeed we would have to profile to see why CPU
utilization is high on the Flink side but not in the Python SDK harness.
For example, older versions of Flink (<=1.5) have a high cost of
deleting existing instances of a timer when setting a timer.
Nevertheless, cross-bundle caching would likely result in increased
performance.

Luke, I think the idea to merge pending state requests could be
complementary to caching across bundles.

Question: Couldn't we defer flushing back state from the SDK to the
Runner indefinitely, provided that we add a way to flush the state in
case of a checkpoint?

Another performance improvement would be caching read requests because
these first go to the Runner regardless of already cached appends.
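
To make this concrete, here is a toy sketch of the kind of bundle-scoped
write-through cache being discussed (illustrative only; the state_handler
methods are placeholders, not the actual Fn API calls):

  # Toy sketch, not SDK harness code: appends are buffered locally and only
  # flushed to the Runner when the bundle finishes (or before a checkpoint).
  class CachingStateClient:
      def __init__(self, state_handler):
          self._state_handler = state_handler  # placeholder for the state service
          self._cache = {}                     # state_key -> values read so far
          self._pending_appends = {}           # state_key -> values not yet flushed

      def read(self, state_key):
          if state_key not in self._cache:
              self._cache[state_key] = list(self._state_handler.get(state_key))
          return self._cache[state_key] + self._pending_appends.get(state_key, [])

      def append(self, state_key, value):
          # Write through locally; defer the Runner round trip.
          self._pending_appends.setdefault(state_key, []).append(value)

      def flush(self):
          # Called at bundle finish or before a checkpoint barrier.
          for state_key, values in self._pending_appends.items():
              self._state_handler.append(state_key, values)
          self._pending_appends.clear()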

-Max

On 09.08.19 17:12, Lukasz Cwik wrote:
> 
> 
> On Fri, Aug 9, 2019 at 2:32 AM Robert Bradshaw  > wrote:
> 
> The question is whether the SDK needs to wait for the StateResponse to
> come back before declaring the bundle done. The proposal was to not
> send the cache token back as part of an append StateResponse [1], but
> pre-provide it as part of the bundle request.
> 
> 
> Agree, the purpose of the I'm Blocked message is to occur during bundle
> processing. 
>  
> 
> Thinking about this some more, if we assume the state response was
> successfully applied, there's no reason for the SDK to block the
> bundle until it has its hands on the cache token--we can update the
> cache once the StateResponse comes back whether or not the bundle is
> still active. On the other hand, the runner needs a way to assert it
> has received and processed all StateRequests from the SDK associated
> with a bundle before it can declare the bundle complete (regardless of
> the cache tokens), so this might not be safe without some extra
> coordination (e.g. the ProcessBundleResponse indicating the number of
> state requests associated with a bundle).
> 
>  
> Since the state request stream is ordered, we can add the id of the last
> state request as part of the ProcessBundleResponse.
>  
> 
> [1]
> 
> https://github.com/apache/beam/blob/release-2.14.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L627
> 
> On Thu, Aug 8, 2019 at 6:57 PM Lukasz Cwik  > wrote:
> >
> > The purpose of the new state API call in BEAM-7000 is to tell the
> runner that the SDK is now blocked waiting for the result of a
> specific state request and it should be used for fetches (not
> updates) and is there to allow for SDKs to differentiate readLater
> (I will need this data at some point in time in the future) from
> read (I need this data now). This comes up commonly where the user
> prefetches multiple state cells and then looks at their content
> allowing the runner to batch up those calls on its end.
> >
> > The way it can be used for clear+append is that the runner can
> store requests in memory up until some time/memory limit or until it
> gets its first "blocked" call and then issue all the requests together.
> >
> >
> > On Thu, Aug 8, 2019 at 9:42 AM Robert Bradshaw
> mailto:rober...@google.com>> wrote:
> >>
> >> On Tue, Aug 6, 2019 at 12:07 AM Thomas Weise  > wrote:
> >> >
> >> > That would add a synchronization point that forces extra
> latency especially in streaming mode.
> >> >
> >> > Wouldn't it be possible for the runner to assign the token when
> starting the bundle and for the SDK to pass it along the state
> requests? That way, there would be no need to batch and wait for a
> flush.
> >>
> >> I think it makes sense to let the runner pre-assign these state
> update
> >> tokens rather than forcing a synchronization point.
> >>
> >> Here's some pointers for the Python implementation:
> >>
> >> Currently, when a DoFn needs UserState, a StateContext object is used
> >> that converts from a StateSpec to the actual value. When running
> >> portably, this is FnApiUserStateContext [1]. The state handles
> >> themselves are cached at [2] but this context only lives for the
> >> lifetime of a single bundle. Logic could be added here to use the
> >> token to share these across bundles.
> >>
> >> Each of these handles in turn invokes state_handler.get* methods when
> >> its read is called. (Here state_handler is a thin wrapper around the
> >> service itself) and constructs the appropriate result from the
> >> StateResponse. We would need to implement caching at this level as
> >> well, inc

Re: [FLINK-12653] and system state

2019-08-13 Thread Maximilian Michels
Hi Jan,

Just checking, do you see the same rescaling problem as described in
https://jira.apache.org/jira/browse/FLINK-12653 ?

If so, you are most likely correct that this is due to the system state
that you added in your code. When I did the fix, I ran some tests to
check if any system state is not bound. I did not find instances but you
are right that we could see this issue for internal state, e.g. in
ReduceFnContextFactory.

Given that this is a Flink specific bug I'm not sure it warrants adding
a `getSystemStateTags()` method to the DoFnRunner. Also, this is error
prone since we have to remember to add all state there. The better
solution would be to eagerly register state during StateSpec creation,
but this would require significant code refactoring.

Wouldn't it suffice to just perform an early binding in your code?
Additionally, we want to make sure to also revise any existing Beam code
paths.

The issue hopefully will be fixed with Flink 1.9. Would be interesting
to try with the Flink 1.9 RC2:
https://dist.apache.org/repos/dist/dev/flink/flink-1.9.0-rc2/

Cheers,
Max

On 12.08.19 19:58, Jan Lukavský wrote:
> I've managed to fix that by introducing an (optional) method on DoFnRunner 
> called getSystemStateTags() (the default implementation returns 
> Collection.emptyList()), and then using that list to early-bind states in 
> Flink's DoFnOperator ([1])
> 
> @Max, WDYT?
> 
> Jan
> 
> [1] 
> https://github.com/je-ik/beam/commit/1360fb6bd35192d443f97068c7ba2155f79e8802
> 
> On 8/12/19 4:00 PM, Jan Lukavský wrote:
>> Hi,
>>
>> I have come across issue that is very much likely caused by [1]. The 
>> issue is that Beam's state is (generally) created lazily, after 
>> element is received (as Max described in the Flink's JIRA). Max also 
>> created workaround [2], but that seems to work for user state only 
>> (i.e. state that has been created in user code by declaring @StateId - 
>> please correct me if I'm wrong). In my work, however, I created a 
>> system state (that holds elements before being output, due to 
>> @RequiresTimeSortedInput annotation, but that's probably not 
>> important), and this state is not covered by the workaround. This 
>> might be the case for all system states, generally, because they are 
>> not visible, until element arrives and the state is actually created.
>>
>> Is my analysis correct? If so, would anyone have any suggestions how 
>> to fix that?
>>
>> Jan
>>
>> [1] https://jira.apache.org/jira/browse/FLINK-12653
>>
>> [2] https://issues.apache.org/jira/browse/BEAM-7144
>>


Re: [FLINK-12653] and system state

2019-08-13 Thread Maximilian Michels
Sounds good. Might be worth commenting on the JIRA to get this prioritized in 
case it has not been fixed.

-Max

On 13.08.19 12:18, Jan Lukavský wrote:
> Hi Max,
>
> comments inline.
>
> On 8/13/19 12:01 PM, Maximilian Michels wrote:
> > Hi Jan,
> >
> > Just checking, do you see the same rescaling problem as described in
> > https://jira.apache.org/jira/browse/FLINK-12653 ?
> Yes.
> >
> > If so, you are most likely correct that this is due to the system state
> > that you added in your code. When I did the fix, I ran some tests to
> > check if any system state is not bound. I did not find instances but you
> > are right that we could see this issue for internal state, e.g. in
> > ReduceFnContextFactory.
> I think that there were no instances of internal state used in the Flink
> Runner prior to my patch, which introduced internal state for sorting inputs.
> But that seems a little fragile, because it might easily change.
> >
> > Given that this is a Flink specific bug I'm not sure it warrants adding
> > a `getSystemStateTags()` method to the DoFnRunner. Also, this is error
> > prone since we have to remember to add all state there. The better
> > solution would be to eagerly register state during StateSpec creation,
> > but this would require significant code refactoring.
> I'm also not happy with adding additional generic method just because of
> one runner, but registering that during creation of StateSpec would be
> hard, as you said.
> >
> > Wouldn't it suffice to just perform an early binding in your code?
> > Additionally, we want to make sure to also revise any existing Beam code
> > paths.
> I think I might do it (although it would mean that it would be
> registered early for all runners, not just Flink).
> >
> > The issue hopefully will be fixed with Flink 1.9. Would be interesting
> > to try with the Flink 1.9 RC2:
> > https://dist.apache.org/repos/dist/dev/flink/flink-1.9.0-rc2/
>
> I was not sure from the comments in the Flink JIRA that this will be fixed
> soon. If so, I'm fine with registering just the single state I
> introduced. If this remains an issue in the long term, I think it would
> require some other solution.
>
> So - I will register the state(s) I have created and test that on Flink
> 1.9 when I have a little spare time. Will decide what to do next, ok?
>
> Jan
>
> >
> > Cheers,
> > Max
> >
> > On 12.08.19 19:58, Jan Lukavský wrote:
> >> I've managed to fix that by introducing an (optional) method on DoFnRunner
> >> called getSystemStateTags() (the default implementation returns
> >> Collections.emptyList()), and then using that list to bind states early in
> >> Flink's DoFnOperator ([1])
> >>
> >> @Max, WDYT?
> >>
> >> Jan
> >>
> >> [1]
> >> https://github.com/je-ik/beam/commit/1360fb6bd35192d443f97068c7ba2155f79e8802
> >>
> >> On 8/12/19 4:00 PM, Jan Lukavský wrote:
> >>> Hi,
> >>>
> >>> I have come across an issue that is very likely caused by [1]. The
> >>> issue is that Beam's state is (generally) created lazily, after an
> >>> element is received (as Max described in the Flink JIRA). Max also
> >>> created a workaround [2], but that seems to work for user state only
> >>> (i.e. state that has been created in user code by declaring @StateId -
> >>> please correct me if I'm wrong). In my work, however, I created a
> >>> system state (which holds elements before they are output, due to the
> >>> @RequiresTimeSortedInput annotation, but that's probably not
> >>> important), and this state is not covered by the workaround. This
> >>> might be the case for all system states generally, because they are
> >>> not visible until an element arrives and the state is actually created.
> >>>
> >>> Is my analysis correct? If so, would anyone have any suggestions how
> >>> to fix that?
> >>>
> >>> Jan
> >>>
> >>> [1] https://jira.apache.org/jira/browse/FLINK-12653
> >>>
> >>> [2] https://issues.apache.org/jira/browse/BEAM-7144
> >>>
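
The "early binding in your code" alternative mentioned above can be sketched
in the same spirit. All types here (FakeStateInternals, StateCell,
TimeSortingFnLike) are hypothetical placeholders, not Beam's StateInternals
API; the sketch only shows that touching the state once during setup, rather
than on the first element, makes it known to checkpoints and rescaling from
the start:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EarlyBindInSetupSketch {

  /** Hypothetical state cell; real Beam state would be a BagState or similar. */
  static class StateCell<T> {
    final List<T> values = new ArrayList<>();
  }

  /** Hypothetical stand-in for state internals: first access creates the cell. */
  static class FakeStateInternals {
    private final Map<String, StateCell<?>> cells = new HashMap<>();

    @SuppressWarnings("unchecked")
    <T> StateCell<T> state(String tag) {
      return (StateCell<T>) cells.computeIfAbsent(tag, t -> new StateCell<T>());
    }
  }

  /** A DoFn-like component that buffers elements for time-sorted output. */
  static class TimeSortingFnLike {
    private StateCell<String> buffer;

    void setup(FakeStateInternals state) {
      // Early binding: the buffer exists before any element arrives, so a
      // checkpoint or rescale taken now already knows about this state.
      buffer = state.state("ts-sorted-input-buffer");
    }

    void processElement(String element) {
      buffer.values.add(element);
    }
  }

  public static void main(String[] args) {
    FakeStateInternals state = new FakeStateInternals();
    TimeSortingFnLike fn = new TimeSortingFnLike();
    fn.setup(state); // the state is bound here, not on the first element
    fn.processElement("a");
    System.out.println("Buffered: " + state.state("ts-sorted-input-buffer").values);
  }
}

As noted in the thread, the trade-off is that the state is then created
eagerly on every runner, not just on Flink.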



Re: Write-through-cache in State logic

2019-08-13 Thread Maximilian Michels
Agree that we have to be able to flush before a checkpoint to avoid
caching too many elements. Also good point about checkpoint costs
increasing when the cache is flushed on checkpoints. An LRU cache policy in
the SDK seems desirable.

What is the role of the cache token in the design document[1]? It looks
to me that the token is used to give the Runner control over which and
how many elements can be cached by the SDK. Why is that necessary?
Shouldn't this be up to the SDK?

-Max

[1]
https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m

Is it simply to
On 12.08.19 19:55, Lukasz Cwik wrote:
> 
> 
> On Mon, Aug 12, 2019 at 10:09 AM Thomas Weise  <mailto:t...@apache.org>> wrote:
> 
> 
> On Mon, Aug 12, 2019 at 8:53 AM Maximilian Michels  <mailto:m...@apache.org>> wrote:
> 
> Thanks for starting this discussion Rakesh. An efficient cache
> layer is
> one of the missing pieces for good performance in stateful
> pipelines.
> The good news is that there is a level of caching already
> present in
> Python which batches append requests until the bundle is finished.
> 
> Thomas, in your example indeed we would have to profile to see
> why CPU
> utilization is high on the Flink side but not in the Python SDK
> harness.
> For example, older versions of Flink (<=1.5) have a high cost of
> deleting existing instances of a timer when setting a timer.
> Nevertheless, cross-bundle caching would likely result in increased
> performance.
> 
> 
> CPU on the Flink side was unchanged, and that's important. The
> throughput improvement comes from the extended bundle caching on the
> SDK side. That's what tells me that cross-bundle caching is needed.
> Of course, it will require a good solution for the write also and I
> like your idea of using the checkpoint boundary for that, especially
> since that already aligns with the bundle boundary and is under
> runner control. Of course we also want to be careful to not cause
> overly bursty writes.
> 
> Profiling will be useful for the timer processing, that is also on
> my list of suspects.
> 
> 
> Luke, I think the idea to merge pending state requests could be
> complementary to caching across bundles.
> 
> Question: Couldn't we defer flushing back state from the SDK to the
> Runner indefinitely, provided that we add a way to flush the
> state in
> case of a checkpoint?
> 
> 
> Flushing is needed to prevent the SDK from running out of memory. Having
> a fixed budget for state inside the SDK would have flushing happen under
> certain state usage scenarios.
> I could also see that only flushing at checkpoint may lead to slow
> checkpoint performance so we may want to flush state that hasn't been
> used in a while as well.
>  
> 
> Another performance improvement would be caching read requests
> because
> these first go to the Runner regardless of already cached appends.
> 
> -Max
> 
> On 09.08.19 17:12, Lukasz Cwik wrote:
> >
> >
> > On Fri, Aug 9, 2019 at 2:32 AM Robert Bradshaw
> mailto:rober...@google.com>
> > <mailto:rober...@google.com <mailto:rober...@google.com>>> wrote:
> >
> >     The question is whether the SDK needs to wait for the
> StateResponse to
> >     come back before declaring the bundle done. The proposal
> was to not
> >     send the cache token back as part of an append
> StateResponse [1], but
> >     pre-provide it as part of the bundle request.
> >
> >
> > Agree, the purpose of the I'm Blocked message is to occur
> during bundle
> > processing. 
> >  
> >
> >     Thinking about this some more, if we assume the state
> response was
> >     successfully applied, there's no reason for the SDK to
> block the
> >     bundle until it has its hands on the cache token--we can
> update the
> >     cache once the StateResponse comes back whether or not the
> bundle is
> >     still active. On the other hand, the runner needs a way to
> assert it
> >     has received and processed all StateRequests from the SDK
> associated
> >     with a bundle before it can declare the bundle complete
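
A rough sketch of the SDK-side caching behaviour discussed in this thread,
using hypothetical interfaces (RunnerStateClient, StateCacheSketch) rather
than the actual SDK harness or Fn API code: appends are buffered, reads are
answered from the buffer where possible, an LRU policy bounds the SDK's memory
budget, and flush() is called before the runner checkpoints so the snapshot
sees all pending writes.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StateCacheSketch {

  /** Stand-in for the state channel back to the runner. */
  interface RunnerStateClient {
    void append(String stateKey, List<String> values);
    List<String> read(String stateKey);
  }

  private final RunnerStateClient runner;
  private final int maxEntries;
  // Access-ordered map: the eldest entry is the least recently used one.
  private final LinkedHashMap<String, List<String>> pending =
      new LinkedHashMap<>(16, 0.75f, true);

  StateCacheSketch(RunnerStateClient runner, int maxEntries) {
    this.runner = runner;
    this.maxEntries = maxEntries;
  }

  void append(String stateKey, String value) {
    pending.computeIfAbsent(stateKey, k -> new ArrayList<>()).add(value);
    if (pending.size() > maxEntries) {
      // Evict the least recently used entry to keep SDK memory bounded.
      Map.Entry<String, List<String>> eldest = pending.entrySet().iterator().next();
      runner.append(eldest.getKey(), eldest.getValue());
      pending.remove(eldest.getKey());
    }
  }

  List<String> read(String stateKey) {
    // Cached appends can answer reads without a runner round trip; a real
    // implementation would also merge in previously persisted values.
    List<String> cached = pending.get(stateKey);
    return cached != null ? cached : runner.read(stateKey);
  }

  /** Called before the runner checkpoints, so the snapshot contains all writes. */
  void flush() {
    for (Map.Entry<String, List<String>> e : pending.entrySet()) {
      runner.append(e.getKey(), e.getValue());
    }
    pending.clear();
  }
}

Which entries get pushed early (the LRU evictions) versus only at flush() is
the knob that trades SDK memory against bursty writes at the checkpoint
boundary.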

Re: Write-through-cache in State logic

2019-08-13 Thread Maximilian Michels
Thanks for clarifying. Cache-invalidation for side inputs makes sense.

In case the Runner fails to checkpoint, could it not re-attempt the
checkpoint? At least in the case of Flink, the cache would still be
valid until another checkpoint is attempted. For other Runners that may
not be the case. Also, rolling back state while keeping the SDK Harness
running requires invalidating the cache.

-Max

On 13.08.19 18:09, Lukasz Cwik wrote:
> 
> 
> On Tue, Aug 13, 2019 at 4:36 AM Maximilian Michels  <mailto:m...@apache.org>> wrote:
> 
> Agree that we have to be able to flush before a checkpoint to avoid
> caching too many elements. Also good point about checkpoint costs
> increasing with flushing the cache on checkpoints. A LRU cache policy in
> the SDK seems desirable.
> 
> What is the role of the cache token in the design document[1]? It looks
> to me that the token is used to give the Runner control over which and
> how many elements can be cached by the SDK. Why is that necessary?
> Shouldn't this be up to the SDK?
> 
>  
> We want to be able to handle the case where the SDK completes the bundle
> successfully but the runner fails to checkpoint the information.
> We also want the runner to be able to pass in cache tokens for things
> like side inputs which may change over time (and the SDK would not know
> that this happened).
>  
> 
> -Max
> 
> [1]
> 
> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
> 
> Is it simply to
> On 12.08.19 19:55, Lukasz Cwik wrote:
> >
> >
> > On Mon, Aug 12, 2019 at 10:09 AM Thomas Weise  <mailto:t...@apache.org>
> > <mailto:t...@apache.org <mailto:t...@apache.org>>> wrote:
> >
> >
> >     On Mon, Aug 12, 2019 at 8:53 AM Maximilian Michels
> mailto:m...@apache.org>
> >     <mailto:m...@apache.org <mailto:m...@apache.org>>> wrote:
> >
> >         Thanks for starting this discussion Rakesh. An efficient cache
> >         layer is
> >         one of the missing pieces for good performance in stateful
> >         pipelines.
> >         The good news are that there is a level of caching already
> >         present in
> >         Python which batches append requests until the bundle is
> finished.
> >
> >         Thomas, in your example indeed we would have to profile to see
> >         why CPU
> >         utilization is high on the Flink side but not in the
> Python SDK
> >         harness.
> >         For example, older versions of Flink (<=1.5) have a high
> cost of
> >         deleting existing instances of a timer when setting a timer.
> >         Nevertheless, cross-bundle caching would likely result in
> increased
> >         performance.
> >
> >
> >     CPU on the Flink side was unchanged, and that's important. The
> >     throughput improvement comes from the extended bundle caching
> on the
> >     SDK side. That's what tells me that cross-bundle caching is
> needed.
> >     Of course, it will require a good solution for the write also
> and I
> >     like your idea of using the checkpoint boundary for that,
> especially
> >     since that already aligns with the bundle boundary and is under
> >     runner control. Of course we also want to be careful to not cause
> >     overly bursty writes.
> >
> >     Profiling will be useful for the timer processing, that is also on
> >     my list of suspects.
> >
> >
> >         Luke, I think the idea to merge pending state requests
> could be
> >         complementary to caching across bundles.
> >
> >         Question: Couldn't we defer flushing back state from the
> SDK to the
> >         Runner indefinitely, provided that we add a way to flush the
> >         state in
> >         case of a checkpoint?
> >
> >
> > Flushing is needed to prevent the SDK from running out of memory.
> Having
> > a fixed budget for state inside the SDK would have flushing happen
> under
> > certain state usage scenarios.
> > I could also see that only flushing at checkpoint may lead to slow
> > checkpoint performance so we may want to flush state that hasn't been
> > used in a while as well.
> >  
> >
> >     
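
Luke's point about the cache token can be reduced to a small validity check.
The sketch below uses invented names (CacheTokenSketch, plain String tokens)
and is not the Fn API; it only shows that cached state is reusable exactly
when the token it was stored under matches the token the runner sent with the
current bundle, so a failed runner checkpoint or an updated side input
invalidates the cache simply by rotating the token.

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class CacheTokenSketch {

  /** A cached value remembers the token it was stored under. */
  static final class CachedValue {
    final String token;
    final String value;

    CachedValue(String token, String value) {
      this.token = token;
      this.value = value;
    }
  }

  private final Map<String, CachedValue> cache = new HashMap<>();

  /** Returns the cached value only if it was stored under the current token. */
  String get(String stateKey, String currentToken) {
    CachedValue cached = cache.get(stateKey);
    if (cached != null && Objects.equals(cached.token, currentToken)) {
      return cached.value; // safe to reuse without a round trip to the runner
    }
    return null; // stale or missing, so the SDK must re-read from the runner
  }

  void put(String stateKey, String currentToken, String value) {
    cache.put(stateKey, new CachedValue(currentToken, value));
  }

  public static void main(String[] args) {
    CacheTokenSketch cache = new CacheTokenSketch();
    cache.put("user-state/key1", "token-A", "v1");
    System.out.println(cache.get("user-state/key1", "token-A")); // prints v1
    // Runner failed to checkpoint, or a side input changed: it issues a new token.
    System.out.println(cache.get("user-state/key1", "token-B")); // prints null
  }
}

In main, the lookup succeeds under token-A and returns null once the runner
switches to token-B, which is the invalidation behaviour discussed above.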

Re: jira access

2019-08-14 Thread Maximilian Michels
Hi Sebastian,

Welcome! I've added you as a contributor in JIRA.

Cheers,
Max

On 14.08.19 11:54, Sebastian Jambor wrote:
> Hi,
> 
> I'm Sebastian, working for Trifacta. We use Beam for dataflow jobs in
> Cloud Dataprep. Could someone add me as a contributor to jira? My jira
> id is sgrj.
> 
> Thanks,
> Sebastian


Re: Write-through-cache in State logic

2019-08-14 Thread Maximilian Michels
Yes, that makes sense. What do you think about creating a document to
summarize the ideas presented here? Also, it would be good to capture
the status quo regarding caching in the Python SDK.

-Max

On 13.08.19 22:44, Thomas Weise wrote:
> The token would be needed in general to invalidate the cache when
> bundles are processed by different workers.
> 
> In the case of the Flink runner we don't have a scenario where the SDK worker
> survives the runner after a failure, so there is no
> possibility of inconsistent state as a result of a checkpoint failure.
> 
> --
> sent from mobile
> 
> On Tue, Aug 13, 2019, 1:18 PM Maximilian Michels  <mailto:m...@apache.org>> wrote:
> 
> Thanks for clarifying. Cache-invalidation for side inputs makes sense.
> 
> In case the Runner fails to checkpoint, could it not re-attempt the
> checkpoint? At least in the case of Flink, the cache would still be
> valid until another checkpoint is attempted. For other Runners that may
> not be the case. Also, rolling back state while keeping the SDK Harness
> running requires invalidating the cache.
> 
> -Max
> 
> On 13.08.19 18:09, Lukasz Cwik wrote:
> >
> >
> > On Tue, Aug 13, 2019 at 4:36 AM Maximilian Michels  <mailto:m...@apache.org>
> > <mailto:m...@apache.org <mailto:m...@apache.org>>> wrote:
> >
> >     Agree that we have to be able to flush before a checkpoint to
> avoid
> >     caching too many elements. Also good point about checkpoint costs
> >     increasing with flushing the cache on checkpoints. A LRU cache
> policy in
> >     the SDK seems desirable.
> >
> >     What is the role of the cache token in the design document[1]?
> It looks
> >     to me that the token is used to give the Runner control over
> which and
> >     how many elements can be cached by the SDK. Why is that necessary?
> >     Shouldn't this be up to the SDK?
> >
> >  
> > We want to be able to handle the case where the SDK completes the
> bundle
> > successfully but the runner fails to checkpoint the information.
> > We also want the runner to be able to pass in cache tokens for things
> > like side inputs which may change over time (and the SDK would not
> know
> > that this happened).
> >  
> >
> >     -Max
> >
> >     [1]
> >   
>  
> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
> >
> >     Is it simply to
> >     On 12.08.19 19:55, Lukasz Cwik wrote:
> >     >
> >     >
> >     > On Mon, Aug 12, 2019 at 10:09 AM Thomas Weise
> mailto:t...@apache.org>
> >     <mailto:t...@apache.org <mailto:t...@apache.org>>
> >     > <mailto:t...@apache.org <mailto:t...@apache.org>
> <mailto:t...@apache.org <mailto:t...@apache.org>>>> wrote:
> >     >
> >     >
> >     >     On Mon, Aug 12, 2019 at 8:53 AM Maximilian Michels
> >     mailto:m...@apache.org> <mailto:m...@apache.org
> <mailto:m...@apache.org>>
> >     >     <mailto:m...@apache.org <mailto:m...@apache.org>
> <mailto:m...@apache.org <mailto:m...@apache.org>>>> wrote:
> >     >
> >     >         Thanks for starting this discussion Rakesh. An
> efficient cache
> >     >         layer is
> >     >         one of the missing pieces for good performance in
> stateful
> >     >         pipelines.
> >     >         The good news are that there is a level of caching
> already
> >     >         present in
> >     >         Python which batches append requests until the bundle is
> >     finished.
> >     >
> >     >         Thomas, in your example indeed we would have to
> profile to see
> >     >         why CPU
> >     >         utilization is high on the Flink side but not in the
> >     Python SDK
> >     >         harness.
> >     >         For example, older versions of Flink (<=1.5) have a high
> >     cost of
> >     >         deleting existing instances of a timer when setting
> a timer.
> >     >         Nevertheless, cross-bundle caching would likely
> result in
> >     increased
> >     >         performance.

Re: Write-through-cache in State logic

2019-08-14 Thread Maximilian Michels
For the purpose of my own understanding of the matter, I've created a
document:
https://docs.google.com/document/d/1ltVqIW0XxUXI6grp17TgeyIybk3-nDF8a0-Nqw-s9mY/


It could make sense to clarify and specify things in there for now. I'm
more than willing to consolidate this document with the caching section
in the Fn API document.

-Max

On 14.08.19 17:13, Lukasz Cwik wrote:
> Instead of starting a new doc, could we add/update the caching segment
> of https://s.apache.org/beam-fn-state-api-and-bundle-processing?
> 
> Everyone has comment access, and all Apache Beam PMC members can add themselves
> as editors since the doc is owned by the Apache Beam PMC Gmail account.
> 
> On Wed, Aug 14, 2019 at 7:01 AM Maximilian Michels  <mailto:m...@apache.org>> wrote:
> 
> Yes, that makes sense. What do you think about creating a document to
> summarize the ideas presented here? Also, it would be good to capture
> the status quo regarding caching in the Python SDK.
> 
> -Max
> 
> On 13.08.19 22:44, Thomas Weise wrote:
> > The token would be needed in general to invalidate the cache when
> > bundles are processed by different workers.
> >
> > In the case of the Flink runner we don't have a scenario of SDK worker
> > surviving the runner in the case of a failure, so there is no
> > possibility of inconsistent state as result of a checkpoint failure.
> >
> > --
> > sent from mobile
> >
> > On Tue, Aug 13, 2019, 1:18 PM Maximilian Michels  <mailto:m...@apache.org>
> > <mailto:m...@apache.org <mailto:m...@apache.org>>> wrote:
> >
> >     Thanks for clarifying. Cache-invalidation for side inputs
> makes sense.
> >
> >     In case the Runner fails to checkpoint, could it not
> re-attempt the
> >     checkpoint? At least in the case of Flink, the cache would
> still be
> >     valid until another checkpoint is attempted. For other Runners
> that may
> >     not be the case. Also, rolling back state while keeping the
> SDK Harness
> >     running requires to invalidate the cache.
> >
> >     -Max
> >
> >     On 13.08.19 18:09, Lukasz Cwik wrote:
> >     >
> >     >
> >     > On Tue, Aug 13, 2019 at 4:36 AM Maximilian Michels
> mailto:m...@apache.org>
> >     <mailto:m...@apache.org <mailto:m...@apache.org>>
> >     > <mailto:m...@apache.org <mailto:m...@apache.org>
> <mailto:m...@apache.org <mailto:m...@apache.org>>>> wrote:
> >     >
> >     >     Agree that we have to be able to flush before a
> checkpoint to
> >     avoid
> >     >     caching too many elements. Also good point about
> checkpoint costs
> >     >     increasing with flushing the cache on checkpoints. A LRU
> cache
> >     policy in
> >     >     the SDK seems desirable.
> >     >
> >     >     What is the role of the cache token in the design
> document[1]?
> >     It looks
> >     >     to me that the token is used to give the Runner control over
> >     which and
> >     >     how many elements can be cached by the SDK. Why is that
> necessary?
> >     >     Shouldn't this be up to the SDK?
> >     >
> >     >  
> >     > We want to be able to handle the case where the SDK
> completes the
> >     bundle
> >     > successfully but the runner fails to checkpoint the information.
> >     > We also want the runner to be able to pass in cache tokens
> for things
> >     > like side inputs which may change over time (and the SDK
> would not
> >     know
> >     > that this happened).
> >     >  
> >     >
> >     >     -Max
> >     >
> >     >     [1]
> >     >   
> >   
>   
> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
> >     >
> >     >     Is it simply to
> >     >     On 12.08.19 19:55, Lukasz Cwik wrote:
> >     >     >
> >     >     >
> >     >     > On Mon, Aug 12, 2019 at 10:09 AM Thomas Weise
> >     mailto:t...@apache.org> <mailto:t...@apache.org
> <mailto:t...@apache.org>>
> >     >     <mailto:t...@apache.org <mailto:t..
