Re: [ANNOUNCE] New committer: Ke Wu

2022-05-31 Thread Xinyu Liu
Congrats!

Xinyu

On Mon, May 30, 2022 at 7:46 AM Evan Galpin  wrote:

> Congrats Ke!
>
> - Evan
>
>
> On Mon, May 30, 2022 at 4:11 AM Jan Lukavský  wrote:
>
>> Congrats Ke!
>>
>>  Jan
>> On 5/29/22 04:12, Yi Pan wrote:
>>
>> Congrats, Ke!
>>
>> -Yi
>>
>> On Sat, May 28, 2022 at 6:57 PM Robert Burke  wrote:
>>
>>> Congratulations!
>>> Another place that runs the Go SDK ;)
>>>
>>> On Fri, May 27, 2022, 3:49 PM Ahmet Altay  wrote:
>>>
 Hi all,

 Please join me and the rest of the Beam PMC in welcoming a new
 committer: Ke Wu (kw2542@)

 Ke has been contributing to Beam since 2020. Ke's contributions are
 mostly focused on the SamzaRunner; as a result of Ke's efforts, Beam has a
 fully featured, portable, and supported SamzaRunner with happy users!

 Considering these contributions, the Beam PMC trusts Ke with the
 responsibilities of a Beam committer.[1]

 Thank you Ke!

 Ahmet

 [1]
 https://beam.apache.org/contribute/become-a-committer/#an-apache-beam-committer

>>>


Re: [PROPOSAL] Projection pushdown in Beam Java

2021-08-06 Thread Xinyu Liu
Very happy to see we will have pushdown optimizations for java pipelines!
Thanks for sharing the proposal.

Thanks,
Xinyu

On Fri, Aug 6, 2021 at 9:26 AM Alexey Romanenko 
wrote:

> Thanks Kyle, very promising. I left some comments.
>
> —
> Alexey
>
> On 5 Aug 2021, at 19:59, Luke Cwik  wrote:
>
> Thanks, I took a look at it.
>
> On Tue, Aug 3, 2021 at 2:31 PM Kyle Weaver  wrote:
>
>> Hi Beam devs,
>>
>> I'm back with another proposal involving Schema-based optimization in the
>> Beam Java SDK. This one builds on the ideas in my previous proposal and is
>> broader in scope. Please leave comments if this area is of interest to you.
>>
>>
>> https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/edit#
>>
>> Thank you,
>> Kyle
>>
>
>


Re: Question about transformOverride

2021-04-21 Thread Xinyu Liu
Yes, our recommendation to the users is the same as you described, passing
in the inputs as parameters. This fits many use cases. We also have users
wrapping IOs in their own composite transforms and/or having inputs
scattered in different modules/libraries. Passing in inputs doesn't work
well in these use cases, so we are thinking about whether we can provide a
way to override the input transform. From the discussion above, it seems
the current TransformOverride is not intended for this usage. I will take a
closer look at the code to see whether this is achievable (and see whether
we need more dependencies from the core). It seems that in a portable pipeline,
Fuse can manipulate the structure of the pipeline, but I am not sure about the
Java pipeline.
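
To make the parameter-passing recommendation concrete, here is a minimal sketch
(not code from this thread; the file path and processing step are placeholders)
in which the pipeline body is built from a PCollection parameter, so a test can
feed it from Create.of(...) instead of the real IO:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class WordPipeline {

  // The processing logic takes its input as a parameter instead of applying the IO
  // itself, so a test can substitute an in-memory source.
  static PCollection<String> addProcessing(PCollection<String> input) {
    return input.apply(
        MapElements.into(TypeDescriptors.strings()).via((String word) -> word.toUpperCase()));
  }

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();
    // Production wiring: a real IO read feeds the shared construction function.
    addProcessing(pipeline.apply(TextIO.read().from("/path/to/input.txt")));
    pipeline.run().waitUntilFinish();
  }
}

In a test, the same addProcessing() is reused with, for example,
testPipeline.apply(Create.of("a", "b")) as its input.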

Thanks,
Xinyu

On Wed, Apr 21, 2021 at 9:40 AM Robert Burke  wrote:

> My general answer for this is to avoid bundling the IOs with the rest of
> the pipeline. Have the Input collection be a parameter to a function that
> constructs the rest of the pipeline, which returns its intended
> PCollections as outputs.
>
> No need to go as far as wrap the whole construction function as a
> Composite, but that's similar.
>
> Runners providing features to make it easier to test the way you describe
> does sound very useful, but it does require the runner to be aware of
> each transform to be overridden, possibly increasing the runner's dependency
> surface.
>
> On Wed, Apr 21, 2021, 9:31 AM Xinyu Liu  wrote:
>
>> @Chamikara: Yuhong and I are working on Samza Runner, and we are looking
>> for a way to swap the transform for ease of use in testing.
>>
>> @Reuven: Your approach will work for this case, but we do have quite a
>> few read transforms here, and we would have to plug this code into each of them
>> with some testing logic there too. It seems not very clean to me to have testing
>> code mixed with real logic. It will be hard to maintain in the long run if
>> we add more read transforms in the future. It would be much nicer if we could
>> leverage something like TransformOverrides to replace a transform entirely
>> without messing with the existing code.
>>
>> Thanks,
>> Xinyu
>>
>> On Tue, Apr 20, 2021 at 10:00 PM Boyuan Zhang  wrote:
>>
>>> +1 to use pipeline options.
>>>
>>>  Alternatively, you can also change your KafkaReadTransform to perform
>>> different expansion(override expand()) based on your pipeline options.
>>>
>>> On Tue, Apr 20, 2021 at 9:51 PM Reuven Lax  wrote:
>>>
>>>> It would be simpler to create a custom pipeline option, and swap out
>>>> the read transform in your code. For example
>>>>
>>>> PCollection pc;
>>>> if (options.getLocalTest()) {
>>>>   pc = pipeline.apply(new ReadFromLocalFile());
>>>> } else {
>>>>   pc = pipeline.apply(new KafkaReadTransform());
>>>> }
>>>>
>>>> pc.apply(/* rest of pipeline */);
>>>>
>>>> On Tue, Apr 20, 2021 at 9:41 PM Yuhong Cheng 
>>>> wrote:
>>>>
>>>>> We want to support transform override when doing tests locally.  For
>>>>> example, in real pipelines, we read from Kafka, but when doing tests
>>>>> locally, we want to read from a local file to help test whether the
>>>>> pipeline works fine. So we want to override the Kafka read transform
>>>>> directly instead of writing the pipeline twice.
>>>>>
>>>>> code example:
>>>>>
>>>>> public Pipeline createPipeline(Pipeline pipeline) {
>>>>>   pipeline.apply(new KafkaReadTransform()).apply(/* other functions.. */);
>>>>>   return pipeline;
>>>>> }
>>>>> In test, we will use the same createPipeline() function to create a
>>>>> pipeline but meanwhile we want to override KafkaReadTransform with another
>>>>> transform to avoid reading from Kafka.
>>>>>
>>>>> Thanks,
>>>>> Yuhong
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Apr 20, 2021 at 9:02 PM Chamikara Jayalath <
>>>>> chamik...@google.com> wrote:
>>>>>
>>>>>> In general, TransformOverrides are expected to be per-runner
>>>>>> implementation details and are not expected to be directly used by
>>>>>> end-users.
>>>>>> What is the exact use-case you are trying to achieve ? Are you
>>>>>

Re: Question about transformOverride

2021-04-21 Thread Xinyu Liu
@Chamikara: Yuhong and I are working on Samza Runner, and we are looking
for a way to swap the transform for ease of use in testing.

@Reuven: Your approach will work for this case, but we do have quite a few
read transforms here, and we would have to plug this code into each of them with
some testing logic there too. It seems not very clean to me to have testing
code mixed with real logic. It will be hard to maintain in the long run if
we add more read transforms in the future. It would be much nicer if we could
leverage something like TransformOverrides to replace a transform entirely
without messing with the existing code.

Thanks,
Xinyu

On Tue, Apr 20, 2021 at 10:00 PM Boyuan Zhang  wrote:

> +1 to use pipeline options.
>
>  Alternatively, you can also change your KafkaReadTransform to perform
> different expansion(override expand()) based on your pipeline options.
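
A rough sketch of what that expand()-based switch could look like, assuming a
hypothetical LocalTest option and an in-memory stand-in for Kafka (illustrative
only, not code from this thread):

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical option used to switch the expansion.
interface TestingOptions extends PipelineOptions {
  @Default.Boolean(false)
  boolean getLocalTest();
  void setLocalTest(boolean value);
}

// A read transform that chooses its expansion from the pipeline options, so callers
// apply the same transform in production and in tests.
class KafkaReadTransform extends PTransform<PBegin, PCollection<String>> {
  @Override
  public PCollection<String> expand(PBegin input) {
    TestingOptions options = input.getPipeline().getOptions().as(TestingOptions.class);
    if (options.getLocalTest()) {
      // Test expansion: an in-memory source stands in for Kafka.
      return input.apply(Create.of("test-record-1", "test-record-2"));
    }
    // Production expansion: the real Kafka read would be applied here.
    return input.apply(readFromKafka());
  }

  private PTransform<PBegin, PCollection<String>> readFromKafka() {
    throw new UnsupportedOperationException("wire up the real KafkaIO read here");
  }
}
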
>
> On Tue, Apr 20, 2021 at 9:51 PM Reuven Lax  wrote:
>
>> It would be simpler to create a custom pipeline option, and swap out the
>> read transform in your code. For example
>>
>> PCollection pc;
>> if (options.getLocalTest()) {
>>   pc = pipeline.apply(new ReadFromLocalFile());
>> } else {
>>   pc = pipeline.apply(new KafkaReadTransform());
>> }
>>
>> pc.apply(/* rest of pipeline */);
>>
>> On Tue, Apr 20, 2021 at 9:41 PM Yuhong Cheng 
>> wrote:
>>
>>> We want to support transform override when doing tests locally.  For
>>> example, in real pipelines, we read from Kafka, but when doing tests
>>> locally, we want to read from a local file to help test whether the
>>> pipeline works fine. So we want to override the Kafka read transform
>>> directly instead of writing the pipeline twice.
>>>
>>> code example:
>>>
>>> public Pipeline createPipeline(Pipeline pipeline) {
>>>   pipeline.apply(new KafkaReadTransform()).apply(/* other functions.. */);
>>>   return pipeline;
>>> }
>>> In test, we will use the same createPipeline() function to create a
>>> pipeline but meanwhile we want to override KafkaReadTransform with another
>>> transform to avoid reading from Kafka.
>>>
>>> Thanks,
>>> Yuhong
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Apr 20, 2021 at 9:02 PM Chamikara Jayalath 
>>> wrote:
>>>
 In general, TransformOverrides are expected to be per-runner
 implementation details and are not expected to be directly used by
 end-users.
 What is the exact use-case you are trying to achieve ? Are you running
 into a missing feature of an existing transform ?

 Thanks,
 Cham

 On Tue, Apr 20, 2021 at 5:58 PM Yuhong Cheng 
 wrote:

> Hi Beam,
> We have a use case when creating a pipeline, we want to replace the IO
> read/write transform when testing using `pipeline.replaceAll(overrides)`.
> However, we met some problems when doing tests:
> 1. Are there any ways we can avoid calling expand() of a transform
> when it is going to be replaced?  The reason we want to override a
> transform is because that the expand() of this transform is somehow not
> available in some situations. It seems not reasonable enough to call the
> expand() of the originalTransform and then call the expand() of the
> overrideTransform again?
> 2. When trying to implement `PTransformOverrideFactory`, we realize
> that the inputs are `TaggedPValue`, which can only make {Tuple,
> PCollection} pairs. Then if we want to override a write transform whose
> output type is `PDone`, what's the best way to implement this factory?
>
>
> Thanks in advance for answers! This is quite important to our
> pipelines.
>
> Thanks,
> Yuhong
>



Re: Python Dataframe API issue

2021-03-25 Thread Xinyu Liu
Np, thanks for quickly identifying the fix.

Btw, I am very happy about Beam Python supporting the same Pandas dataframe
api. It's super user-friendly to both devs and data scientists. Really cool
work!

Thanks,
Xinyu

On Thu, Mar 25, 2021 at 4:53 PM Robert Bradshaw  wrote:

> Thanks, Xinyu, for finding this!
>
> On Thu, Mar 25, 2021 at 4:48 PM Kenneth Knowles  wrote:
>
>> Cloned to https://issues.apache.org/jira/browse/BEAM-12056
>>
>> On Thu, Mar 25, 2021 at 4:46 PM Brian Hulette 
>> wrote:
>>
>>> Yes this looks like https://issues.apache.org/jira/browse/BEAM-11929, I
>>> removed it from the release blockers since there is a workaround (use a
>>> NamedTuple type), but it's probably worth cherrypicking the fix.
>>>
>>> On Thu, Mar 25, 2021 at 4:44 PM Robert Bradshaw 
>>> wrote:
>>>
>>>> This could be https://issues.apache.org/jira/browse/BEAM-11929
>>>>
>>>> On Thu, Mar 25, 2021 at 4:26 PM Robert Bradshaw 
>>>> wrote:
>>>>
>>>>> This is definitely wrong. Looking into what's going on here, but this
>>>>> seems severe enough to be a blocker for the next release.
>>>>>
>>>>> On Thu, Mar 25, 2021 at 3:39 PM Xinyu Liu 
>>>>> wrote:
>>>>>
>>>>>> Hi, folks,
>>>>>>
>>>>>> I am playing around with the Python Dataframe API, and seemingly got a
>>>>>> schema issue when converting a PCollection to a dataframe. I wrote the
>>>>>> following code for a simple test:
>>>>>>
>>>>>> import apache_beam as beam
>>>>>> from apache_beam.dataframe.convert import to_dataframe
>>>>>> from apache_beam.dataframe.convert import to_pcollection
>>>>>>
>>>>>> p = beam.Pipeline()
>>>>>> data = p | beam.Create([('a', ''), ('b', '')]) | beam.Map(
>>>>>> lambda x : beam.Row(word=x[0], val=x[1]))
>>>>>> _ = data | beam.Map(print)
>>>>>> p.run()
>>>>>>
>>>>>> This shows the following:
>>>>>> Row(val='', word='a') Row(val='', word='b')
>>>>>>
>>>>>> But if I use to_dataframe() to convert it into a df, seems the schema
>>>>>> was reversed:
>>>>>>
>>>>>> df = to_dataframe(data)
>>>>>> dataCopy = to_pcollection(df)
>>>>>> _ = dataCopy | beam.Map(print)
>>>>>> p.run()
>>>>>>
>>>>>> I got:
>>>>>> BeamSchema_4100b64e_16e9_467d_932e_5fc2e4acaca7(word='', val='a')
>>>>>> BeamSchema_4100b64e_16e9_467d_932e_5fc2e4acaca7(word='', val='b')
>>>>>>
>>>>>> It seems the columns 'word' and 'val' are now swapped. The problem seems
>>>>>> to happen during to_dataframe(). If I print out df['word'], I got ''
>>>>>> and ''. I am not sure whether I am doing something wrong or there is 
>>>>>> an
>>>>>> issue in the schema conversion. Could someone help me take a look?
>>>>>>
>>>>>> Thanks, Xinyu
>>>>>>
>>>>>


Python Dataframe API issue

2021-03-25 Thread Xinyu Liu
Hi, folks,

I am playing around with the Python Dataframe API, and seemingly got a schema
issue when converting a PCollection to a dataframe. I wrote the following code
for a simple test:

import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe
from apache_beam.dataframe.convert import to_pcollection

p = beam.Pipeline()
data = p | beam.Create([('a', ''), ('b', '')]) | beam.Map(lambda x
: beam.Row(word=x[0], val=x[1]))
_ = data | beam.Map(print)
p.run()

This shows the following:
Row(val='', word='a') Row(val='', word='b')

But if I use to_dataframe() to convert it into a df, seems the schema was
reversed:

df = to_dataframe(data)
dataCopy = to_pcollection(df)
_ = dataCopy | beam.Map(print)
p.run()

I got:
BeamSchema_4100b64e_16e9_467d_932e_5fc2e4acaca7(word='', val='a')
BeamSchema_4100b64e_16e9_467d_932e_5fc2e4acaca7(word='', val='b')

It seems the columns 'word' and 'val' are now swapped. The problem seems to
happen during to_dataframe(). If I print out df['word'], I got '' and
''. I am not sure whether I am doing something wrong or there is an
issue in the schema conversion. Could someone help me take a look?

Thanks, Xinyu


Re: DoFn @Setup with PipelineOptions

2021-03-02 Thread Xinyu Liu
I created a ticket to track this:
https://issues.apache.org/jira/browse/BEAM-11914. Thanks everyone for the
comments!

Thanks,
Xinyu

On Mon, Mar 1, 2021 at 4:45 PM Xinyu Liu  wrote:

> The reason for not passing it in directly is that we have a large amount
> of configs here at LinkedIn so we use an internal config management
> framework to hide the actual detailed configs needed to construct these
> resources. Internally we put a config map inside the PipelineOptions and
> then during @Setup, we would like to init the config framework with the
> configs inside PipelineOptions. The user does not need to be aware of how
> the configus are populated. They can use something like
>
>   ConfigFramework.create(PipelineOptions).getInstance(SomeResourceFactory)
>
> to create a resource instance they need.
>
> On the other hand, even without this kind of use case, it seems still
> simpler for the users to use parameters in PipelineOptions if we provide it
> directly with @setup.
>
> Thanks,
> Xinyu
>
> On Mon, Mar 1, 2021 at 4:14 PM Kenneth Knowles  wrote:
>
>> Why not just pass in the arguments to the DoFn constructor or as a
>> variable in the containing scope? Do you only know the option after the
>> pipeline is completely constructed so you need to make the switch at
>> runtime? Makes sense. I think passing options to @Setup is useful and
>> harmless.
>>
>> Kenn
>>
>> On Mon, Mar 1, 2021 at 3:42 PM Xinyu Liu  wrote:
>>
>>> Hi, all,
>>>
>>> Currently the @Setup method signature in DoFn does not support any
>>> arguments. This is a bit cumbersome to use for use cases such as creating a
>>> db connection, rest client or fetch some resources, where we would like to
>>> read the configs from the PipelineOptions during setup. Shall we support
>>> adding a DoFn SetupContext that can let the users specify the
>>> PipelineOptions in the arguments, similar to @StartBundle? Seems the
>>> PipelineOptions should always be available when the DoFnRunner is created.
>>> Anyone seeing the downside of it?
>>>
>>> Thanks,
>>> Xinyu
>>>
>>


Re: DoFn @Setup with PipelineOptions

2021-03-01 Thread Xinyu Liu
The reason for not passing it in directly is that we have a large amount of
configs here at LinkedIn so we use an internal config management framework
to hide the actual detailed configs needed to construct these resources.
Internally we put a config map inside the PipelineOptions and then during
@Setup, we would like to init the config framework with the configs inside
PipelineOptions. The user does not need to be aware of how the configs are
populated. They can use something like

  ConfigFramework.create(PipelineOptions).getInstance(SomeResourceFactory)

to create a resource instance they need.

On the other hand, even without this kind of use case, it seems still
simpler for the users to use parameters in PipelineOptions if we provide it
directly with @setup.

Thanks,
Xinyu

On Mon, Mar 1, 2021 at 4:14 PM Kenneth Knowles  wrote:

> Why not just pass in the arguments to the DoFn constructor or as a
> variable in the containing scope? Do you only know the option after the
> pipeline is completely constructed so you need to make the switch at
> runtime? Makes sense. I think passing options to @Setup is useful and
> harmless.
>
> Kenn
>
> On Mon, Mar 1, 2021 at 3:42 PM Xinyu Liu  wrote:
>
>> Hi, all,
>>
>> Currently the @Setup method signature in DoFn does not support any
>> arguments. This is a bit cumbersome to use for use cases such as creating a
>> db connection, rest client or fetch some resources, where we would like to
>> read the configs from the PipelineOptions during setup. Shall we support
>> adding a DoFn SetupContext that can let the users specify the
>> PipelineOptions in the arguments, similar to @StartBundle? Seems the
>> PipelineOptions should always be available when the DoFnRunner is created.
>> Anyone seeing the downside of it?
>>
>> Thanks,
>> Xinyu
>>
>


DoFn @Setup with PipelineOptions

2021-03-01 Thread Xinyu Liu
Hi, all,

Currently the @Setup method signature in DoFn does not support any
arguments. This is a bit cumbersome for use cases such as creating a
db connection or a REST client, or fetching resources, where we would like to
read the configs from the PipelineOptions during setup. Shall we support
adding a DoFn SetupContext that lets users receive the
PipelineOptions as an argument, similar to @StartBundle? It seems the
PipelineOptions should always be available when the DoFnRunner is created.
Anyone seeing the downside of it?
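
For illustration, a sketch of today's constructor-based workaround next to the
shape being proposed in this thread (the PipelineOptions-taking @Setup in the
trailing comment is the proposal under discussion, not an existing API at the
time of this thread; openClient and ConfigFramework are placeholders):

import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;

// Today's workaround: extract what the DoFn needs from PipelineOptions before
// pipeline.run() and pass it through the constructor (it must be serializable).
class ClientDoFn extends DoFn<String, String> {
  private final String endpoint;
  private transient AutoCloseable client;

  ClientDoFn(String endpoint) {
    this.endpoint = endpoint;
  }

  @Setup
  public void setup() {
    client = openClient(endpoint);     // placeholder for creating a db/REST client
  }

  @ProcessElement
  public void process(@Element String element, OutputReceiver<String> out) {
    out.output(element);
  }

  private AutoCloseable openClient(String endpoint) {
    return () -> { };                  // placeholder; a real client would be created here
  }
}

// The proposal would instead let @Setup (or a SetupContext) receive the options
// directly, similar to @StartBundle, roughly:
//
//   @Setup
//   public void setup(PipelineOptions options) {
//     ConfigFramework.create(options); // the internal framework mentioned in the thread
//   }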

Thanks,
Xinyu


Re: Running Beam pipeline using Spark on YARN

2020-06-23 Thread Xinyu Liu
I am doing some prototyping on this too. I used the spark-submit script instead
of the REST API. In my simple setup, I ran SparkJobServerDriver.main()
directly in the AM as a spark job, which will submit the python job to the
default spark master url pointing to "local". I also use --files in the
spark-submit script to upload the python packages and boot script. On the
python side, I was using the following pipeline options for submission
(thanks to Thomas):

pipeline_options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=your-job-server:8099",
    "--environment_type=PROCESS",
    "--environment_config={\"command\": \"./boot\"}"])

I used my own boot script for customized python packaging. With this setup
I was able to get a simple hello-world program running. I haven't tried to
run the job server separately from the AM yet. So hopefully setting
--spark-master-url to be yarn will work too.

Thanks,
Xinyu

On Tue, Jun 23, 2020 at 12:18 PM Kyle Weaver  wrote:

> Hi Kamil, there is a JIRA for this:
> https://issues.apache.org/jira/browse/BEAM-8970 It's theoretically
> possible but remains untested as far as I know :)
>
> As I indicated in a comment, you can set --output_executable_path to
> create a jar that you can then submit to yarn via spark-submit.
>
> If you can get this working, I'd additionally like to script the jar
> submission in python to save users the extra step.
>
> Thanks,
> Kyle
>
> On Tue, Jun 23, 2020 at 9:16 AM Kamil Wasilewski <
> kamil.wasilew...@polidea.com> wrote:
>
>> Hi all,
>>
>> I'm trying to run a Beam pipeline using Spark on YARN. My pipeline is
>> written in Python, so I need to use a portable runner. Does anybody know
>> how I should configure job server parameters, especially
>> --spark-master-url?  Is there anything else I need to be aware of while
>> using such setup?
>>
>> If it makes a difference, I use Google Dataproc.
>>
>> Best,
>> Kamil
>>
>


Re: Running Beam python pipeline on Spark

2020-06-03 Thread Xinyu Liu
Thanks for the pointers, Thomas. Let me give it a shot tomorrow.

Thanks,
Xinyu

On Wed, Jun 3, 2020 at 7:13 PM Thomas Weise  wrote:

> If all Python dependencies are pre-installed on the yarn container hosts,
> then you can use the process environment to spawn processes, like so:
>
>
> https://github.com/lyft/flinkk8soperator/blob/bb8834d69e8621d636ef2085fdc167a9d2c2bfa3/examples/beam-python/src/beam_example/pipeline.py#L16-L17
>
> Thomas
>
>
> On Wed, Jun 3, 2020 at 5:48 PM Xinyu Liu  wrote:
>
>> Hi, folks,
>>
>> I am trying to do some experiment to run a simple "hello world" python
>> pipeline on a remote Spark cluster on Hadoop. So far I ran the
>> SparkJobServerDriver on the Yarn application master and managed to submit a
>> python pipeline to it. SparkPipelineRunner was able to run the portable
>> pipeline and spawn some containers to run it. On the container itself, I
>> don't see the sdk_worker.py getting executed so for ExecutableStage the
>> code throws grpc io exceptions. I am wondering whether there is a way for
>> spark runner to run python worker in the containers of yarn cluster? I
>> don't see any existing code for it, and seems the ports allocated for
>> bundle factory are also arbitrary. Any thoughts?
>>
>> Thanks,
>> Xinyu
>>
>


Running Beam python pipeline on Spark

2020-06-03 Thread Xinyu Liu
Hi, folks,

I am trying to do some experiment to run a simple "hello world" python
pipeline on a remote Spark cluster on Hadoop. So far I ran the
SparkJobServerDriver on the Yarn application master and managed to submit a
python pipeline to it. SparkPipelineRunner was able to run the portable
pipeline and spawn some containers to run it. On the container itself, I
don't see the sdk_worker.py getting executed so for ExecutableStage the
code throws grpc io exceptions. I am wondering whether there is a way for
spark runner to run python worker in the containers of yarn cluster? I
don't see any existing code for it, and seems the ports allocated for
bundle factory are also arbitrary. Any thoughts?

Thanks,
Xinyu


Re: [Discuss] Propose Calcite Vendor Release (1.22.0)

2020-03-05 Thread Xinyu Liu
Thanks, Rui! We've been waiting for the new version of Calcite which has
the fix to unflatten the fields. Seems this version will come with it.

Thanks,
Xinyu

On Thu, Mar 5, 2020 at 12:41 AM Ismaël Mejía  wrote:

> The calcite vote already passed so this is good to go, thanks for
> volunteering Rui.
>
> https://lists.apache.org/thread.html/r4962a4a2bacf481f2ee1064806b78829d96385c2e4a3c0ecb24a55a2%40%3Cdev.calcite.apache.org%3E
>
> On Thu, Mar 5, 2020 at 8:10 AM Kai Jiang  wrote:
> >
> > Thanks, Rui! Big +1 for calcite vendor release (1.22.0)
> > Curious, what's the progress of Calcite 1.22.0 official release? I saw
> Calcite community just passes the vote for 1.22.0 rc3.
> >
> > Best,
> > Kai
> >
> >
> > On Wed, Mar 4, 2020 at 9:24 PM Rui Wang  wrote:
> >>
> >> Hi Community,
> >>
> >> As Calcite is closing to finish their 1.22.0 release, I want to propose
> a Calcite vendor release and I am volunteer to be the release manager.
> >>
> >> I will wait until next Monday(03/09) to kick off the release if there
> is no objection.
> >>
> >>
> >> Best,
> >> Rui Wang
>


Re: Strict timer ordering in Samza and Portable Flink Runners

2019-10-23 Thread Xinyu Liu
Hi, Jan,

Thanks for reporting this. I assigned BEAM-8459 to myself and will take a look soon.

Thanks,
Xinyu

On Wed, Oct 23, 2019 at 2:54 AM Jan Lukavský  wrote:

> Hi,
>
> as part of [1] a new set of validatesRunner tests has been introduced.
> These tests (currently marked as category UsesStrictTimerOrdering)
> verify that runners fire timers in increasing timestamp under all
> circumstances. After adding these validatesRunner tests, Samza [2] and
> Portable Flink [3] started to fail these tests. I have created the
> tracking issues for that, because that behavior should be fixed (timers
> in wrong order can cause erratic behavior and/or data loss).
>
> I'm writing to anyone interested in solving these issues.
>
> Cheers,
>
>   Jan
>
> [1] https://issues.apache.org/jira/browse/BEAM-7520
>
> [2] https://issues.apache.org/jira/browse/BEAM-8459
>
> [3] https://issues.apache.org/jira/browse/BEAM-8460
>
>


Re: [spark structured streaming runner] merge to master?

2019-10-10 Thread Xinyu Liu
+1 for merging to master. It's going to help a lot for us to try it out,
and also contribute back for the missing features.

Thanks,
Xinyu

On Thu, Oct 10, 2019 at 6:40 AM Alexey Romanenko 
wrote:

> +1 for merging this new runner too (even if it’s not 100% ready for the
> moment) in case if it doesn’t break/fail/affect all other tests and Jenkins
> jobs. I mean, it should be transparent for other Beam components.
>
> Also, since it won’t be officially “released” right after merging, we need
> to clearly warn users that it’s not ready to use in production.
>
> > On 10 Oct 2019, at 15:25, Ryan Skraba  wrote:
> >
> > Merging to master sounds like a really good idea, even if it is not
> > feature-complete yet.
> >
> > It's already a pretty big accomplishment getting it to the current
> > state (great job all!).  Merging it into master would give it a pretty
> > good boost for visibility and encouraging some discussion about where
> > it's going.
> >
> > I don't think there's any question about removing the RDD-based
> > (a.k.a. old/legacy/stable) spark runner yet!
> >
> > All my best, Ryan
> >
> >
> > On Thu, Oct 10, 2019 at 2:47 PM Jean-Baptiste Onofré 
> wrote:
> >>
> >> +1
> >>
> >> As the runner seems almost "equivalent" to the one we have, it makes
> sense.
> >>
> >> Question is: do we keep the "old" spark runner for a while or not (or
> >> just keep on previous version/tag on git) ?
> >>
> >> Regards
> >> JB
> >>
> >> On 10/10/2019 09:39, Etienne Chauchot wrote:
> >>> Hi guys,
> >>>
> >>> You probably know that there has been, for several months, work
> >>> developing a new Spark runner based on Spark Structured Streaming
> >>> framework. This work is located in a feature branch here:
> >>> https://github.com/apache/beam/tree/spark-runner_structured-streaming
> >>>
> >>> To attract more contributors and get some user feedback, we think it is
> >>> time to merge it to master. Before doing so, some steps need to be
> >>> achieved:
> >>>
> >>> - finish the work on spark Encoders (that allow to call Beam coders)
> >>> because, right now, the runner is in an unstable state (some transforms
> >>> use the new way of doing ser/de and some use the old one, making a
> >>> pipeline incoherent toward serialization)
> >>>
> >>> - clean history: The history contains commits from November 2018, so
> >>> there is a good amount of work, thus a consequent number of commits.
> >>> They were already squashed but not from September 2019
> >>>
> >>> Regarding status:
> >>>
> >>> - the runner passes 89% of the validates runner tests in batch mode. We
> >>> hope to pass more with the new Encoders
> >>>
> >>> - Streaming mode is barely started (waiting for the multi-aggregations
> >>> support in spark SS framework from the Spark community)
> >>>
> >>> - Runner can execute Nexmark
> >>>
> >>> - Some things are not wired up yet
> >>>
> >>>- Beam Schemas not wired with Spark Schemas
> >>>
> >>>- Optional features of the model not implemented:  state api, timer
> >>> api, splittable doFn api, …
> >>>
> >>> WDYT, can we merge it to master once the 2 steps are done ?
> >>>
> >>> Best
> >>>
> >>> Etienne
> >>>
> >>
> >> --
> >> Jean-Baptiste Onofré
> >> jbono...@apache.org
> >> http://blog.nanthrax.net
> >> Talend - http://www.talend.com
>
>


Re: Pointers on Contributing to Structured Streaming Spark Runner

2019-09-18 Thread Xinyu Liu
Alexey and Etienne: I'm very happy to join the sync-up meeting. Please
forward the meeting info to me. I am based in California, US and hopefully
the time will work :).

Thanks,
Xinyu

On Wed, Sep 18, 2019 at 6:39 AM Etienne Chauchot 
wrote:

> Hi Xinyu,
>
> Thanks for offering help ! My comments are inline:
>
> Le vendredi 13 septembre 2019 à 12:16 -0700, Xinyu Liu a écrit :
>
> Hi, Etienne,
>
> The slides are very informative! Thanks for sharing the details about how
> the Beam API is mapped onto Spark Structured Streaming.
>
>
> Thanks !
>
> We (LinkedIn) are also interested in trying the new SparkRunner to run
> Beam pipelines in batch, and contribute to it too. From my understanding,
> seems the functionality on batch side is mostly complete and covers quite a
> large percentage of the tests (a few missing pieces like state and timer in
> ParDo and SDF).
>
>
> Correct, it passes 89% of the tests, but more than SDF, state, and timer is
> missing; there is also ongoing encoders work that I would like to
> commit/push before merging.
>
> If so, is it possible to merge the new runner sooner into master so it's
> much easier for us to pull it in (we have an internal fork) and contribute
> back?
>
>
> Sure, see my other mail on this thread. As Alexey mentioned, please join
> the sync meeting we have, the more the merrier !
>
>
> Also curious about the schema part in the runner. Seems we can leverage
> the schema-aware work in PCollection and translate from Beam schema to
> Spark, so it can be optimized in the planner layer. It will be great to
> hear back your plans on that.
>
>
> Well, it is not designed yet but, if you remember my talk, we need to
> store beam windowing information with the data itself, so ending up having
> a dataset. One lead that was discussed is to store it as a
> Spark schema such as this:
>
> 1. field1: binary data for beam windowing information (cannot be mapped to
> fields because beam windowing info is complex structure)
>
> 2. fields of data as defined in the Beam schema if there is one
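
For illustration only, one way such a two-part layout could be expressed with
Spark's schema API (the field names are made up; this is not the runner's settled
design):

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

class WindowedRowSchemaSketch {
  // One binary column carrying the serialized Beam windowing information, followed
  // by the fields of the Beam schema itself.
  static StructType windowedSchema() {
    return new StructType()
        .add("beamWindowingInfo", DataTypes.BinaryType)
        .add("word", DataTypes.StringType)
        .add("val", DataTypes.LongType);
  }
}
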
>
>
> Congrats on this great work!
>
> Thanks !
>
> Best,
>
> Etienne
>
> Thanks,
> Xinyu
>
> On Wed, Sep 11, 2019 at 6:02 PM Rui Wang  wrote:
>
> Hello Etienne,
>
> Your slide mentioned that streaming mode development is blocked because
> Spark lacks supporting multiple-aggregations in its streaming mode but
> design is ongoing. Do you have a link or something else to their design
> discussion/doc?
>
>
> -Rui
>
> On Wed, Sep 11, 2019 at 5:10 PM Etienne Chauchot 
> wrote:
>
> Hi Rahul,
> Sure, and great ! Thanks for proposing !
> If you want details, here is the presentation I did 30 mins ago at the
> apachecon. You will find the video on youtube shortly but in the meantime,
> here is my presentation slides.
>
> And here is the structured streaming branch. I'll be happy to review your
> PRs, thanks !
>
> https://github.com/apache/beam/tree/spark-runner_structured-streaming
>
> Best
> Etienne
>
> Le mercredi 11 septembre 2019 à 16:37 +0530, rahul patwari a écrit :
>
> Hi Etienne,
>
> I came to know about the work going on in Structured Streaming Spark
> Runner from Apache Beam Wiki - Works in Progress.
> I have contributed to BeamSql earlier. And I am working on supporting
> PCollectionView in BeamSql.
>
> I would love to understand the Runner's side of Apache Beam and
> contribute to the Structured Streaming Spark Runner.
>
> Can you please point me in the right direction?
>
> Thanks,
> Rahul
>
>


Re: Pointers on Contributing to Structured Streaming Spark Runner

2019-09-13 Thread Xinyu Liu
Hi, Etienne,

The slides are very informative! Thanks for sharing the details about how
the Beam API is mapped onto Spark Structured Streaming. We (LinkedIn) are
also interested in trying the new SparkRunner to run Beam pipelines in batch,
and contribute to it too. From my understanding, seems the functionality on
batch side is mostly complete and covers quite a large percentage of the
tests (a few missing pieces like state and timer in ParDo and SDF). If so,
is it possible to merge the new runner sooner into master so it's much
easier for us to pull it in (we have an internal fork) and contribute back?

Also curious about the schema part in the runner. Seems we can leverage the
schema-aware work in PCollection and translate from Beam schema to Spark,
so it can be optimized in the planner layer. It will be great to hear back
your plans on that.

Congrats on this great work!
Thanks,
Xinyu

On Wed, Sep 11, 2019 at 6:02 PM Rui Wang  wrote:

> Hello Etienne,
>
> Your slide mentioned that streaming mode development is blocked because
> Spark lacks supporting multiple-aggregations in its streaming mode but
> design is ongoing. Do you have a link or something else to their design
> discussion/doc?
>
>
> -Rui
>
> On Wed, Sep 11, 2019 at 5:10 PM Etienne Chauchot 
> wrote:
>
>> Hi Rahul,
>> Sure, and great ! Thanks for proposing !
>> If you want details, here is the presentation I did 30 mins ago at the
>> apachecon. You will find the video on youtube shortly but in the meantime,
>> here is my presentation slides.
>>
>> And here is the structured streaming branch. I'll be happy to review your
>> PRs, thanks !
>>
>> 
>> https://github.com/apache/beam/tree/spark-runner_structured-streaming
>>
>> Best
>> Etienne
>>
>> Le mercredi 11 septembre 2019 à 16:37 +0530, rahul patwari a écrit :
>>
>> Hi Etienne,
>>
>> I came to know about the work going on in Structured Streaming Spark
>> Runner from Apache Beam Wiki - Works in Progress.
>> I have contributed to BeamSql earlier. And I am working on supporting
>> PCollectionView in BeamSql.
>>
>> I would love to understand the Runner's side of Apache Beam and
>> contribute to the Structured Streaming Spark Runner.
>>
>> Can you please point me in the right direction?
>>
>> Thanks,
>> Rahul
>>
>>


Re: Integration of python/portable runner tests for Samza runner

2019-04-25 Thread Xinyu Liu
Thanks for the explanation, Max. We will try it out and let you guys know
if we run into any questions.

Thanks,
Xinyu

On Thu, Apr 25, 2019 at 6:28 AM Maximilian Michels  wrote:

>  > - For portable running tests: by looking at the
>  > portableValidatesRunnerTask in flink_job_server.gradle, it seems it's
>  > the same set of Java tests but using portability framework to validate
>  > (submit to job server and run the portable pipeline in a specific
>  > runner). Is my understanding correct?
>
> That's correct.
>
>  > - For python tests: Looks like flinkValidatesRunner is using LOOPBACK
>  > SDK worker type in the tests. Not sure what LOOPBACK does. Is it used
>  > for testing? Currently Samza portable runner supports PROCESS worker.
>
> This avoids building and using containers for this test. We had a number
> of issues with Docker on Jenkins and wanted to lower build time for
> PreCommit tests. Loopback means that an embedded Python environment will
> be started which listens on localhost. It's comparable to Java's
> EmbeddedSdkHarness.
>
> -Max
>
> On 24.04.19 20:10, Xinyu Liu wrote:
> > Thanks for the useful pointers! We are looking forward to integrating
> > both Portable and Python-specific tests for Samza runner. A few
> questions:
> >
> > - For portable running tests: by looking at the
> > portableValidatesRunnerTask in flink_job_server.gradle, it seems it's
> > the same set of Java tests but using portability framework to validate
> > (submit to job server and run the portable pipeline in a specific
> > runner). Is my understanding correct?
> >
> > - For python tests: Looks like flinkValidatesRunner is using LOOPBACK
> > SDK worker type in the tests. Not sure what LOOPBACK does. Is it used
> > for testing? Currently Samza portable runner supports PROCESS worker.
> >
> > Thanks,
> > Xinyu
> >
> >
> >
> > On Wed, Apr 24, 2019 at 2:45 AM Maximilian Michels  wrote:
> >
> >  > If you are interested in portable python pipeline validation, I
> > think
> >  > fn_api_runner_test would also help.
> >
> > Just to note, Ankur mentioned flinkCompatibilityMatrix, that one uses
> > fn_api_runner_test with some tooling on top to bring up the test
> > cluster.
> >
> > On 23.04.19 19:23, Boyuan Zhang wrote:
> >  > Hi Daniel,
> >  > If you are interested in portable python pipeline validation, I
> > think
> >  > fn_api_runner_test
> >  >
> > <
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
> >
> >
> >  > would also help.
> >  >
>  > On Tue, Apr 23, 2019 at 10:19 AM Pablo Estrada  wrote:
> >  >
> >  > This is cool, Daniel : ) Glad to see the Samza runner moving
> > forward.
> >  > Best
> >  > -P.
> >  >
>  > On Tue, Apr 23, 2019 at 2:52 AM Maximilian Michels  wrote:
> >  >
> >  > Hi Daniel,
> >  >
> >  > Note that there is also Portable Validates Runner which
> > runs Java
> >  > portability tests. I don't know if you have integrated
> > with that
> >  > one
> >  > already.
> >  >
> >  > Thanks,
> >  > Max
> >  >
> >  > On 23.04.19 02:28, Ankur Goenka wrote:
> >  >  > Hi Daniel,
> >  >  >
> >  >  > We use flinkCompatibilityMatrix [1] to check the Flink
> >  > compatibility
> >  >  > with python. This is python equivalent to
> validatesRunner
> >  > tests in java
> >  >  > for portable runners.
> >  >  > I think we can reuse it for Samza Portable runner with
> > minor
> >  > refactoring.
> >  >  >
> >  >  > [1]
> >  >  >
> >  >
> >
> https://github.com/apache/beam/blob/bdb1a713a120a887e71e85c77879dc4446a58541/sdks/python/build.gradle#L305
> >  >  >

Re: Integration of python/portable runner tests for Samza runner

2019-04-24 Thread Xinyu Liu
Thanks for the useful pointers! We are looking forward to integrating both
Portable and Python-specific tests for Samza runner. A few questions:

- For portable runner tests: by looking at the portableValidatesRunnerTask in
flink_job_server.gradle, it seems it's the same set of Java tests but using
portability framework to validate (submit to job server and run the
portable pipeline in a specific runner). Is my understanding correct?

- For python tests: Looks like flinkValidatesRunner is using LOOPBACK SDK
worker type in the tests. Not sure what LOOPBACK does. Is it used for
testing? Currently Samza portable runner supports PROCESS worker.

Thanks,
Xinyu



On Wed, Apr 24, 2019 at 2:45 AM Maximilian Michels  wrote:

> > If you are interested in portable python pipeline validation, I think
> > fn_api_runner_test would also help.
>
> Just to note, Ankur mentioned flinkCompatibilityMatrix, that one uses
> fn_api_runner_test with some tooling on top to bring up the test cluster.
>
> On 23.04.19 19:23, Boyuan Zhang wrote:
> > Hi Daniel,
> > If you are interested in portable python pipeline validation, I think
> > fn_api_runner_test
> > <
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py>
>
> > would also help.
> >
> > On Tue, Apr 23, 2019 at 10:19 AM Pablo Estrada  > > wrote:
> >
> > This is cool, Daniel : ) Glad to see the Samza runner moving forward.
> > Best
> > -P.
> >
> > On Tue, Apr 23, 2019 at 2:52 AM Maximilian Michels  > > wrote:
> >
> > Hi Daniel,
> >
> > Note that there is also Portable Validates Runner which runs Java
> > portability tests. I don't know if you have integrated with that
> > one
> > already.
> >
> > Thanks,
> > Max
> >
> > On 23.04.19 02:28, Ankur Goenka wrote:
> >  > Hi Daniel,
> >  >
> >  > We use flinkCompatibilityMatrix [1] to check the Flink
> > compatibility
> >  > with python. This is python equivalent to validatesRunner
> > tests in java
> >  > for portable runners.
> >  > I think we can reuse it for Samza Portable runner with minor
> > refactoring.
> >  >
> >  > [1]
> >  >
> >
> https://github.com/apache/beam/blob/bdb1a713a120a887e71e85c77879dc4446a58541/sdks/python/build.gradle#L305
> >  >
> >  > On Mon, Apr 22, 2019 at 3:21 PM Daniel Chen  wrote:
> >  >
> >  > Hi everyone,
> >  >
> >  > I'm working on improving the validation of the Python
> > portable Samza
> >  > runner. For java, we have the gradle task (
> > :validatesRunner) that
> >  > runs the runner validation tests.
> >  > I am looking for pointers on how to similarly
> > integrate/enable the
> >  > portability and Python tests for the Samza runner.
> >  >
> >  > Any help will be greatly appreciated.
> >  >
> >  > Thanks,
> >  > Daniel
> >  >
> >
>


Re: New contributor

2019-01-30 Thread Xinyu Liu
Welcome and glad to see you here, Tao!

Xinyu

On Wed, Jan 30, 2019 at 12:00 PM Kenneth Knowles  wrote:

> Done. Welcome!
>
> Kenn
>
> On Wed, Jan 30, 2019 at 11:53 AM Tao Feng  wrote:
>
>> Hi,
>>
>> I would like to contribute to beam and work on some tickets in my spare
>> time. Could you please add me to the beam project in JIRA?
>>
>> My jira user name is TaoFeng.
>>
>> Thanks,
>> -Tao
>>
>


Re: [DISCUSSION] ParDo Async Java API

2019-01-30 Thread Xinyu Liu
Does "first class" mean that it is a
> well-encapsulated abstraction? or does it mean that the user can more or
> less do whatever they want? These are opposite but both valid meanings for
> "first class", to me.
> >>>>> >>
> >>>>> >> I would not want to encourage users to do explicit multi-threaded
> programming or control parallelism. Part of the point of Beam is to gain
> big data parallelism without explicit multithreading. I see asynchronous
> chaining of futures (or their best-approximation in your language of
> choice) as a highly disciplined way of doing asynchronous dependency-driven
> computation that is nonetheless conceptually, and readably, straight-line
> code. Threads are not required nor the only way to execute this code. In
> fact you might often want to execute without threading for a reference
> implementation to provide canonically correct results. APIs that leak
> lower-level details of threads are asking for trouble.
> >>>>> >>
> >>>>> >> One of our other ideas was to provide a dynamic parameter of type
> ExecutorService. The SDK harness (pre-portability: the runner) would
> control and observe parallelism while the user could simply register tasks.
> Providing a future/promise API is even more disciplined.
> >>>>> >>
> >>>>> >> Kenn
> >>>>> >>
> >>>>> >> On Wed, Jan 23, 2019 at 10:35 AM Scott Wegner 
> wrote:
> >>>>> >>>
> >>>>> >>> A related question is how to make execution observable such that
> a runner can make proper scaling decisions. Runners decide how to schedule
> bundles within and across multiple worker instances, and can use
> information about execution to make dynamic scaling decisions. First-class
> async APIs seem like they would encourage DoFn authors to implement their
> own parallelization, rather than deferring to the runner that should be
> more capable of providing the right level of parallelism.
> >>>>> >>>
> >>>>> >>> In the Dataflow worker harness, we estimate execution time to
> PTransform steps by sampling execution time on the execution thread and
> attributing it to the currently invoked method. This approach is fairly
> simple and possible because we assume that execution happens within the
> thread controlled by the runner. Some DoFn's already implement their own
> async logic and break this assumption; I would expect more if we make async
> built into the DoFn APIs.
> >>>>> >>>
> >>>>> >>> So: this isn't an argument against async APIs, but rather: does
> this break execution observability, and are there other lightweight
> mechanisms for attributing execution time of async work?
> >>>>> >>>
> >>>>> >>> On Tue, Jan 22, 2019 at 7:08 PM Kenneth Knowles 
> wrote:
> >>>>> >>>>
> >>>>> >>>> When executed over the portable APIs, it will be primarily the
> Java SDK harness that makes all of these decisions. If we wanted runners to
> have some insight into it we would have to add it to the Beam model protos.
> I don't have any suggestions there, so I would leave it out of this
> discussion until there's good ideas. We could learn a lot by trying it out
> just in the SDK harness.
> >>>>> >>>>
> >>>>> >>>> Kenn
> >>>>> >>>>
> >>>>> >>>> On Tue, Jan 22, 2019 at 6:12 PM Xinyu Liu <
> xinyuliu...@gmail.com> wrote:
> >>>>> >>>>>
> >>>>> >>>>> I don't have a strong opinion on the resolution of the futures
> regarding to @FinishBundle invocation. Leaving it to be unspecified does
> give runners more room to implement it with their own support.
> >>>>> >>>>>
> >>>>> >>>>> Optimization is also another great point. Fuse seems pretty
> complex to me too if we need to find a way to chain the resulting future
> into the next transform, or leave the async transform as a standalone stage
> initially?
> >>>>> >>>>>
> >>>>> >>>>> Btw, I was counting the number of replies before we hit the
> portability. Seems after 4 replies fuse finally showed up :).
> >>>>> >>>>>
> >>>>> >>>>> Thanks,
> >>>>> >>>>> Xinyu
> >>>>> >>>>>

Re: [DISCUSSION] ParDo Async Java API

2019-01-22 Thread Xinyu Liu
I don't have a strong opinion on the resolution of the futures with regard
to @FinishBundle invocation. Leaving it unspecified does give runners
more room to implement it with their own support.

Optimization is also another great point. Fuse seems pretty complex to me
too if we need to find a way to chain the resulting future into the next
transform, or leave the async transform as a standalone stage initially?

Btw, I was counting the number of replies before we hit the portability.
Seems after 4 replies fuse finally showed up :).

Thanks,
Xinyu


On Tue, Jan 22, 2019 at 5:42 PM Kenneth Knowles  wrote:

>
>
> On Tue, Jan 22, 2019, 17:23 Reuven Lax 
>>
>>
>> On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu  wrote:
>>
>>> @Steve: it's good to see that this is going to be useful in your use
>>> cases as well. Thanks for sharing the code from Scio! I can see in your
>>> implementation that waiting for the future completion is part of the
>>> @FinishBundle. We are thinking of taking advantage of the underlying runner
>>> async support so the user-level code won't need to implement this logic,
>>> e.g. Samza has an AsyncStreamTask API that provides a callback to invoke
>>> after future completion[1], and Flink also has AsyncFunction api [2] which
>>> provides a ResultFuture similar to the API we discussed.
>>>
>>
>> Can this be done correctly? What I mean is that if the process dies, can
>> you guarantee that no data is lost? Beam currently guarantees this for
>> FinishBundle, but if you use an arbitrary async framework this might not be
>> true.
>>
>
> What a Beam runner guarantees is that *if* the bundle is committed, *then*
> finishbundle has run. So it seems just as easy to say *if* a bundle is
> committed, *then* every async result has been resolved.
>
> If the process dies the two cases should be naturally analogous.
>
> But it raises the question of whether they should be resolved prior to
> finishbundle, after, or unspecified. I lean toward unspecified.
>
> That's for a single ParDo. Where this could get complex is optimizing
> fused stages for greater asynchrony.
>
> Kenn
>
>
>>
>>> A simple use case for this is to execute a Runnable asynchronously in
>>> user's own executor. The following code illustrates Kenn's option #2, with
>>> a very simple single-thread pool being the executor:
>>>
>>> new DoFn<InputT, CompletableFuture<OutputT>>() {
>>>   @ProcessElement
>>>   public void process(@Element InputT element,
>>>       @Output OutputReceiver<CompletableFuture<OutputT>> outputReceiver) {
>>>     CompletableFuture<OutputT> future = CompletableFuture.supplyAsync(
>>>         () -> someOutput,
>>>         Executors.newSingleThreadExecutor());
>>>     outputReceiver.output(future);
>>>   }
>>> }
>>>
>>> The neat thing about this API is that the user can choose their own async 
>>> framework and we only expect the output to be a CompletionStage.
>>>
>>>
>>> For the implementation of bundling, can we compose a CompletableFuture from 
>>> each element in the bundle, e.g. CompletableFuture.allOf(...), and then 
>>> invoke @FinishBundle when this future is complete? Seems this might work.
>>>
>>> Thanks,
>>> Xinyu
>>>
>>>
>>> [1]
>>> https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
>>>
>>> On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz 
>>> wrote:
>>>
>>>> I'd love to see something like this as well.  Also +1 to process(@Element
>>>> InputT element, @Output OutputReceiver>).  I
>>>> don't know if there's much benefit to passing a future in, since the
>>>> framework itself could hook up the process function to complete when the
>>>> future completes.
>>>>
>>>> I feel like I've spent a bunch of time writing very similar "kick off a
>>>> future in ProcessElement, join it in FinishBundle" code, and looking around
>>>> beam itself a lot of built-in transforms do it as well.  Scio provides a
>>>> few AsyncDoFn implementations [1] but it'd be great to see this as a
>>>> first-class concept in beam itself.  Doing error handling, concurrency, etc
>>>> correctly can be tricky.
>>>>
>>>> [1]
>>>> https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>>>>
>>

Re: [DISCUSSION] ParDo Async Java API

2019-01-22 Thread Xinyu Liu
I can speak from Samza's perspective: Samza only commits the messages once
the async callbacks have completed. So if there are any failures, it
will recover from the last checkpoint and reprocess the messages for which we
haven't received the completion, so no data is lost. The "Guaranteed
Semantics" section in [1] has a bit more detail. Judging by the "Fault Tolerance
Guarantees" section in [2], I believe Flink honors the same semantics.

Thanks,
Xinyu

[1]:
https://samza.apache.org/learn/tutorials/0.11/samza-async-user-guide.html
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
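
For readers less familiar with the Samza side, a rough sketch of the
AsyncStreamTask callback style referenced in [1] (reconstructed from memory of the
Samza API, so details may differ; the lookup call is a placeholder):

import java.util.concurrent.CompletableFuture;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.AsyncStreamTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCallback;
import org.apache.samza.task.TaskCoordinator;

// The task kicks off async work and signals Samza through the callback; Samza only
// checkpoints messages whose callbacks have completed.
public class AsyncLookupTask implements AsyncStreamTask {
  @Override
  public void processAsync(
      IncomingMessageEnvelope envelope,
      MessageCollector collector,
      TaskCoordinator coordinator,
      TaskCallback callback) {
    CompletableFuture
        .supplyAsync(() -> lookup(envelope.getMessage()))
        .whenComplete((result, error) -> {
          if (error != null) {
            callback.failure(error);
          } else {
            callback.complete();
          }
        });
  }

  private Object lookup(Object message) {
    return message;   // placeholder for the actual remote call
  }
}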

On Tue, Jan 22, 2019 at 5:23 PM Reuven Lax  wrote:

>
>
> On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu  wrote:
>
>> @Steve: it's good to see that this is going to be useful in your use
>> cases as well. Thanks for sharing the code from Scio! I can see in your
>> implementation that waiting for the future completion is part of the
>> @FinishBundle. We are thinking of taking advantage of the underlying runner
>> async support so the user-level code won't need to implement this logic,
>> e.g. Samza has an AsyncStreamTask API that provides a callback to invoke
>> after future completion[1], and Flink also has AsyncFunction api [2] which
>> provides a ResultFuture similar to the API we discussed.
>>
>
> Can this be done correctly? What I mean is that if the process dies, can
> you guarantee that no data is lost? Beam currently guarantees this for
> FinishBundle, but if you use an arbitrary async framework this might not be
> true.
>
>
>> A simple use case for this is to execute a Runnable asynchronously in
>> user's own executor. The following code illustrates Kenn's option #2, with
>> a very simple single-thread pool being the executor:
>>
>> new DoFn<InputT, CompletableFuture<OutputT>>() {
>>   @ProcessElement
>>   public void process(@Element InputT element,
>>       @Output OutputReceiver<CompletableFuture<OutputT>> outputReceiver) {
>>     CompletableFuture<OutputT> future = CompletableFuture.supplyAsync(
>>         () -> someOutput,
>>         Executors.newSingleThreadExecutor());
>>     outputReceiver.output(future);
>>   }
>> }
>>
>> The neat thing about this API is that the user can choose their own async 
>> framework and we only expect the output to be a CompletionStage.
>>
>>
>> For the implementation of bundling, can we compose a CompletableFuture from 
>> each element in the bundle, e.g. CompletableFuture.allOf(...), and then 
>> invoke @FinishBundle when this future is complete? Seems this might work.
>>
>> Thanks,
>> Xinyu
>>
>>
>> [1]
>> https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
>>
>> On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz 
>> wrote:
>>
>>> I'd love to see something like this as well.  Also +1 to process(@Element
>>> InputT element, @Output OutputReceiver>).  I
>>> don't know if there's much benefit to passing a future in, since the
>>> framework itself could hook up the process function to complete when the
>>> future completes.
>>>
>>> I feel like I've spent a bunch of time writing very similar "kick off a
>>> future in ProcessElement, join it in FinishBundle" code, and looking around
>>> beam itself a lot of built-in transforms do it as well.  Scio provides a
>>> few AsyncDoFn implementations [1] but it'd be great to see this as a
>>> first-class concept in beam itself.  Doing error handling, concurrency, etc
>>> correctly can be tricky.
>>>
>>> [1]
>>> https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>>>
>>> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles  wrote:
>>>
>>>> If the input is a CompletionStage then the output should also
>>>> be a CompletionStage, since all you should do is async chaining.
>>>> We could enforce this by giving the DoFn an
>>>> OutputReceiver(CompletionStage).
>>>>
>>>> Another possibility that might be even more robust against poor future
>>>> use could be process(@Element InputT element, @Output
>>>> OutputReceiver>). In this way, the process method
>>>> itself will be async chained, rather than counting on the user to do the
>>>> right thing.
>>>>
>>>> We should see how these look in real use cases. The way that processing
>>>> is split between @ProcessElement and @FinishBundle might complicate things.

Re: [DISCUSSION] ParDo Async Java API

2019-01-22 Thread Xinyu Liu
@Steve: it's good to see that this is going to be useful in your use cases
as well. Thanks for sharing the code from Scio! I can see in your
implementation that waiting for the future completion is part of the
@FinishBundle. We are thinking of taking advantage of the underlying runner
async support so the user-level code won't need to implement this logic,
e.g. Samza has an AsyncStreamTask API that provides a callback to invoke
after future completion[1], and Flink also has AsyncFunction api [2] which
provides a ResultFuture similar to the API we discussed.

A simple use case for this is to execute a Runnable asynchronously in
user's own executor. The following code illustrates Kenn's option #2, with
a very simple single-thread pool being the executor:

new DoFn<InputT, CompletableFuture<OutputT>>() {
  @ProcessElement
  public void process(@Element InputT element,
      @Output OutputReceiver<CompletableFuture<OutputT>> outputReceiver) {
    CompletableFuture<OutputT> future = CompletableFuture.supplyAsync(
        () -> someOutput,
        Executors.newSingleThreadExecutor());
    outputReceiver.output(future);
  }
}

The neat thing about this API is that the user can choose their own
async framework and we only expect the output to be a CompletionStage.


For the implementation of bundling, can we compose a CompletableFuture
from each element in the bundle, e.g. CompletableFuture.allOf(...),
and then invoke @FinishBundle when this future is complete? Seems this
might work.

Thanks,
Xinyu


[1]
https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html

On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz  wrote:

> I'd love to see something like this as well.  Also +1 to process(@Element
> InputT element, @Output OutputReceiver>).  I
> don't know if there's much benefit to passing a future in, since the
> framework itself could hook up the process function to complete when the
> future completes.
>
> I feel like I've spent a bunch of time writing very similar "kick off a
> future in ProcessElement, join it in FinishBundle" code, and looking around
> beam itself a lot of built-in transforms do it as well.  Scio provides a
> few AsyncDoFn implementations [1] but it'd be great to see this as a
> first-class concept in beam itself.  Doing error handling, concurrency, etc
> correctly can be tricky.
>
> [1]
> https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>
> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles  wrote:
>
>> If the input is a CompletionStage then the output should also be
>> a CompletionStage, since all you should do is async chaining. We
>> could enforce this by giving the DoFn an
>> OutputReceiver(CompletionStage).
>>
>> Another possibility that might be even more robust against poor future
>> use could be process(@Element InputT element, @Output
>> OutputReceiver>). In this way, the process method
>> itself will be async chained, rather than counting on the user to do the
>> right thing.
>>
>> We should see how these look in real use cases. The way that processing
>> is split between @ProcessElement and @FinishBundle might complicate things.
>>
>> Kenn
>>
>> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu  wrote:
>>
>>> Hi, guys,
>>>
>>> As more users try out Beam running on the SamzaRunner, we got a lot of
>>> asks for an asynchronous processing API. There are a few reasons for these
>>> asks:
>>>
>>>- The users here are experienced in asynchronous programming. With
>>>async frameworks such as Netty and ParSeq and libs like async jersey
>>>client, they are able to make remote calls efficiently and the libraries
>>>help manage the execution threads underneath. Async remote calls are very
>>>common in most of our streaming applications today.
>>>- Many jobs are running on a multi-tenancy cluster. Async processing
>>>helps for less resource usage and fast computation (less context switch).
>>>
>>> I asked about the async support in a previous email thread. The
>>> following API was mentioned in the reply:
>>>
>>>   new DoFn<InputT, OutputT>() {
>>>     @ProcessElement
>>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>>       element.thenApply(...)
>>>     }
>>>   }
>>>
>>> We are wondering whether there are any discussions on this API and
>>> related docs. It is awesome that you guys already considered having DoFn to
>>> process asynchronously. Out of curiosity, this API seems to create a
>>> CompletionStage out of the input element (probably using framework's
>>> executor) and then allow user to chain on it. To us, it seems more
>>> convenient if the DoFn output a CompletionStage or pass in a
>>> CompletionStage to invoke upon completion.
>>>
>>> We would like to discuss further on the async API and hopefully we will
>>> have a great support in Beam. Really appreciate the feedback!
>>>
>>> Thanks,
>>> Xinyu
>>>
>>


[DISCUSSION] ParDo Async Java API

2019-01-22 Thread Xinyu Liu
Hi, guys,

As more users try out Beam running on the SamzaRunner, we got a lot of asks
for an asynchronous processing API. There are a few reasons for these asks:

   - The users here are experienced in asynchronous programming. With async
   frameworks such as Netty and ParSeq and libs like async jersey client, they
   are able to make remote calls efficiently and the libraries help manage the
   execution threads underneath. Async remote calls are very common in most of
   our streaming applications today.
   - Many jobs are running on a multi-tenancy cluster. Async processing
   helps reduce resource usage and speeds up computation (fewer context switches).

I asked about the async support in a previous email thread. The following
API was mentioned in the reply:

  new DoFn<InputT, OutputT>() {
    @ProcessElement
    public void process(@Element CompletionStage<InputT> element, ...) {
      element.thenApply(...)
    }
  }

We are wondering whether there are any discussions on this API and related
docs. It is awesome that you guys already considered having DoFn process
asynchronously. Out of curiosity, this API seems to create a
CompletionStage out of the input element (probably using the framework's
executor) and then allows the user to chain on it. To us, it seems more
convenient if the DoFn outputs a CompletionStage, or is passed a
CompletionStage to invoke upon completion.
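
To make the two shapes concrete, here is a rough sketch (neither parameter
binding exists in Beam today, and callRemoteService is just a stand-in for an
async client call):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.beam.sdk.transforms.DoFn;

public class AsyncDoFnShapes {

  // Stand-in for an async client call; in practice this would hit a remote service.
  private static CompletionStage<String> callRemoteService(String input) {
    return CompletableFuture.completedFuture(input.toUpperCase());
  }

  // (a) The DoFn outputs a CompletionStage and the framework chains on it.
  static class OutputFutureFn extends DoFn<String, String> {
    @ProcessElement
    public void process(
        @Element String element,
        OutputReceiver<CompletionStage<String>> receiver) { // hypothetical binding
      receiver.output(callRemoteService(element));
    }
  }

  // (b) The framework passes in a future that the DoFn completes when done.
  static class CompleteCallbackFn extends DoFn<String, String> {
    @ProcessElement
    public void process(
        @Element String element,
        CompletableFuture<String> onDone) { // hypothetical binding
      callRemoteService(element)
          .whenComplete(
              (out, err) -> {
                if (err != null) {
                  onDone.completeExceptionally(err);
                } else {
                  onDone.complete(out);
                }
              });
    }
  }
}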

We would like to discuss the async API further, and hopefully we will have
great support for it in Beam. Really appreciate the feedback!

Thanks,
Xinyu


Re: [DISCUSS] Structuring Java based DSLs

2018-12-12 Thread Xinyu Liu
Agree with Kenn on this. From our SamzaRunner point of view, we would like
Beam SQL to be self-contained and flexible enough for our users to use it
in different scenarios, e.g. pure SQL and embedded in different SDKs. We are
also extremely interested in the DataFrame-like API mentioned above. To
digress a little from this topic, this is actually the current hurdle in
letting our users try it out on Hadoop, since they expect that kind of
API with columnar data set IO support, e.g. ORC. If there are any more
details about the status of the DataFrame API and columnar support, I will be
very happy to learn more about it.

Thanks,
Xinyu

On Wed, Dec 12, 2018 at 8:55 AM Jan Lukavský  wrote:

> Hi all,
>
> after letting this sink for a while, I'd like to summarize the feedback
> and emphasize some questions that appeared:
>
>  a) there were several 'it makes sense' opinions
>
>  b) there was one 'not right now' - which makes sense, but the purpose of
> this discussion was to try to first answer the what and then the when :-)
>
>  c) there were several 'maybe, but':
>
>   i) it would be more complicated to code SQL against user-facing API,
> because that way, each change needed by SQL would have to be first
> implemented in this user-friendly API layer
>
>  I can absolutely agree with this, it would be definitely more
> complicated and more work. I see basically two ways out. The first one
> would suggest to move all the code from Euphoria into something similar to
> Join library, and let Euphoria be just the user-friendly layer on top of
> this library (basically just the builders). That way, we could reuse the
> code and be pretty much sure, that the implementation of SQL transforms are
> identical to what Euphoria would offer, which is one the goals of this
> discussion. The drawback would be, that there would be no guaranties, that
> what this underlying library would offer would be also accessible from
> Euphoria - that is because the complexity would not disappear, it would be
> just moved onto different component - new added feature to the shared
> library would have to be made accessible in Euphoria. The other way around
> would be to accept this added complexity in favor of making sure, that
> every feature that is needed by SQL is also available in Euphoria, because
> the user-facing API would be used by SQL itself. I'd really like to further
> hear community opinions on pros and cons of these two (or maybe I'm
> overlooking something and there is a third way).
>
>  ii) in some cases, we might want to support relational operators in SDK
> harness for performance, and we don't want to close doors for this
>
>  Again, the motivation of this seems to be clear and valid, but the
> question that arises is - under the conditions (something like we have
> schema aware PCollection), would we want to enable code reuse between logic
> written in SQL and Euphoria to ensure consistent behavior? That would
> probably mean that Euphoria would have to make use of the provided scheme
> of PCollection and switch to a different behavior on API level (more
> DataFrame-like) and/or different operators created and passed to the SDK
> harness. This feature is currently absolutely missing, but seems to be
> plausible and maybe there could be benefits for both sides.
>
> Many thanks for any more opinions on this.
>
>  Jan
>
>
> On 12/4/18 11:32 PM, Rui Wang wrote:
>
> For pure SQL users, there shouldn't be a SDK concepts. SQL shell and JDBC
> driver should be the way to interact Beam by SQL.
>
>
> For embedded SQL use case in all SDKs (Python, Go, etc.), even assume
> there are relational algebra operators defined in SDKs, SDKs still have to
> implement its own way to parse SQL into operators (SQL is just a string).
> To avoid that overhead, I would imagine that SDKs should keep SQL queries
> and wait for a later but shared processing (I don't know if Portability
> should handle SQL or if it could).
>
>
> -Rui
>
> On Tue, Dec 4, 2018 at 2:04 AM Jan Lukavský  wrote:
>
>> Hi Kenn,
>>
>> my intent really was not to propose any changes right now. I'm trying to
>> create a clear understanding about what the relation between Euphoria and
>> SQL should be in long run. In my point of view, Euphoria should be always
>> superset of SQL, because it should support complete relational algebra (and
>> I'm not saying it does so right now, it should just be our goal) plus more
>> flexible UDFs (not limited to SQL standard) and stateful processing (which
>> will probably not be part of SQL any time soon). There should be some sort
>> of guaranties that the semantics of SQL and Euphoria are the same, because
>> that is what users would expect it to be. This can be for sure ensured by
>> introducing another layer between Euphoria and core SDK (e.g. the join
>> library), but the question is - what makes this solution different from
>> creating this shared library from Euphoria itself (when looking at the big
>> picture)? And it is not only about 

Re: Bay Area Apache Beam Kickoff!

2018-11-21 Thread Xinyu Liu
This is awesome! Glad to learn the latest Beam SQL and meet you guys there.

Thanks,
Xinyu

On Tue, Nov 20, 2018 at 9:07 PM Jean-Baptiste Onofré 
wrote:

> Nice !!
>
> Unfortunately I won't be able to be there. But good luck and I'm sure it
> will be a great meetup !
>
> Regards
> JB
>
> On 20/11/2018 02:36, Austin Bennett wrote:
> > We have our first meetup scheduled for December 12th in San Francisco.
> >
> > Andrew Pilloud, a software engineer at Google and Beam committer, will
> > demo the latest feature in Beam SQL: a standalone SQL shell. The talk
> > cover why SQL is a good fit for streaming data processing, the technical
> > details of the Beam SQL engine, and a peek into our future plans.
> >
> > Kenn Knowles, a founding PMC Member and incoming PMC Chair for the
> > Apache Beam project, as well as computer scientist and engineer at
> > Google will share about all things Beam. Where it is, where its been,
> > where its going.
> >
> > More info:
> >  https://www.meetup.com/San-Francisco-Apache-Beam/events/256348972/
> >
> > For those in/around town (or that can be) come join in the fun!
> >
> >
> >
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


Re: Performance of BeamFnData between Python and Java

2018-11-09 Thread Xinyu Liu
Really appreciate the pointers! Looks like the next step is to try
increasing our bundle size. We will do some experiments on our side and
report back later.

@Robert: thanks a lot for the details on protobuf. It was pretty surprising
to us that decoding protobuf messages slows down the performance so much. To
your question, we are not using the default Docker container, since here
we need to use LinkedIn's Python packaging and deployment. AFAIK it uses
Cython to compile the code.

Thanks,
Xinyu

On Thu, Nov 8, 2018 at 3:11 PM Robert Bradshaw  wrote:

> I'd assume you're compiling the code with Cython as well? (If you're
> using the default containers, that should be fine.)
> On Fri, Nov 9, 2018 at 12:09 AM Robert Bradshaw 
> wrote:
> >
> > Very cool to hear of this progress on Samza!
> >
> > Python protocol buffers are extraordinarily slow (lots of reflection,
> > attributes lookups, and bit fiddling for serialization/deserialization
> > that is certainly not Python's strong point). Each bundle processed
> > involves multiple protos being constructed and sent/received (notably
> > the particularly nested and branchy monitoring info one). While there
> > are still some improvements that could be made for making bundles
> > lighter-weight, amortizing this cost over many elements is essential
> > for performance. (Note that elements within a bundle are packed into a
> > single byte buffer, so avoid this overhead.)
> >
> > Also, it may be good to guarantee you're at least using the C++
> > bindings:
> https://developers.google.com/protocol-buffers/docs/reference/python-generated
> > (still slow, but not as slow).
> >
> > And, of course, due to the GIL one may want many python workers for
> > multi-core machines.
> >
> > On Thu, Nov 8, 2018 at 9:18 PM Thomas Weise  wrote:
> > >
> > > We have been doing some end to end testing with Python and Flink
> (streaming). You could take a look at the following and possibly replicate
> it for your work:
> > >
> > >
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/flink/flink_streaming_impulse.py
> > >
> > > We found that in order to get acceptable performance, we need larger
> bundles (we started with single element bundles). Default in the Flink
> runner now is to cap bundles at 1000 elements or 1 second, whatever comes
> first. With that, I have seen decent throughput for the pipeline above (~
> 5000k elements per second with single SDK worker).
> > >
> > > The Flink runner also has support to run multiple SDK workers per
> Flink task manager.
> > >
> > > Thomas
> > >
> > >
> > > On Thu, Nov 8, 2018 at 11:13 AM Xinyu Liu 
> wrote:
> > >>
> > >> 19mb/s throughput is enough for us. Seems the bottleneck is the rate
> of RPC calls. Our message size is usually 1k ~ 10k. So if we can reach
> 19mb/s, we will be able to process ~4k qps, that meets our requirements. I
> guess increasing the size of the bundles will help. Do you guys have any
> results from running python with Flink? We are curious about the
> performance there.
> > >>
> > >> Thanks,
> > >> Xinyu
> > >>
> > >> On Thu, Nov 8, 2018 at 10:13 AM Lukasz Cwik  wrote:
> > >>>
> > >>> This benchmark[1] shows that Python is getting about 19mb/s.
> > >>>
> > >>> Yes, running more python sdk_worker processes will improve
> performance since Python is limited to a single CPU core.
> > >>>
> > >>> [1]
> https://performance-dot-grpc-testing.appspot.com/explore?dashboard=5652536396611584=490377658=1286539696
> > >>>
> > >>>
> > >>>
> > >>> On Wed, Nov 7, 2018 at 5:24 PM Xinyu Liu 
> wrote:
> > >>>>
> > >>>> By looking at the gRPC dashboard published by the benchmark[1], it
> seems the streaming ping-pong operations per second for gRPC in python is
> around 2k ~ 3k qps. This seems quite low compared to gRPC performance in
> other languages, e.g. 600k qps for Java and Go. Is it expected to run
> multiple sdk_worker processes to improve performance?
> > >>>>
> > >>>> [1]
> https://performance-dot-grpc-testing.appspot.com/explore?dashboard=5652536396611584=713624174=1012810333
> > >>>>
> > >>>> On Wed, Nov 7, 2018 at 11:14 AM Lukasz Cwik 
> wrote:
> > >>>>>
> > >>>>> gRPC folks provide a bunch of benchmarks for different scenarios:
> https://grpc.io/docs/guides/benchmarking.html

Re: Performance of BeamFnData between Python and Java

2018-11-08 Thread Xinyu Liu
By looking at the gRPC dashboard published by the benchmark [1], it seems
the streaming ping-pong rate for gRPC in Python is around
2k ~ 3k QPS. This seems quite low compared to gRPC performance in other
languages, e.g. 600k QPS for Java and Go. Is it expected that we run multiple
sdk_worker processes to improve performance?

[1]
https://performance-dot-grpc-testing.appspot.com/explore?dashboard=5652536396611584=713624174=1012810333

On Wed, Nov 7, 2018 at 11:14 AM Lukasz Cwik  wrote:

> gRPC folks provide a bunch of benchmarks for different scenarios:
> https://grpc.io/docs/guides/benchmarking.html
> You would be most interested in the streaming throughput benchmarks since
> the Data API is written on top of the gRPC streaming APIs.
>
> 200KB/s does seem pretty small. Have you captured any Python profiles[1]
> and looked at them?
>
> 1:
> https://lists.apache.org/thread.html/f8488faede96c65906216c6b4bc521385abeddc1578c99b85937d2f2@%3Cdev.beam.apache.org%3E
>
>
> On Wed, Nov 7, 2018 at 10:18 AM Hai Lu  wrote:
>
>> Hi,
>>
>> This is Hai from LinkedIn. I'm currently working on Portable API for
>> Samza Runner. I was able to make Python work with Samza container reading
>> from Kafka. However, I'm seeing severe performance issue with my set up,
>> achieving only ~200KB throughput between the Samza runner in the Java side
>> and the sdk_worker in the Python part.
>>
>> While I'm digging into this, I wonder whether some one has benchmarked
>> the data channel between Java and Python and had some results how much
>> throughput can be reached? Assuming single worker thread and single
>> JobBundleFactory.
>>
>> I might be missing some very basic and naive gRPC setting which leads to
>> this unsatisfactory results. So another question is whether are any good
>> articles or documentations about gRPC tuning dedicated to IPC?
>>
>> Thanks,
>> Hai
>>
>>
>>


Re: Beam Samza Runner status update

2018-10-12 Thread Xinyu Liu
@Max: absolutely we should work together! The FlinkRunner has been our best
reference since the start of our SamzaRunner, and the previous work on the
Flink portable runner has been extremely valuable to us too. We haven't gotten
to the point of portable stateful processing yet. Our next step is to hook
up a streaming source, i.e. Kafka, and test out streaming capabilities such
as watermarks, windowing and triggers. For us, reading from Kafka will
happen on the Java side (we have quite a lot of extensions to Kafka at
LinkedIn), so we will try to create some internal Python API and do the
translation right now. On the other hand, we are following up on
BEAM-2937 for the optimization of Combine in the portable runner. Previously we
ran into some state problems without this being resolved. Anyway, looking
forward to syncing up with you more!

Thanks,
Xinyu



On Fri, Oct 12, 2018 at 1:40 AM Maximilian Michels  wrote:

> Thanks for the updating, Xinyu and Hai! Great to see another Running
> emerging :)
>
> I'm on the FlinkRunner. Looking forward to working together with you to
> make the Beam Runners even better. Particularly, we should sync on the
> portability, as some things are still to be fleshed out. In Flink, we
> are starting to integrate portable State.
>
> Best,
> Max
>
> On 11.10.18 05:14, Jesse Anderson wrote:
> > Interesting
> >
> > On Wed, Oct 10, 2018, 3:49 PM Kenneth Knowles  > <mailto:k...@apache.org>> wrote:
> >
> > Welcome, Hai!
> >
> > On Wed, Oct 10, 2018 at 3:46 PM Hai Lu  > <mailto:lhai...@gmail.com>> wrote:
> >
> > Hi, all
> >
> > This is Hai from LinkedIn. As Xinyu mentioned, I have been
> > working on portable API for Samza runner and made some solid
> > progress. It's been a very smooth process (although not
> > effortless for sure) and I'm really grateful for the great
> > platform that you all have built. I'm very impressed. Bravo!
> >
> > Excited to work with everyone on Beam. Do expect more questions
> > from me down the road.
> >
> > Thanks,
> > Hai
> >
> > On Wed, Oct 10, 2018 at 12:36 PM Kenneth Knowles
> > mailto:k...@apache.org>> wrote:
> >
> > Clarification: Thomas Groh wrote the fuser, not me!
> >
> > Thanks for the sharing all this. Really cool.
> >
> > Kenn
> >
> > On Wed, Oct 10, 2018 at 11:17 AM Rui Wang  > <mailto:ruw...@google.com>> wrote:
> >
> > Thanks for sharing! it's so exciting to hear that Beam
> > is being used on Samza in production @LinkedIn! Your
> > feedback will be helpful to Beam community!
> >
> > Besides, Beam supports SQL right now and hopefully Beam
> > community could also receive feedback on BeamSQL
> > <
> https://beam.apache.org/documentation/dsls/sql/overview/> in
> > the future.
> >
> >     -Rui
> >
> > On Wed, Oct 10, 2018 at 11:10 AM Jean-Baptiste Onofré
> > mailto:j...@nanthrax.net>> wrote:
> >
> > Thanks for sharing and congrats for this great work !
> >
> > Regards
> > JB
> > On 10 Oct 2018, at 20:23, Xinyu Liu <xinyuliu...@gmail.com> wrote:
> >
> > Hi, All,
> >
> > It's been over four months since we added the
> > Samza Runner to Beam, and we've been making a
> > lot of progress after that. Here I would like to
> > update your guys and share some really good news
> > happening here at LinkedIn:
> >
> > 1) First Beam job in production @LInkedIn!
> > After a few rounds of testing and benchmarking,
> > we finally rolled out our first Beam job here!
> > The job uses quite a few features, such as event
> > time, fixed/session windowing, early triggering,
> > and stateful processing. Our first customer is
> > very hap

Beam Samza Runner status update

2018-10-10 Thread Xinyu Liu
Hi, All,

It's been over four months since we added the Samza Runner to Beam, and
we've been making a lot of progress since then. Here I would like to update
you guys and share some really good news happening here at LinkedIn:

1) First Beam job in production @LinkedIn!
After a few rounds of testing and benchmarking, we finally rolled out our
first Beam job here! The job uses quite a few features, such as event time,
fixed/session windowing, early triggering, and stateful processing. Our
first customer is very happy and they highly praise the easy-to-use Beam
API as well as the powerful processing model. Due to the limited resources
here, we put our full trust in the work you guys are doing, and we didn't
run into any surprises. We see extreme attention to detail as well as
no compromise on user experience anywhere in the code base. We would
like to thank everyone in the Beam community for contributing to such an
amazing framework!

2) A portable Samza Runner prototype
We are also starting the work of making the Samza Runner portable. So far we
have just gotten the Python word count example working using the portable Samza
Runner. Please look out for the PR for this very soon :). Again, this work would
not be possible without the great Beam portability framework, and the developers
like Luke and Ahmet, just to name a few, behind it. The ReferenceRunner has
been extremely useful to us in figuring out what's needed and how it works.
Kudos to Thomas Groh, Ben Sidhom and all the others who made this
available to us. And to Kenn, your fuse work rocks.

3) More contributors in Samza Runner
The runner has been Chris's and my personal project for a while, and now
that's no longer the case. We got Hai Lu and Boris Shkolnik from the Samza
team to contribute. Hai has been focusing on the portability work as mentioned
in #2, and Boris will work mostly on supporting our use cases. We will send
more emails discussing our use cases, like the "Update state after firing"
email I sent out earlier.

Finally, a shout-out to our very own Chris Pettitt. Without you, none of
the above would have happened!

Thanks,
Xinyu


Re: Update state after firing

2018-10-09 Thread Xinyu Liu
@Reuven: thanks for letting me know. I thought that was expected. We ran into
this issue when we tried to use the stateful ParDo to process events from
session-windowed inputs. As a workaround, we ended up reassigning the global
window to these events and using our backend RocksDB state TTL to retire old
data.
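
Roughly, the re-windowing part of that workaround looks like the following
sketch (the RocksDB state TTL is Samza-side configuration and not shown;
the names here are only illustrative):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

class RewindowBeforeStatefulParDo {
  // Reassign the session-windowed events to the global window before the stateful ParDo.
  static <K, V> PCollection<KV<K, V>> rewindow(
      PCollection<KV<K, V>> sessionWindowedEvents,
      DoFn<KV<K, V>, KV<K, V>> statefulFn) {
    return sessionWindowedEvents
        .apply("RewindowToGlobal", Window.<KV<K, V>>into(new GlobalWindows()))
        .apply("StatefulProcessing", ParDo.of(statefulFn));
  }
}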

Thanks,
Xinyu

On Tue, Oct 9, 2018 at 11:54 AM Reuven Lax  wrote:

> 2) is simply a bug that nobody has ever gotten around to fixing. Stateful
> ParDo should support merging windows such as sessions.
>
> On Tue, Oct 9, 2018 at 11:40 AM Xinyu Liu  wrote:
>
>> We do use stateful ParDo in the same job for a different use case (and we
>> did read through Kenn's blogs :) ). Here are the reasons why we prefer
>> using aggregation:
>>
>> 1) It's much convenient for the user to define the window and trigger and
>> have the Combine on top of it. It's not very clear how early firing works
>> in Stateful Pardo, and it does seem to require more user effort to set up
>> the states/timers.
>>
>> 2) It seems Stateful ParDo doesn't support non-emergent windows, e.g.
>> session window. This is actually one of our use case.
>>
>> 3) It seems quite general and more flexible to our users to allow
>> updating state after firing. I don't want to tell our further users to stay
>> with from Combine for this and they have to handle the state explicitly.
>>
>> Thanks,
>> Xinyu
>>
>>
>>
>> On Tue, Oct 9, 2018 at 11:27 AM Rui Wang  wrote:
>>
>>> Hi Xinyu,
>>>
>>> There are two nice articles on Beam website about stateful processing
>>> that you may want to check out:
>>>
>>> https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>>> https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>>
>>> -Rui
>>>
>>> On Tue, Oct 9, 2018 at 11:07 AM Reuven Lax  wrote:
>>>
>>>> Have you considered using Beam's state API for this?
>>>>
>>>> On Tue, Oct 9, 2018 at 11:03 AM Xinyu Liu 
>>>> wrote:
>>>>
>>>>> Hi, guys,
>>>>>
>>>>> Current triggering allows us to either discard the state or accumulate
>>>>> the state after a window pane is fired. We use the extractOutput() in
>>>>> CombinFn to return the output value after the firing. All these have been
>>>>> working well for us. We do have a use case which seems not handled here: 
>>>>> we
>>>>> would like to update the state after the firing. Let me illustrate this 
>>>>> use
>>>>> case by an example: we have a 10-min fixed window with repeatedly early
>>>>> trigger of 1 min over an input stream which contains events of user id and
>>>>> page id. The accumulator for the window has two parts: 1) set of page ids
>>>>> already seen; 2) set of user ids who first views a page in this window
>>>>> (this is done by looking up #1). For each early firing, we want to output
>>>>> #2, and clear the second part of the state. But we would like to keep the
>>>>> #1 around for later calculations in this window. This example might be too
>>>>> simple to make sense, but it comes from one of our real use cases which is
>>>>> needed for some anti-abuse scenarios.
>>>>>
>>>>> To address this use case, is it OK to add a AccumT 
>>>>> updateAfterFiring(AccumT
>>>>> accumulator) in current CombinFn? That way the user can choose to
>>>>> update the state partially if needed, e.g. for our use case. Any feedback
>>>>> is very welcome.
>>>>>
>>>>> Thanks,
>>>>> Xinyu
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>


Re: Update state after firing

2018-10-09 Thread Xinyu Liu
We do use stateful ParDo in the same job for a different use case (and we
did read through Kenn's blogs :) ). Here are the reasons why we prefer
using aggregation:

1) It's much more convenient for the user to define the window and trigger and
have the Combine on top of it. It's not very clear how early firing works
in stateful ParDo, and it does seem to require more user effort to set up
the states/timers.

2) It seems stateful ParDo doesn't support merging windows, e.g.
session windows. This is actually one of our use cases.

3) It seems more general and flexible for our users to allow updating
state after firing. I don't want to tell our future users to move away
from Combine for this and have to handle the state explicitly.

Thanks,
Xinyu



On Tue, Oct 9, 2018 at 11:27 AM Rui Wang  wrote:

> Hi Xinyu,
>
> There are two nice articles on Beam website about stateful processing that
> you may want to check out:
>
> https://beam.apache.org/blog/2017/02/13/stateful-processing.html
> https://beam.apache.org/blog/2017/08/28/timely-processing.html
>
> -Rui
>
> On Tue, Oct 9, 2018 at 11:07 AM Reuven Lax  wrote:
>
>> Have you considered using Beam's state API for this?
>>
>> On Tue, Oct 9, 2018 at 11:03 AM Xinyu Liu  wrote:
>>
>>> Hi, guys,
>>>
>>> Current triggering allows us to either discard the state or accumulate
>>> the state after a window pane is fired. We use the extractOutput() in
>>> CombinFn to return the output value after the firing. All these have been
>>> working well for us. We do have a use case which seems not handled here: we
>>> would like to update the state after the firing. Let me illustrate this use
>>> case by an example: we have a 10-min fixed window with repeatedly early
>>> trigger of 1 min over an input stream which contains events of user id and
>>> page id. The accumulator for the window has two parts: 1) set of page ids
>>> already seen; 2) set of user ids who first views a page in this window
>>> (this is done by looking up #1). For each early firing, we want to output
>>> #2, and clear the second part of the state. But we would like to keep the
>>> #1 around for later calculations in this window. This example might be too
>>> simple to make sense, but it comes from one of our real use cases which is
>>> needed for some anti-abuse scenarios.
>>>
>>> To address this use case, is it OK to add a AccumT updateAfterFiring(AccumT
>>> accumulator) in current CombinFn? That way the user can choose to
>>> update the state partially if needed, e.g. for our use case. Any feedback
>>> is very welcome.
>>>
>>> Thanks,
>>> Xinyu
>>>
>>>
>>>
>>>
>>>


Update state after firing

2018-10-09 Thread Xinyu Liu
Hi, guys,

Current triggering allows us to either discard the state or accumulate the
state after a window pane is fired. We use the extractOutput() in CombineFn
to return the output value after the firing. All of this has been working
well for us. We do have a use case which doesn't seem to be handled here: we
would like to update the state after the firing. Let me illustrate this use
case with an example: we have a 10-min fixed window with a repeated early
trigger of 1 min over an input stream which contains events of user id and
page id. The accumulator for the window has two parts: 1) the set of page ids
already seen; 2) the set of user ids who first view a page in this window
(this is done by looking up #1). For each early firing, we want to output #2
and clear that second part of the state. But we would like to keep #1 around
for later calculations in this window. This example might be too simple to
make sense, but it comes from one of our real use cases which is needed for
some anti-abuse scenarios.
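
For reference, the windowing above could be expressed roughly like this
(events are modeled as KV<userId, pageId> purely for illustration):

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

class PageViewWindowing {
  // 10-min fixed windows, early firing every minute, accumulating across firings.
  static final Window<KV<String, String>> WINDOWING =
      Window.<KV<String, String>>into(FixedWindows.of(Duration.standardMinutes(10)))
          .triggering(
              AfterWatermark.pastEndOfWindow()
                  .withEarlyFirings(
                      AfterProcessingTime.pastFirstElementInPane()
                          .plusDelayOf(Duration.standardMinutes(1))))
          .withAllowedLateness(Duration.ZERO)
          .accumulatingFiredPanes();
}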

To address this use case, is it OK to add an AccumT updateAfterFiring(AccumT
accumulator) method to the current CombineFn? That way the user can choose to
update the state partially if needed, e.g. for our use case. Any feedback is
very welcome.
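
To make the proposal concrete, here is a sketch of how it could look on the
example above (again with events modeled as KV<userId, pageId>). Note that
updateAfterFiring does not exist on Combine.CombineFn today; it is exactly the
hook being proposed, and a real pipeline would also need a coder for the
accumulator:

import java.util.HashSet;
import java.util.Set;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.values.KV;

public class FirstPageViewersFn
    extends Combine.CombineFn<KV<String, String>, FirstPageViewersFn.Accum, Set<String>> {

  public static class Accum {
    Set<String> pagesSeen = new HashSet<>();    // part #1: kept for the whole window
    Set<String> firstViewers = new HashSet<>(); // part #2: emitted and then cleared
  }

  @Override
  public Accum createAccumulator() {
    return new Accum();
  }

  @Override
  public Accum addInput(Accum acc, KV<String, String> userAndPage) {
    // Record the user when they view a page not yet seen in this window (lookup on #1).
    if (acc.pagesSeen.add(userAndPage.getValue())) {
      acc.firstViewers.add(userAndPage.getKey());
    }
    return acc;
  }

  @Override
  public Accum mergeAccumulators(Iterable<Accum> accumulators) {
    Accum merged = new Accum();
    for (Accum acc : accumulators) {
      merged.pagesSeen.addAll(acc.pagesSeen);
      merged.firstViewers.addAll(acc.firstViewers);
    }
    return merged;
  }

  @Override
  public Set<String> extractOutput(Accum acc) {
    return acc.firstViewers; // each (early) firing outputs part #2
  }

  // Proposed hook, not part of Beam today: invoked by the runner after a pane fires,
  // so part #2 can be cleared while part #1 stays around for later firings.
  public Accum updateAfterFiring(Accum acc) {
    acc.firstViewers = new HashSet<>();
    return acc;
  }
}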

Thanks,
Xinyu


Re: Donating the Dataflow Worker code to Apache Beam

2018-09-13 Thread Xinyu Liu
Big +1 (non-googler).

From the Samza Runner's perspective, we are very happy to see the Dataflow
worker code so we can learn and compete :).

Thanks,
Xinyu

On Thu, Sep 13, 2018 at 11:34 AM Suneel Marthi 
wrote:

> +1 (non-googler)
>
> This is a great  move
>
> Sent from my iPhone
>
> On Sep 13, 2018, at 2:25 PM, Tim Robertson 
> wrote:
>
> +1 (non googler)
> It sounds pragmatic, helps with transparency should issues arise and
> enables more people to fix.
>
>
> On Thu, Sep 13, 2018 at 8:15 PM Dan Halperin  wrote:
>
>> From my perspective as a (non-Google) community member, huge +1.
>>
>> I don't see anything bad for the community about open sourcing more of
>> the probably-most-used runner. While the DirectRunner is probably still the
>> most referential implementation of Beam, can't hurt to see more working
>> code. Other runners or runner implementors can refer to this code if they
>> want, and ignore it if they don't.
>>
>> In terms of having more code and tests to support, well, that's par for
>> the course. Will this change make the things that need to be done to
>> support them more obvious? (E.g., "this PR is blocked because someone at
>> Google on Dataflow team has to fix something" vs "this PR is blocked
>> because the Apache Beam code in foo/bar/baz is failing, and anyone who can
>> see the code can fix it"). The latter seems like a clear win for the
>> community.
>>
>> (As long as the code donation is handled properly, but that's completely
>> orthogonal and I have no reason to think it wouldn't be.)
>>
>> Thanks,
>> Dan
>>
>> On Thu, Sep 13, 2018 at 11:06 AM Lukasz Cwik  wrote:
>>
>>> Yes, I'm specifically asking the community for opinions as to whether it
>>> should be accepted or not.
>>>
>>> On Thu, Sep 13, 2018 at 10:51 AM Raghu Angadi 
>>> wrote:
>>>
 This is terrific!

 Is thread asking for opinions from the community about if it should be
 accepted? Assuming Google side decision is made to contribute, big +1 from
 me to include it next to other runners.

 On Thu, Sep 13, 2018 at 10:38 AM Lukasz Cwik  wrote:

> At Google we have been importing the Apache Beam code base and
> integrating it with the Google portion of the codebase that supports the
> Dataflow worker. This process is painful as we regularly are making
> breaking API changes to support libraries related to running portable
> pipelines (and sometimes in other places as well). This has made it
> sometimes difficult for PR changes to make changes without either breaking
> something for Google or waiting for a Googler to make the change 
> internally
> (e.g. dependency updates).
>
> This code is very similar to the other integrations that exist for
> runners such as Flink/Spark/Apex/Samza. It is an adaption layer that sits
> on top of an execution engine. There is no super secret awesome stuff as
> this code was already publicly visible in the past when it was part of the
> Google Cloud Dataflow github repo[1].
>
> Process wise the code will need to get approval from Google to be
> donated and for it to go through the code donation process but before we
> attempt to do that, I was wondering whether the community would object to
> adding this code to the master branch?
>
> The up side is that people can make breaking changes and fix it for
> all runners. It will also help Googlers contribute more to the portability
> story as it will remove the burden of doing the code import (wasted time)
> and it will allow people to develop in master (can have the whole project
> loaded in a single IDE).
>
> The downsides are that this will represent more code and unit tests to
> support.
>
> 1:
> https://github.com/GoogleCloudPlatform/DataflowJavaSDK/tree/hotfix_v1.2/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker
>



Re: Status of IntelliJ with Gradle

2018-08-22 Thread Xinyu Liu
We experienced the same issues in IntelliJ after switching to the latest
version. I did the trick Luke mentioned before, including the
beam-model-fn-execution and beam-model-job-management jars in the dependent
modules, to get around compilation. But I cannot get the vendored protobuf
working. It seems the RunnerApi is using the original protobuf package, and it
causes confusion in IntelliJ if I add the relocated jar. As a result, I
have to run and debug only using Gradle for now.

Thanks,
Xinyu

On Wed, Aug 22, 2018 at 1:45 AM, Maximilian Michels  wrote:

> Thanks Lukasz. I also found that I can never fix all import errors by
> manually adding jars to the IntelliJ library list. It is also not a good
> solution because it breaks on reloading the Gradle project.
>
> New contributors might find the errors in IntelliJ distracting. Even
> worse, they might assume the problem is on their side. If we can't fix them
> soon, I'd suggest documenting the IntelliJ limitations in the contributor
> guide.
>
> On 20.08.18 17:58, Lukasz Cwik wrote:
>
>> Yes, I have the same issues with vendoring. These are the things that I
>> have tried without success to get Intellij to import the vendored modules
>> correctly:
>> * attempted to modify the idea.module.scopes to only include the vendored
>> artifacts (for some reason this is ignored and Intellij is relying on the
>> output of its own internal module, nothing I add to the scopes seems to
>> impact anything)
>> * modify the generated iml beforehand to add the vendored jar file as the
>> top dependency (jar never appears in the modules dependencies)
>>
>> On Mon, Aug 20, 2018 at 8:36 AM Maximilian Michels > > wrote:
>>
>> Thank you Etienne for opening the issue.
>>
>> Anyone else having problems with the shaded Protobuf dependency?
>>
>> On 20.08.18 16:14, Etienne Chauchot wrote:
>>  > Hi Max,
>>  >
>>  > I experienced the same, I had first opened a general ticket
>>  > (https://issues.apache.org/jira/browse/BEAM-4418) about gradle
>>  > improvements and I just split it in several tickets. Here is the
>> one
>>  > concerning the same issue:
>> https://issues.apache.org/jira/browse/BEAM-5176
>>  >
>>  > Etienne
>>  >
>>  > Le lundi 20 août 2018 à 15:51 +0200, Maximilian Michels a écrit :
>>  >> Hi Beamers,
>>  >>
>>  >> It's great to see the Beam build system overhauled. Thank you
>> for all
>>  >> the hard work.
>>  >>
>>  >> That said, I've just started contributing to Beam again and I feel
>>  >> really stupid for not having a fully-functional IDE. I've closely
>>  >> followed the IntelliJ/Gradle instructions [1]. In the terminal
>>  >> everything works fine.
>>  >>
>>  >> First of all, I get warnings like the following and the build
>> fails:
>>  >>
>>  >> 
>>  >>
>> .../beam/sdks/java/core/src/main/java/org/apache/beam/sdk/pa
>> ckage-info.java:29:
>>  >> warning: [deprecation] NonNull in
>> edu.umd.cs.findbugs.annotations has
>>  >> been deprecated
>>  >> @DefaultAnnotation(NonNull.class)
>>  >>^
>>  >> error: warnings found and -Werror specified
>>  >> 1 error
>>  >> 89 warnings
>>  >> =
>>  >>
>>  >> Somehow the "-Xlint:-deprecation" compiler flag does not get
>> through but
>>  >> "-Werror" does. I can get it to compile by removing the
>> "-Werror" flag
>>  >> from BeamModulePlugin but that's obviously not the solution.
>>  >>
>>  >> Further, once the build succeeds I still have to add the relocated
>>  >> Protobuf library manually because the one in "vendor" does not get
>>  >> picked up. I've tried clearing caches / re-adding the project /
>>  >> upgrading IntelliJ / changing Gradle configs.
>>  >>
>>  >>
>>  >> Is this just me or do you also have similar problems? If so, I
>> would
>>  >> like to compile a list of possible fixes to help other
>> contributors.
>>  >>
>>  >>
>>  >> Thanks,
>>  >> Max
>>  >>
>>  >>
>>  >> Tested with
>>  >> - IntelliJ 2018.1.6 and 2018.2.1.
>>  >> - MacOS
>>  >> - java version "1.8.0_112"
>>  >>
>>  >> [1] https://beam.apache.org/contribute/intellij/
>>  >>
>>  >>
>>
>>
> --
> Max
>


Re: Process JobBundleFactory for portable runner

2018-08-22 Thread Xinyu Liu
We are also interested in this Process JobBundleFactory, as we are planning
to fork a process to run the Python SDK in the Samza runner instead of using
a Docker container. So this change will be helpful to us too. On the same
note, we are trying out portable_runner.py to submit a Python job. It seems it
will create a default Docker URL even if the harness_docker_image is set
to None in the pipeline options. Shall we add another option, or honor the None
in this option to support the process job? I made some local changes for now
to work around this.

Thanks,
Xinyu

On Tue, Aug 21, 2018 at 12:25 PM, Henning Rohde  wrote:

> By "enum" in quotes, I meant the usual open URN style pattern not an
> actual enum. Sorry if that wasn't clear.
>
> On Tue, Aug 21, 2018 at 11:51 AM Lukasz Cwik  wrote:
>
>> I would model the environment to be more free form then enums such that
>> we have forward looking extensibility and would suggest to follow the same
>> pattern we use on PTransforms (using an URN and a URN specific payload).
>> Note that in this case we may want to support a list of supported
>> environments (e.g. java, docker, python, ...).
>>
>> On Tue, Aug 21, 2018 at 10:37 AM Henning Rohde 
>> wrote:
>>
>>> One thing to consider that we've talked about in the past. It might make
>>> sense to extend the environment proto and have the SDK be explicit about
>>> which kinds of environment it supports:
>>>
>>> https://github.com/apache/beam/blob/
>>> 8c4f4babc0b0d55e7bddefa3f9f9ba65d21ef139/model/pipeline/src/
>>> main/proto/beam_runner_api.proto#L969
>>>
>>> This choice might impact what files are staged or what not. Some SDKs,
>>> such as Go, provide a compiled binary and _need_ to know what the target
>>> architecture is. Running on a mac with and without docker, say, requires a
>>> different worker in each case. If we add an "enum", we can also easily add
>>> the external idea where the SDK/user starts the SDK harnesses instead of
>>> the runner. Each runner may not support all types of environments.
>>>
>>> Henning
>>>
>>> On Tue, Aug 21, 2018 at 2:52 AM Maximilian Michels 
>>> wrote:
>>>
 For reference, here is corresponding JIRA issue for this thread:
 https://issues.apache.org/jira/browse/BEAM-5187

 On 16.08.18 11:15, Maximilian Michels wrote:
 > Makes sense to have an option to run the SDK harness in a
 non-dockerized
 > environment.
 >
 > I'm in the process of creating a Docker entry point for Flink's
 > JobServer[1]. I suppose you would also prefer to execute that one
 > standalone. We should make sure this is also an option.
 >
 > [1] https://issues.apache.org/jira/browse/BEAM-4130
 >
 > On 16.08.18 07:42, Thomas Weise wrote:
 >> Yes, that's the proposal. Everything that would otherwise be packaged
 >> into the Docker container would need to be pre-installed in the host
 >> environment. In the case of Python SDK, that could simply mean a
 >> (frozen) virtual environment that was setup when the host was
 >> provisioned - the SDK harness process(es) will then just utilize
 that.
 >> Of course this flavor of SDK harness execution could also be useful
 in
 >> the local development environment, where right now someone who
 already
 >> has the Python environment needs to also install Docker and update a
 >> container to launch a Python SDK pipeline on the Flink runner.
 >>
 >> On Wed, Aug 15, 2018 at 12:40 PM Daniel Oliveira <
 danolive...@google.com
 >> > wrote:
 >>
 >>  I just want to clarify that I understand this correctly since
 I'm
 >>  not that familiar with the details behind all these execution
 >>  environments yet. Is the proposal to create a new
 JobBundleFactory
 >>  that instead of using Docker to create the environment that the
 new
 >>  processes will execute in, this JobBundleFactory would execute
 the
 >>  new processes directly in the host environment? So in practice
 if I
 >>  ran a pipeline with this JobBundleFactory the SDK Harness and
 Runner
 >>  Harness would both be executing directly on my machine and would
 >>  depend on me having the dependencies already present on my
 machine?
 >>
 >>  On Mon, Aug 13, 2018 at 5:50 PM Ankur Goenka >>> >>  > wrote:
 >>
 >>  Thanks for starting the discussion. I will be happy to help.
 >>  I agree, we should have pluggable SDKHarness environment
 Factory.
 >>  We can register multiple Environment factory using service
 >>  registry and use the PipelineOption to pick the right one
 on per
 >>  job basis.
 >>
 >>  There are a couple of things which are require to setup
 before
 >>  launching the process.
 >>
 >>* Setting up the environment 

Re: Samza runner committer support

2018-06-30 Thread Xinyu Liu
Thank you guys for volunteering! Looks like we have plenty of coverage on
portability, Python and Go. I will reach out to you guys for reviews and
questions.

Thanks,
Xinyu

On Fri, Jun 29, 2018 at 2:51 PM, Lukasz Cwik  wrote:

> I would also have context on any portability related changes and would be
> glad to give guidance in this regard. I'm also a good resource for asking
> runner implementation questions as well.
>
> On Fri, Jun 29, 2018 at 11:01 AM Henning Rohde  wrote:
>
>> I would be happy to help with some of the portability-related changes, as
>> well as trying out Go-on-Samza. I'm out on vacation most of July, though.
>>
>> On Thu, Jun 28, 2018 at 1:12 PM Ahmet Altay  wrote:
>>
>>> I would be happy to help with this. I can prioritize python related
>>> changes. For portability related changes I can try to help but I may not be
>>> the best person.
>>>
>>> On Thu, Jun 28, 2018 at 12:33 PM, Xinyu Liu 
>>> wrote:
>>>
>>>> Hi, All,
>>>>
>>>> Our Samza runner has recently been merged to master, and Kenn has been
>>>> extremely instrumental during the whole process, e.g. design decisions,
>>>> feature requests and code reviews. We would like thank him for all the
>>>> support he has been given to us!
>>>>
>>>> Given Kenn is going to be on leave soon, we want to call out for
>>>> committer sponsors who will work with us in the mean time. We expect a lot
>>>> more upcoming updates for the Samza runner, such as the portable runner and
>>>> python support. We will also have some feature asks, e.g. the async API
>>>> briefly discussed in the previous email. Would anyone be interested in
>>>> being a sponsor part time for the Samza runner?
>>>>
>>>> Thanks,
>>>> Xinyu
>>>>
>>>
>>>


Samza runner committer support

2018-06-28 Thread Xinyu Liu
Hi, All,

Our Samza runner has recently been merged to master, and Kenn has been
extremely instrumental during the whole process, e.g. design decisions,
feature requests and code reviews. We would like to thank him for all the
support he has given us!

Given Kenn is going to be on leave soon, we want to call out for committer
sponsors who will work with us in the meantime. We expect a lot more
upcoming updates for the Samza runner, such as the portable runner and
Python support. We will also have some feature asks, e.g. the async API
briefly discussed in the previous email. Would anyone be interested in
being a part-time sponsor for the Samza runner?

Thanks,
Xinyu


Re: Going on leave for a bit

2018-06-26 Thread Xinyu Liu
Congrats! Enjoy the time without sleep.

Thanks,
Xinyu

On Tue, Jun 26, 2018 at 10:12 AM, Griselda Cuevas  wrote:

> Enjoy the time off Kenn!
>
>
> On Tue, 26 Jun 2018 at 12:14, Kai Jiang  wrote:
>
>> Congrats! Enjoy your family time.
>>
>> Best,
>> Kai
>>
>> On Tue, Jun 26, 2018, 09:11 Alan Myrvold  wrote:
>>
>>> Congrats, Kenn and have a great break.
>>>
>>> On Tue, Jun 26, 2018 at 9:10 AM Alexey Romanenko <
>>> aromanenko@gmail.com> wrote:
>>>
 Best wishes to you and your family, Kenneth!

 Alexey

 On 26 Jun 2018, at 15:45, Rafael Fernandez  wrote:

 Have a great time, Kenn!

 On Tue, Jun 26, 2018 at 5:49 AM Ismaël Mejía  wrote:

> Enjoy your family time.
>
> Best wishes,
> Ismael
>
>
> On Tue, Jun 26, 2018 at 12:13 PM Pei HE  wrote:
>
>> (A late) Congrats for the newborn!
>> --
>> Pei
>>
>> On Tue, Jun 26, 2018 at 1:42 PM, Kenneth Knowles 
>> wrote:
>> > Hi friends,
>> >
>> > I think I did not mention on dev@ at the time, but my child #2
>> arrived March
>> > 14 (Pi day!) and I took some weeks off. Starting ~July 4 I will be
>> taking a
>> > more significant absence, until ~October 1, trying my best to be
>> totally
>> > offline.
>> >
>> > JFYI so that you know why JIRAs and PRs are not being addressed. I
>> am also
>> > unassigning my JIRAs so that I am not holding any mutexes, and I
>> will close
>> > PRs so they don't get stale.
>> >
>> > Any questions or pressing issues, I will be online this week and a
>> little
>> > bit next week.
>> >
>> > Kenn
>>
>



Re: [PROPOSAL] Merge samza-runner to master

2018-06-25 Thread Xinyu Liu
Thanks a lot, Kenn! Finally we linked our runner in. I will work on the
rest of the stuff as you mentioned. Thanks again for everyone's comments,
too.

Thanks,
Xinyu

On Mon, Jun 25, 2018 at 2:48 PM, Kenneth Knowles  wrote:

> This is done. Now we need to make sure the build is running, healthy, in
> the PR template.
>
> On Mon, Jun 25, 2018 at 9:10 AM Kenneth Knowles  wrote:
>
>> I'll do it. I'm working with Xinyu on the PR.
>>
>> Kenn
>>
>> On Mon, Jun 25, 2018, 08:24 Ismaël Mejía  wrote:
>>
>>> +1
>>>
>>> It is important to have new runners merged (even if not 100% complete)
>>> so they benefit of the fixes going on, and that they can easily (and
>>> incrementally) start to track the new portability features as they develop.
>>>
>>> What is next then ? Who triggers the green button so this happens?
>>>
>>>
>>>
>>>
>>> On Sat, Jun 23, 2018 at 6:43 AM Jean-Baptiste Onofré 
>>> wrote:
>>>
>>>> +1
>>>>
>>>> As the build is fine, it makes sense to merge pretty fast.
>>>>
>>>> Thanks,
>>>> Regards
>>>> JB
>>>>
>>>> On 22/06/2018 00:14, Xinyu Liu wrote:
>>>> > I updated the merge PR with the gradle integration (there was some
>>>> > Jenkins Java tests failure with google cloud quota issues. It seems
>>>> not
>>>> > related to this patch). Please feel free to ping me if anything else
>>>> is
>>>> > needed.
>>>> >
>>>> > Thanks,
>>>> > Xinyu
>>>> >
>>>> > On Mon, Jun 18, 2018 at 5:44 PM, Xinyu Liu >>> > <mailto:xinyuliu...@gmail.com>> wrote:
>>>> >
>>>> > @Kenn: I am going to add the build.gradle. Is there anything else?
>>>> >
>>>> > @Ahmet, @Robert: here are more details about the samza runner
>>>> right now:
>>>> >
>>>> > - Missing pieces: timer support in ParDo is not there yet and I
>>>> plan
>>>> > to add it soon. SplittableParDo is missing but we don't have a use
>>>> > case so far. We are on par with the other runners for the rest of
>>>> > the Java features.
>>>> > - Work in Progress: implement the portable pipeline runner logic.
>>>> > - Future plans: support Python is our next goal. Hopefully we will
>>>> > get a prototype working sometime next quarter :).
>>>> >
>>>> > Btw, thanks everyone for the comments!
>>>> >
>>>> > Thanks,
>>>> > Xinyu
>>>> >
>>>> > On Mon, Jun 18, 2018 at 4:59 PM, Robert Burke >>> > <mailto:rob...@frantil.com>> wrote:
>>>> >
>>>> > This is exciting! Is it implemented as a portability framework
>>>> > runner too?
>>>> >
>>>> >
>>>> > On Mon, Jun 18, 2018, 4:36 PM Pablo Estrada <
>>>> pabl...@google.com
>>>> > <mailto:pabl...@google.com>> wrote:
>>>> >
>>>> > It's very exciting to see a new runner making it into
>>>> > master. : )
>>>> >
>>>> > Best
>>>> > -P.
>>>> >
>>>> > On Mon, Jun 18, 2018 at 3:38 PM Rafael Fernandez
>>>> > mailto:rfern...@google.com>> wrote:
>>>> >
>>>> > I've just read this and wanted to share my excitement
>>>> :D
>>>> >
>>>> >
>>>> >
>>>> > On Mon, Jun 18, 2018 at 3:10 PM Kenneth Knowles
>>>> > mailto:k...@google.com>> wrote:
>>>> >
>>>> > One thing that will be necessary is porting the
>>>> > build to Gradle.
>>>> >
>>>> > Kenn
>>>> >
>>>> > On Mon, Jun 18, 2018 at 11:57 AM Xinyu Liu
>>>> > >>> > <mailto:xinyuliu...@gmail.com>> wrote:
>>>> >
>>>> > Hi, Folks,
>>>> >
>>>> > On behalf of the Samza team, I would lik

Re: [PROPOSAL] Merge samza-runner to master

2018-06-22 Thread Xinyu Liu
A little clarification on the contributors: Chris Pettitt and I are the
main contributors so far. Chris wrote the initial prototype but his commits
got squashed into the giant initial commit, and he's been reviewing all
incremental changes afterwards. Two more team members (Boris Shkolnik and
Hai Lu) are starting to work on it. In the next quarter, our focus is
portability, particularly Python. I will keep you guys updated with our
status and plan, and maybe more questions and ideas down the road :).

Thanks,
Xinyu

On Fri, Jun 22, 2018 at 7:23 AM, Rafael Fernandez 
wrote:

> ​I think it's great to go ahead and merge it, so it can continue evolving.
> As with all things, it'll adopt new stuff as it becomes ready (in fact, it
> may even prove to be a great example of how to port an existing "legacy"
> runner to the portability stuff when ready).​
>
> It seems the immediate blocker (gradle) was addressed, and there is great
> future work planned. Exciting!
>
> On Thu, Jun 21, 2018 at 8:00 PM Kenneth Knowles  wrote:
>
>> *Contributors*
>> Agree with Robert's concern. But this is a nice opportunity for Beam to
>> connect. It is a different sort of backend and a different sort of
>> community that we are linking in.
>>
>> Consider the Gearpump and Apex runners: both had resumes that met the
>> requirements, but might not today. But they haven't been a burden. I have
>> some hope the Samza runner might have a better chance recruiting users and
>> contributors, since the value add for Samza users is unique among Beam
>> runners, and likewise the Samza community is unique among backend
>> communities.
>>
>> *Portability*
>> My take is that we shouldn't _start_ any runner down the legacy path. But
>> this is runner predates portability. I don't think the Java SDK is ready to
>> provide feature parity, much less adequate performance, so it doesn't seem
>> reasonable to require using it. Community > code as well.
>>
>> Kenn
>>
>> On Thu, Jun 21, 2018 at 3:34 PM Robert Bradshaw 
>> wrote:
>>
>>> Neat to see a new runner on board!
>>>
>>> I would like to make it a requirement for all new runners to support
>>> the portability API, but given that it's still somewhat of a moving
>>> target, and you have ongoing work in this direction, that may not be a
>>> hard requirement.
>>>
>>> I'm a bit concerned that there is are only two contributors (but the
>>> git logs): you and Kenn. But you do indicate there are others
>>> interested in working on this.
>>>
>>> Other than that, this looks great.
>>>
>>> - Robert
>>>
>>>
>>> On Thu, Jun 21, 2018 at 3:14 PM Xinyu Liu  wrote:
>>> >
>>> > I updated the merge PR with the gradle integration (there was some
>>> Jenkins Java tests failure with google cloud quota issues. It seems not
>>> related to this patch). Please feel free to ping me if anything else is
>>> needed.
>>> >
>>> > Thanks,
>>> > Xinyu
>>> >
>>> > On Mon, Jun 18, 2018 at 5:44 PM, Xinyu Liu 
>>> wrote:
>>> >>
>>> >> @Kenn: I am going to add the build.gradle. Is there anything else?
>>> >>
>>> >> @Ahmet, @Robert: here are more details about the samza runner right
>>> now:
>>> >>
>>> >> - Missing pieces: timer support in ParDo is not there yet and I plan
>>> to add it soon. SplittableParDo is missing but we don't have a use case so
>>> far. We are on par with the other runners for the rest of the Java features.
>>> >> - Work in Progress: implement the portable pipeline runner logic.
>>> >> - Future plans: support Python is our next goal. Hopefully we will
>>> get a prototype working sometime next quarter :).
>>> >>
>>> >> Btw, thanks everyone for the comments!
>>> >>
>>> >> Thanks,
>>> >> Xinyu
>>> >>
>>> >> On Mon, Jun 18, 2018 at 4:59 PM, Robert Burke 
>>> wrote:
>>> >>>
>>> >>> This is exciting! Is it implemented as a portability framework
>>> runner too?
>>> >>>
>>> >>>
>>> >>> On Mon, Jun 18, 2018, 4:36 PM Pablo Estrada 
>>> wrote:
>>> >>>>
>>> >>>> It's very exciting to see a new runner making it into master. : )
>>> >>>>
>>> >>>> Best
>>> >>>> -P.
>>> >>>>

Re: [PROPOSAL] Merge samza-runner to master

2018-06-21 Thread Xinyu Liu
I updated the merge PR with the Gradle integration (there were some Jenkins
Java test failures due to Google Cloud quota issues; they seem unrelated to
this patch). Please feel free to ping me if anything else is needed.

Thanks,
Xinyu

On Mon, Jun 18, 2018 at 5:44 PM, Xinyu Liu  wrote:

> @Kenn: I am going to add the build.gradle. Is there anything else?
>
> @Ahmet, @Robert: here are more details about the samza runner right now:
>
> - Missing pieces: timer support in ParDo is not there yet and I plan to
> add it soon. SplittableParDo is missing but we don't have a use case so
> far. We are on par with the other runners for the rest of the Java features.
> - Work in Progress: implement the portable pipeline runner logic.
> - Future plans: support Python is our next goal. Hopefully we will get a
> prototype working sometime next quarter :).
>
> Btw, thanks everyone for the comments!
>
> Thanks,
> Xinyu
>
> On Mon, Jun 18, 2018 at 4:59 PM, Robert Burke  wrote:
>
>> This is exciting! Is it implemented as a portability framework runner too?
>>
>>
>> On Mon, Jun 18, 2018, 4:36 PM Pablo Estrada  wrote:
>>
>>> It's very exciting to see a new runner making it into master. : )
>>>
>>> Best
>>> -P.
>>>
>>> On Mon, Jun 18, 2018 at 3:38 PM Rafael Fernandez 
>>> wrote:
>>>
>>>> I've just read this and wanted to share my excitement :D
>>>>
>>>>
>>>>
>>>> On Mon, Jun 18, 2018 at 3:10 PM Kenneth Knowles  wrote:
>>>>
>>>>> One thing that will be necessary is porting the build to Gradle.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Mon, Jun 18, 2018 at 11:57 AM Xinyu Liu 
>>>>> wrote:
>>>>>
>>>>>> Hi, Folks,
>>>>>>
>>>>>> On behalf of the Samza team, I would like to propose to merge the
>>>>>> samza-runner branch into master. The branch was created on Jan when we
>>>>>> first introduced the Samza Runner [1], and we've been adding features and
>>>>>> refining it afterwards. Now the runner satisfies the criteria outlined in
>>>>>> [2], and merging it to master will give more visibility to other
>>>>>> contributors and users.
>>>>>>
>>>>>> 1. Have at least 2 contributors interested in maintaining it, and 1
>>>>>> committer interested in supporting it: *Both Chris and me have been 
>>>>>> making
>>>>>> contributions and I am going to sign up for the support. There are more
>>>>>> folks in the Samza team interested in contributing to it. Thanks Kenn for
>>>>>> all the help and reviews for the runner!*
>>>>>> 2. Provide both end-user and developer-facing documentation: *The PR
>>>>>> for the samza-runner doc has runner user guide, capability matrix, and
>>>>>> tutorial using WordCount examples.*
>>>>>> 3. Have at least a basic level of unit test coverage: *Unit tests are
>>>>>> here [3].*
>>>>>> 4. Run all existing applicable integration tests with other Beam 
>>>>>> components
>>>>>> and create additional tests as appropriate: Enabled ValidatesRunner 
>>>>>> tests.*
>>>>>> 5. Be able to handle a subset of the model that addresses a
>>>>>> significant set of use cases, such as ‘traditional batch’ or ‘processing
>>>>>> time streaming’: *We have test Beam jobs running in Yarn using event-time
>>>>>> processing of Kafka streams.*
>>>>>> 6. Update the capability matrix with the current status. *Same as #2.*
>>>>>> 7. Add a webpage under documentation/runners. *Same as #2.*
>>>>>>
>>>>>> The PR for the samza-runner merge: https://github.com/apac
>>>>>> he/beam/pull/5668
>>>>>> The PR for the samza-runner doc: https://github.com/apache
>>>>>> /beam-site/pull/471
>>>>>>
>>>>>> Thanks,
>>>>>> Xinyu
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/BEAM-3079
>>>>>> [2] https://beam.apache.org/contribute/
>>>>>> [3] https://github.com/apache/beam/tree/samza-runner/runners
>>>>>> /samza/src/test
>>>>>>
>>>>> --
>>> Got feedback? go/pabloem-feedback
>>>
>>
>


Re: [PROPOSAL] Merge samza-runner to master

2018-06-18 Thread Xinyu Liu
@Kenn: I am going to add the build.gradle. Is there anything else?

@Ahmet, @Robert: here are more details about the samza runner right now:

- Missing pieces: timer support in ParDo is not there yet and I plan to add
it soon. SplittableParDo is missing but we don't have a use case so far. We
are on par with the other runners for the rest of the Java features.
- Work in Progress: implement the portable pipeline runner logic.
- Future plans: Python support is our next goal. Hopefully we will get a
prototype working sometime next quarter :).

Btw, thanks everyone for the comments!

Thanks,
Xinyu

On Mon, Jun 18, 2018 at 4:59 PM, Robert Burke  wrote:

> This is exciting! Is it implemented as a portability framework runner too?
>
>
> On Mon, Jun 18, 2018, 4:36 PM Pablo Estrada  wrote:
>
>> It's very exciting to see a new runner making it into master. : )
>>
>> Best
>> -P.
>>
>> On Mon, Jun 18, 2018 at 3:38 PM Rafael Fernandez 
>> wrote:
>>
>>> I've just read this and wanted to share my excitement :D
>>>
>>>
>>>
>>> On Mon, Jun 18, 2018 at 3:10 PM Kenneth Knowles  wrote:
>>>
>>>> One thing that will be necessary is porting the build to Gradle.
>>>>
>>>> Kenn
>>>>
>>>> On Mon, Jun 18, 2018 at 11:57 AM Xinyu Liu 
>>>> wrote:
>>>>
>>>>> Hi, Folks,
>>>>>
>>>>> On behalf of the Samza team, I would like to propose to merge the
>>>>> samza-runner branch into master. The branch was created on Jan when we
>>>>> first introduced the Samza Runner [1], and we've been adding features and
>>>>> refining it afterwards. Now the runner satisfies the criteria outlined in
>>>>> [2], and merging it to master will give more visibility to other
>>>>> contributors and users.
>>>>>
>>>>> 1. Have at least 2 contributors interested in maintaining it, and 1
>>>>> committer interested in supporting it: *Both Chris and me have been making
>>>>> contributions and I am going to sign up for the support. There are more
>>>>> folks in the Samza team interested in contributing to it. Thanks Kenn for
>>>>> all the help and reviews for the runner!*
>>>>> 2. Provide both end-user and developer-facing documentation: *The PR
>>>>> for the samza-runner doc has runner user guide, capability matrix, and
>>>>> tutorial using WordCount examples.*
>>>>> 3. Have at least a basic level of unit test coverage: *Unit tests are
>>>>> here [3].*
>>>>> 4. Run all existing applicable integration tests with other Beam 
>>>>> components
>>>>> and create additional tests as appropriate: Enabled ValidatesRunner 
>>>>> tests.*
>>>>> 5. Be able to handle a subset of the model that addresses a
>>>>> significant set of use cases, such as ‘traditional batch’ or ‘processing
>>>>> time streaming’: *We have test Beam jobs running in Yarn using event-time
>>>>> processing of Kafka streams.*
>>>>> 6. Update the capability matrix with the current status. *Same as #2.*
>>>>> 7. Add a webpage under documentation/runners. *Same as #2.*
>>>>>
>>>>> The PR for the samza-runner merge: https://github.com/
>>>>> apache/beam/pull/5668
>>>>> The PR for the samza-runner doc: https://github.com/
>>>>> apache/beam-site/pull/471
>>>>>
>>>>> Thanks,
>>>>> Xinyu
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/BEAM-3079
>>>>> [2] https://beam.apache.org/contribute/
>>>>> [3] https://github.com/apache/beam/tree/samza-runner/
>>>>> runners/samza/src/test
>>>>>
>>>> --
>> Got feedback? go/pabloem-feedback
>>
>


[PROPOSAL] Merge samza-runner to master

2018-06-18 Thread Xinyu Liu
Hi, Folks,

On behalf of the Samza team, I would like to propose to merge the
samza-runner branch into master. The branch was created in January when we
first introduced the Samza Runner [1], and we've been adding features and
refining it afterwards. Now the runner satisfies the criteria outlined in
[2], and merging it to master will give more visibility to other
contributors and users.

1. Have at least 2 contributors interested in maintaining it, and 1
committer interested in supporting it: *Both Chris and I have been making
contributions and I am going to sign up for the support. There are more
folks in the Samza team interested in contributing to it. Thanks Kenn for
all the help and reviews for the runner!*
2. Provide both end-user and developer-facing documentation: *The PR for
the samza-runner doc has runner user guide, capability matrix, and tutorial
using WordCount examples.*
3. Have at least a basic level of unit test coverage: *Unit tests are here
[3].*
4. Run all existing applicable integration tests with other Beam components
and create additional tests as appropriate: *Enabled ValidatesRunner tests.*
5. Be able to handle a subset of the model that addresses a significant set
of use cases, such as ‘traditional batch’ or ‘processing time streaming’:
*We have test Beam jobs running in Yarn using event-time processing of
Kafka streams.*
6. Update the capability matrix with the current status. *Same as #2.*
7. Add a webpage under documentation/runners. *Same as #2.*

The PR for the samza-runner merge: https://github.com/apache/beam/pull/5668
The PR for the samza-runner doc: https://github.com/apache/beam-site/pull/471

Thanks,
Xinyu

[1] https://issues.apache.org/jira/browse/BEAM-3079
[2] https://beam.apache.org/contribute/
[3] https://github.com/apache/beam/tree/samza-runner/runners/samza/src/test


Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-15 Thread Xinyu Liu
I am happy to chat about it over hangout or slack too. Let's talk offline
to set it up if needed.

Thanks,
Xinyu

On Tue, May 15, 2018 at 10:51 AM, Xinyu Liu <xinyuliu...@gmail.com> wrote:

> For the Samza runner, it always processes key+window pairs serially. To
> answer Luke's question:
>
> - Why Samza needs to use a snapshot in the first place and should be able
> to read data from RocksDb directly?
> I believe that the point of adding the readIterator() is to allow us to
> read from RocksDb directly without bulk loading to memory. In this case
> Samza will create the RocksDb iterator and expose it through the interface
> to the user. There is no need to create an explicit snapshot for this. For
> read() we need to load content into memory so we can release the rocksDb
> resources immediately and the user can iterate many times later. The reason
> I mentioned about snapshot is that if we don't load into memory for read(),
> we need to maintain a rocksDb snapshot under the hood so the user can iterate
> the same content. This will cause memory and performance issues for our
> users.
>
> For the commit question, do you mean whether we commit in the middle of
> processing a bundle? That's not the case in Samza. Samza will wait for the
> bundle process to complete, flush the output and state modifications, and then
> checkpoint.
>
> Thanks,
> Xinyu
>
>
>
> On Tue, May 15, 2018 at 10:34 AM, Kenneth Knowles <k...@google.com> wrote:
>
>>
>>
>> On Tue, May 15, 2018 at 9:36 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> If it always processes key+window pairs serially, then I'm not sure why
>>> Samza needs to use a snapshot in the first place and should be able to read
>>> data from RocksDb directly.
>>>
>>> The other point of confusion is around how is a readIterator() any
>>> different then returning a wrapper that just invokes read().iterator() on a
>>> BagState. If that is the case then this is a trivial change which doesn't
>>> need to change the portability design.
>>>
>>
>> I think this is a key point. Just calling out the obvious to talk about
>> it:
>>
>>  - read() returns an iterable - a value with fixed contents that can be
>> iterated many times
>>  - readIterator() would return an iterator - a value with fixed contents
>> that can be iterated only once
>>
>> In both cases, later writes should not affect the observed contents. So
>> they both need some kind of snapshot for their lifetime. With
>> read().iterator() the intermediate iterable is immediately garbage, but
>> this is the reference queue based solution again, with some ref tracking
>> for iterables/iterators.
>>
>> I now see what you mean about committing within a bundle; we definitely
>> need to atomically commit the completion of upstream elements, output
>> elements, and state modifications. What are your thoughts on this Xinyu?
>>
>> Happy to join a quick hangout.
>>
>> Kenn
>>
>>
>> The nuance is likely that the interface between what a Runner implements
>>> to provide support for user state may give itself more visibility into what
>>> is going on then what the portability framework can provide as is.
>>>
>>> On Tue, May 15, 2018 at 8:57 AM Kenneth Knowles <k...@google.com> wrote:
>>>
>>>> OK, got it. But what consistency are you referring to? I was trying to
>>>> point out that there's nothing but straight-line program order consistency.
>>>> There's only one actor doing all the reads and all the writes.
>>>>
>>>> Kenn
>>>>
>>>> On Tue, May 15, 2018 at 8:39 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> I misspoke when I said portability semantics and should have said
>>>>> portability design/implementation. This is why I had a follow-up e-mail 
>>>>> and
>>>>> clarified that I'm confused on:
>>>>> * I don't understand how you would want close to change the semantics
>>>>> of a user state specification and how it affects the lifetime of user 
>>>>> state?
>>>>> ** Does it represent committing information within a bundle?
>>>>> ** Does it mean that user state can ignore the replayable and
>>>>> consistent semantics for a lifetime of a bundle semantics?
>>>>>
>>>>> I'm trying to tie back what does a `ReadableState
>>>>> readIterator()` means for Runner authors and how it solves the
>>>>> memory/close() problem for Samza. Ba

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-15 Thread Xinyu Liu
>> thread. Xinyu / Kenn, how about we setup a time using Slack?
>>>>
>>>> On Mon, May 14, 2018 at 8:36 PM Kenneth Knowles <k...@google.com> wrote:
>>>>
>>>>> I feel like this discussion is kind of far from the primary intention.
>>>>> The point of ParDo(stateful DoFn) is to enable naive single-threaded code
>>>>> in a style intuitive to a beginning imperative programmer. So:
>>>>>
>>>>>  - the return value of read() should act like an immutable value
>>>>>  - if there is a read after a write, that read should reflect the
>>>>> changes written, because there's a trivial happens-before relationship
>>>>> induced by program order
>>>>>  - there are no non-trivial happens-before relationships
>>>>>  - a write after a read should not affect the value read before
>>>>>
>>>>> From that starting point, the limitations on expressiveness (single
>>>>> threaded per-key-and-window-and-step) give rise to embarrassingly parallel
>>>>> computation and GC.
>>>>>
>>>>> So when you say "portability semantics" it makes me concerned. The
>>>>> word "semantics" refers to the mapping between what a user writes and what
>>>>> it means. Primarily, that means the conceptual translation between a
>>>>> primitive PTransform and the corresponding PCollection-to-PCollection
>>>>> operation. It is true that portability complicates things because a user's
>>>>> code can only be given meaning relative to an SDK harness. But the 
>>>>> end-user
>>>>> intention is unchanged. If we had time and resources to give the Fn API a
>>>>> solid spec, it should be such that the SDK harness has little choice but 
>>>>> to
>>>>> implement the primitives as intended.
>>>>>
>>>>> In other words, it is the job of https://s.apache.org/beam-fn-
>>>>> state-api-and-bundle-processing* to implement
>>>>> https://s.apache.org/beam-state. The discussion of adding
>>>>> `ReadableState readIterator()` to BagState seems consistent with
>>>>> the latter.
>>>>>
>>>>> Kenn
>>>>>
>>>>> *IIUC you've chosen to use the same underlying proto service for side
>>>>> inputs and by-reference values. That's an implementation detail that I 
>>>>> have
>>>>> no particular opinion about except that if it complicates implementing the
>>>>> primary motivator for state, it may not be a great fit.
>>>>>
>>>>>
>>>>>
>>>>> On Mon, May 14, 2018 at 7:35 PM Kenneth Knowles <k...@google.com>
>>>>> wrote:
>>>>>
>>>>>> I don't follow why allowing freeing resources would be counter to the
>>>>>> spec. I don't really know what you mean by consistent for a bundle. 
>>>>>> State,
>>>>>> in the sense of the user-facing per-key-and-window state API, is single
>>>>>> threaded and scoped to a single DoFn. There's no one else who can write 
>>>>>> the
>>>>>> state. If a BagState is read and written and read again, the user-facing
>>>>>> logic should be unaware of the resources and not have any logic to deal
>>>>>> with consistency.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Mon, May 14, 2018 at 6:09 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> Hmm, some of the problem I'm dealing with is:
>>>>>>> * I don't understand how you would want close to change the
>>>>>>> semantics of a user state specification and how it affects the lifetime 
>>>>>>> of
>>>>>>> user state?
>>>>>>> ** Does it represent committing information within a bundle?
>>>>>>> ** Does it mean that user state can ignore the replayable and
>>>>>>> consistent semantics for a lifetime of a bundle semantics?
>>>>>>> * I do understand that close semantics may make it easier for Samza
>>>>>>> runner to support state that is greater then memory, but what does it
>>>>>>> provide to users and how would they benefit (like what new scenarios 
>>>>>>> would
>>>>>>> it support)?
>

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-14 Thread Xinyu Liu
I will take a look at the docs to understand the problem better. A minor
comment to 2) is that I don't intend to change the existing iterable API. I
plan to implement it similar to Flink, loading the data into memory and
closing the underlying snapshot after that. So the changes should be
backward compatible.
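
For illustration, the read() path I have in mind would look roughly like this
on the runner side (a sketch only, not actual Samza runner code; scoping the
scan to a single state cell/key range is omitted):

import java.util.ArrayList;
import java.util.List;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;

class RocksDbBagReader {
  private final RocksDB db;

  RocksDbBagReader(RocksDB db) {
    this.db = db;
  }

  // read(): materialize the state into memory so the native iterator (and its
  // implicit snapshot) can be closed right away, similar to Flink's approach.
  // The returned list is a plain in-memory Iterable that can be iterated many times.
  List<byte[]> readAll() {
    List<byte[]> values = new ArrayList<>();
    try (RocksIterator iter = db.newIterator()) {
      for (iter.seekToFirst(); iter.isValid(); iter.next()) {
        values.add(iter.value());
      }
    } // native iterator and its snapshot are released here
    return values;
  }
}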

Thanks,
Xinyu



On Mon, May 14, 2018 at 4:54 PM, Lukasz Cwik <lc...@google.com> wrote:

> I believe adding support for a state spec to be 'closed' or 'freed' is
> counter to the requirement of a state spec being consistent for the
> lifetime of a bundle, are we willing to change this requirement for the
> lifetime of a bundle or say that runners can arbitrary say that a StateSpec
> can't be accessed anymore?
>
> If not, I'm having trouble of thinking of a good way on how to integrate
> the 'close/free' API with portability semantics because:
> 1) Runners do control bundle size but other then the trivial cases where a
> runner specifically chooses:
>   a) to only process a single element at a time (negating the need for a
> free/close() method).
>   b) optimistically process elements and if you run out of memory kill the
> bundle and try again with a smaller bundle.
> 2) Adding an API to provide Iterators doesn't mean that users can't still
> use iterable which can't be force closed. (Dropping Iterable in exchange
> for Iterator in the API is backwards incompatible so likely to be deferred
> till the next major version of Apache Beam).
> 3) SDKs are able to process as many elements as they want in parallel,
> nothing requires them to execute elements serially throughout the pipeline
> graph.
> 4) The runner has limited information into what an SDK is doing. The SDK
> provides a lower bound on how many elements it has processed but SDKs
> aren't required to implement this meaning that they could technically be
> processing everything in random order after they have seen all input for a
> bundle.
>
> We'll need to work through the scenarios, Xinyu it would be useful for you
> to take a look at these two docs for context into the problem space:
> https://s.apache.org/beam-fn-api-processing-a-bundle (How to process a
> bundle)
> https://s.apache.org/beam-fn-state-api-and-bundle-processing (How to
> access side inputs, access remote references, and support user state)
>
>
> On Mon, May 14, 2018 at 3:16 PM Kenneth Knowles <k...@google.com> wrote:
>
>>
>> On Mon, May 14, 2018 at 2:30 PM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Before you go on and update the user facing API, we should discuss the
>>> last point I made since the change your making will have limited usability
>>> since the portability effort won't realistically allow you to see such low
>>> level things like when processElement finished and supporting user state
>>> will be modeled using the following three operations: read (with
>>> continuation tokens), append (blind write), clear. It may be moot to
>>> discuss how to integrate Samza into the existing framework and should work
>>> towards being a portability based runner. There are more details here about
>>> supporting user state during pipeline execution:
>>> https://s.apache.org/beam-fn-state-api-and-bundle-processing, top level
>>> docs about portability are https://s.apache.org/beam-runner-api and
>>> https://s.apache.org/beam-fn-api.
>>>
>>> Do we really want to make a change to the user facing API for user state
>>> in the Java SDK when that code path will be phased out in exchange for
>>> using the portability APIs?
>>>
>>
>> To be clear, the user API will remain the same, but the runner's
>> implementation path will be phased out. So should we expand this discussion
>> to how the portability APIs enable the SDK and runner to collaborate to
>> achieve this use case? It seems like the interaction you need is that the
>> runner can tell that the SDK can closed the connection on the read(), and
>> the SDK needs to do so promptly, right?
>>
>> Kenn
>>
>>
>>
>>> On Mon, May 14, 2018 at 2:20 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>
>>>> We discussed internally about the proposed approaches. Seems if the
>>>> State API can also expose another method to return a 
>>>> ReadableState,
>>>> it will cover our cases of iterating over a bigger-then-memory state, and
>>>> closing the underlying rocksDb snapshot immediately after the iterator is
>>>> fully consumed (most cases), and also safely close the rest of the
>>>> iterators after proessElement (minor cases). If this sounds good to you
>>>> guys, I

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-14 Thread Xinyu Liu
We discussed the proposed approaches internally. It seems that if the State
API can also expose another method to return a ReadableState, it
will cover our cases of iterating over a bigger-than-memory state, and
closing the underlying rocksDb snapshot immediately after the iterator is
fully consumed (most cases), and also safely close the rest of the
iterators after processElement (minor cases). If this sounds good to you
guys, I am going to create a JIRA ticket for it and open a PR for it.
Thanks a lot for the discussions here.
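
To make this concrete, the usage I have in mind would look roughly like the
following (a sketch only; readIterator() is the proposed addition, not an
existing Beam API):

import java.util.Iterator;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class LargeBagFn extends DoFn<KV<String, String>, String> {
  @StateId("events")
  private final StateSpec<BagState<String>> eventsSpec = StateSpecs.bag();

  @ProcessElement
  public void process(ProcessContext c, @StateId("events") BagState<String> events) {
    events.add(c.element().getValue());
    // Proposed addition: readIterator() returns a ReadableState<Iterator<String>>,
    // so the runner can stream from rocksDb and release the underlying snapshot
    // once the iterator is exhausted (or, at the latest, when processElement returns).
    Iterator<String> iter = events.readIterator().read();
    while (iter.hasNext()) {
      String event = iter.next();
      // ... do stuff with 'event' without materializing the whole bag in memory ...
    }
  }
}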

The discussions about Async processing is very interesting, and I think
it's definitely worth its own thread. I believe we do need the support from
the Beam API/model so the users can take advantage of it (Besides Samza,
Flink also has an async operator that helps a lot in the Alibaba's use
cases). Yes, it will add complexity to the underlying framework, but it's
going to be great for the users to do remote IO. In practice we found it
actually avoids thread+locks issues, as Kenn mentioned above. I am not sure
whether this feature can be a runner-specific support thing. I will probably
create another email thread for this discussion in the future and hopefully
I can move this forward.

Thanks,
Xinyu

On Mon, May 14, 2018 at 10:30 AM, Kenneth Knowles <k...@google.com> wrote:

>
>
> On Mon, May 14, 2018 at 9:44 AM Lukasz Cwik <lc...@google.com> wrote:
>
>> Users typically want to do that async operation and then produce output
>> with it. Adding asynchronous execution is difficult within the framework
>> because a lot of code is currently not needed to be thread safe and writing
>> code to be fast and handle asynchronous execution is quite difficult.
>> Adding async operations typically leads to code with too many
>> locks/synchronization blocks.
>>
>
> Just a small point here, as it is very unusual to use locks with futures.
> The essential innovation of futures is that it is a new way to program that
> avoids threads+locks style. In part, you get this for free from functional
> programming style, and on the other hand you reify asynchronous side
> effects as data dependencies.
>
> Kenn
>
>
>
>> Note that with the portability effort, the Runner won't have visibility
>> into such low level things like when an object is garbage collected and
>> supporting user state will be modeled using the following three operations:
>> read (with continuation tokens), append (blind write), clear. It may be
>> moot to discuss how to integrate Samza into the existing framework and
>> should work towards being a portability based runner. There are more
>> details here about supporting user state during pipeline execution:
>> https://s.apache.org/beam-fn-state-api-and-bundle-processing, top level
>> docs about portability are https://s.apache.org/beam-runner-api and
>> https://s.apache.org/beam-fn-api.
>>
>> On Sun, May 13, 2018 at 6:51 PM Reuven Lax <re...@google.com> wrote:
>>
>>> At least one API that has been discussed in the past, is to use Java 8
>>> CompletionStage. e.g.
>>>
>>>  new DoFn<InputT, OutputT>() {
>>> @ProcessElement
>>> public void process(@Element CompletionStage element, ...) {
>>>   element.thenApply(...)
>>> }
>>>   }
>>>
>>> The framework will automatically create the CompletionStage, and the
>>> process method can specify a pipeline of asynchronous operations to perform
>>> on the element. When all of them are done, the element will be marked as
>>> successfully processed.
>>>
>>> Reuven
>>>
>>> On Sun, May 13, 2018 at 11:36 AM Xinyu Liu <xinyuliu...@gmail.com>
>>> wrote:
>>>
>>>> Thanks for all the pointers. I looked though the discussion over BEAM-2975
>>>> and BEAM-2980 about having snapshot or live views of iterable, and the
>>>> current semantics makes a lot of sense to me.  For your question: it does
>>>> not require an explicit snapshot when we create RocksDb iterator directly.
>>>> The iterator will read from an implicit snapshot as of the time the
>>>> iterator is created [1], and the snapshot will be released after the
>>>> iterator is closed. If we can have another method to return
>>>> ReadableState, we might be able to apply the auto-closing
>>>> approaches as we discussed and solve the problem here :).
>>>>
>>>> It's very interesting that you bring up the discussion about async API!
>>>> Async IO has been widely adopted here among our users: they use netty for
>>>> async calls with library named ParSeq [2] to help mana

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-13 Thread Xinyu Liu
Thanks for all the pointers. I looked through the discussion over BEAM-2975
and BEAM-2980 about having snapshot or live views of the iterable, and the
current semantics make a lot of sense to me. For your question: it does
not require an explicit snapshot when we create RocksDb iterator directly.
The iterator will read from an implicit snapshot as of the time the
iterator is created [1], and the snapshot will be released after the
iterator is closed. If we can have another method to return
ReadableState, we might be able to apply the auto-closing
approaches as we discussed and solve the problem here :).
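
For anyone not familiar with the RocksDb Java API, the implicit-snapshot
behavior looks roughly like this (an illustrative standalone sketch, not
runner code):

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;

public class IteratorSnapshotDemo {
  public static void main(String[] args) throws Exception {
    RocksDB.loadLibrary();
    try (Options options = new Options().setCreateIfMissing(true);
        RocksDB db = RocksDB.open(options, "/tmp/iterator-snapshot-demo")) {
      db.put("k1".getBytes(), "v1".getBytes());

      // newIterator() pins an implicit snapshot: the iterator sees the DB as of
      // this call, and it must be closed to release the native resources.
      try (RocksIterator iter = db.newIterator()) {
        db.put("k2".getBytes(), "v2".getBytes()); // not visible through 'iter'
        for (iter.seekToFirst(); iter.isValid(); iter.next()) {
          System.out.println(new String(iter.key()) + " = " + new String(iter.value()));
        }
      } // close() releases the iterator and the snapshot it holds
    }
  }
}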

It's very interesting that you bring up the discussion about async API!
Async IO has been widely adopted here among our users: they use netty for
async calls with a library named ParSeq [2] to help manage the calls. Samza
provides a primitive callback style API [3], in which the user will invoke
the callback after the remote calls are complete. Currently in a Samza job
our users use this API with the ParSeq lib for remote IO. Seems we might
have to do blocking calls (thus the poor resource utilization you
mentioned) when using Beam API for now. It'll be great if you can send a
few more details about the discussion about async API. I would like to add
our use case and help move this forward.
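
For context, the callback-style Samza API in [3] looks roughly like this (a
minimal sketch; the remote lookup is just a placeholder):

import java.util.concurrent.CompletableFuture;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.AsyncStreamTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCallback;
import org.apache.samza.task.TaskCoordinator;

public class RemoteLookupTask implements AsyncStreamTask {
  @Override
  public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator, TaskCallback callback) {
    // Kick off the non-blocking remote call and complete the callback when it
    // finishes, so Samza can keep other messages in flight without blocking a thread.
    lookup(envelope.getMessage()).whenComplete((result, error) -> {
      if (error != null) {
        callback.failure(error);
      } else {
        // collector.send(...) with 'result' would go here
        callback.complete();
      }
    });
  }

  // Placeholder for the actual async remote call (e.g. a netty/ParSeq client).
  private CompletableFuture<Object> lookup(Object key) {
    return CompletableFuture.completedFuture(key);
  }
}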

Thanks,
Xinyu

[1]: https://github.com/facebook/rocksdb/wiki/Iterator
[2]: https://github.com/linkedin/parseq
[3]:
https://samza.apache.org/learn/documentation/0.14/api/javadocs/org/apache/samza/task/AsyncStreamTask.html


On Sat, May 12, 2018 at 8:17 PM, Kenneth Knowles <k...@google.com> wrote:

> I don't have any further suggestions, but want to call out how this hits a
> lot of interesting points.
>
> The point about snapshotting is great. We have BEAM-2975 [1] and BEAM-2980
> [2] where we debated things a bit. I think the strongest case is for what
> you describe - it should be a snapshot. Perhaps they should both be closed
> as fixed...
>
> And you also bring up long blocking calls - we have also deliberately
> decided that long synchronous blocking calls in @ProcessElement can be
> embraced for simple programming and compensated with autoscaling smarts
> (e.g. expand the thread pool by noticing poor utilization). The alternative
> is a more future-istic API where the calls can be explicitly asynchronous.
> We've had some interesting dev@ list discussions about that, too.
>
> Is another possibility to perhaps have read() return a
> ReadableState instead? We could, of course, have two methods with
> different names, one for iterator one for snapshot iterable. But wouldn't
> the Iterator also require a snapshot? Doesn't a native RocksDb iterator
> require a snapshot to have well-defined contents? As you can tell, I don't
> know enough about RocksDb details to be sure of my suggestions.
>
> Kenn
>
> [1] https://issues.apache.org/jira/browse/BEAM-2980
> [2] https://issues.apache.org/jira/browse/BEAM-2975
>
> On Sat, May 12, 2018 at 2:58 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>
>> Thanks for the ideas, Kenn, Luke and Eugene. Before I posted the question
>> here, we discussed internally about releasing the underlying resources
>> after consuming the whole iterator. This probably covers quite a lot of use
>> cases. For some special cases that the user only consume part of the
>> iterator, Luke and Kenn's suggestion about releasing after processElement()
>> might work (I need to confirm about this with our use cases). So based on
>> what we discussed so far, we might have a good way to automatically close
>> an iterator for the store.
>>
>> There is another issue though: right now the state API returns an
>> iterable for entries(), keys() and values(), and we can create iterator
>> from it. From my understanding, the iterable holds a snapshot of the
>> underlying store. In case of rocksDb, it's going to be a db.snapshot().
>> Then when can we release the snapshot? It's not like iterator where we can
>> use some heuristics to automatically release it. The user can hold on to
>> the iterable and create iterators throughout the whole processElement().
>> But if we only close the iterable after processElement(), I am quite
>> concerned about the limitations this will bring. If the user is doing some
>> remote call during the process, then the snapshot might be held for a long
>> time before releasing, and might cause performance problems. And if the
>> user happen to create multiple iterables, then there will be multiple
>> snapshots loaded during process. Luke suggested being aggressive at closing
>> the resources and recreating when needed again. But in this case it might
>> not work since we won't be able to recreate the same snapshot given the
>> store might have been u

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-12 Thread Xinyu Liu
Thanks for the ideas, Kenn, Luke and Eugene. Before I posted the question
here, we discussed internally about releasing the underlying resources
after consuming the whole iterator. This probably covers quite a lot of use
cases. For some special cases where the user only consumes part of the
iterator, Luke and Kenn's suggestion about releasing after processElement()
might work (I need to confirm about this with our use cases). So based on
what we discussed so far, we might have a good way to automatically close
an iterator for the store.
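
As a sketch, the "close once fully consumed" part could be a thin wrapper like
the one below (Closeable stands in for whatever native handle the runner needs
to release, e.g. the rocksDb iterator/snapshot):

import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;

// Delegates to the underlying iterator and releases the native resource as soon
// as the last element has been handed out. Iterators that are abandoned early
// would still need the end-of-processElement (or reference-queue) fallback
// discussed above.
class AutoClosingIterator<T> implements Iterator<T> {
  private final Iterator<T> delegate;
  private final Closeable resource;
  private boolean closed = false;

  AutoClosingIterator(Iterator<T> delegate, Closeable resource) {
    this.delegate = delegate;
    this.resource = resource;
  }

  @Override
  public boolean hasNext() {
    boolean hasNext = delegate.hasNext();
    if (!hasNext) {
      closeQuietly();
    }
    return hasNext;
  }

  @Override
  public T next() {
    return delegate.next();
  }

  private void closeQuietly() {
    if (!closed) {
      closed = true;
      try {
        resource.close();
      } catch (IOException e) {
        // log and move on; the runner can also clean up on teardown
      }
    }
  }
}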

There is another issue though: right now the state API returns an iterable
for entries(), keys() and values(), and we can create an iterator from it.
From my understanding, the iterable holds a snapshot of the underlying
store. In case of rocksDb, it's going to be a db.snapshot(). Then when can
we release the snapshot? It's not like iterator where we can use some
heuristics to automatically release it. The user can hold on to the
iterable and create iterators throughout the whole processElement(). But if
we only close the iterable after processElement(), I am quite concerned
about the limitations this will bring. If the user is doing some remote
call during the process, then the snapshot might be held for a long time
before releasing, and might cause performance problems. And if the user
happen to create multiple iterables, then there will be multiple snapshots
loaded during process. Luke suggested being aggressive at closing the
resources and recreating when needed again. But in this case it might not
work since we won't be able to recreate the same snapshot given the store
might have been updated (and creating a rocksDb snapshot is not cheap either). I
am running out of ideas other than exposing the iterator itself somehow
(and add close() if needed?). Any further suggestions?

@Kenn: btw, I have the same impl you posted earlier (CloseableIterator) in
an internal interface. I wrapped it in some sort of StateResource for
fear that people might reject the proposal immediately after seeing the
close() on the iterator. I guess our users are familiar with rocksDb state,
it's pretty normal to close the iterator/snapshot after using it.

Thanks,
Xinyu

On Fri, May 11, 2018 at 11:08 AM, Lukasz Cwik <lc...@google.com> wrote:

> The iterator going out of scope is the idiomatic way that resources are
> freed for Java developers (hence the weak/phantom reference suggestion).
> Explicitly requiring users to deal with 'handles' (like file streams) lead
> to leaked resources.
>
> On Fri, May 11, 2018 at 10:55 AM Kenneth Knowles <k...@google.com> wrote:
>
>> Thanks Xinyu,
>>
>> I actually had first sketched out just what you wrote. But then I
>> realized a few things:
>>
>>  - usually an Iterable does not allocate resources, only its Iterators
>>  - if you consume the whole iterator, I hope the user would not have to
>> do any extra work
>>  - you can also automatically free it up at the end of the call to
>> @ProcessElement so that is easy too (but you might not want to)
>>  - so the main use is when the iterator terminates early and is not fully
>> consumed and you can't wait to finish the method
>>  - the scoping in Java will force a bunch of uninitialized declarations
>> outside the try-with-resources block, kind of a lot of boilerplate LOC
>>
>> One thing that is good about your proposal is that the iterable could
>> have some transparent caches that all free up together.
>>
>> Kenn
>>
>> On Fri, May 11, 2018 at 9:51 AM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>
>>> Thanks for drafting the details about the two approaches, Kenn. Now I
>>> understand Luke's proposal better. The approach looks neat, but the
>>> uncertainty of *when* GC is going to kick in will make users' life
>>> hard. If the user happens to configure a large JVM heap size, and since
>>> rocksDb uses off-heap memory, GC might start very late and less frequent
>>> than what we want. If we don't have a *definitive* way to let user
>>> close the underlying resources, then there is no good way to handle such
>>> failures of a critical application in production.
>>>
>>> Having a close method in the iterator might be a little unorthodox :).
>>> To some degree, this is actually a resource we are holding underneath, and
>>> I think it's pretty common to have close() for a resource in Java, e.g.
>>> BufferedReader and BufferedWriter. So I would imagine that we also define a
>>> resource for the state iterator and make the interface implements
>>> *AutoCloseable*. Here is my sketch:
>>>
>>> // StateResource MUST be closed after use.
>>> try (StateResource<Iterator>> st =
>>> bagSt

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-11 Thread Xinyu Liu
Thanks for drafting the details about the two approaches, Kenn. Now I
understand Luke's proposal better. The approach looks neat, but the
uncertainty of *when* GC is going to kick in will make users' life hard. If
the user happens to configure a large JVM heap size, and since rocksDb uses
off-heap memory, GC might start very late and run less frequently than we
want. If we don't have a *definitive* way to let the user close the underlying
resources, then there is no good way to handle such failures of a critical
application in production.

Having a close method in the iterator might be a little unorthodox :). To
some degree, this is actually a resource we are holding underneath, and I
think it's pretty common to have close() for a resource in Java, e.g.
BufferedReader and BufferedWriter. So I would imagine that we also define a
resource for the state iterator and make the interface implements
*AutoCloseable*. Here is my sketch:

// StateResource MUST be closed after use.
try (StateResource<Iterator<T>> st = bagState.iteratorResource()) {
  Iterator<T> iter = st.iterator();
  while (iter.hasNext()) {
    .. do stuff ...
  }
} catch (Exception e) {
  ... user code
}

The type/method names are just for illustration here, so please don't laugh
at them. Please feel free to comment and let me know if you have thoughts
about the programming patterns here.

Thanks,
Xinyu

On Thu, May 10, 2018 at 8:59 PM, Kenneth Knowles <k...@google.com> wrote:

> It is too soon to argue whether an API is complex or not. There has been
> no specific API proposed.
>
> I think the problem statement is real - you need to be able to read and
> write bigger-than-memory state. It seems we have multiple runners that
> don't support it, perhaps because of our API. You might be able to build
> something good enough with phantom references, but you might not.
>
> If I understand the idea, it might look something like this:
>
> new DoFn<>() {
>@StateId("foo")
>private final StateSpec<BagState> myBagSpec = ...
>
>@ProcessElement
>public void proc(@StateId("foo") BagState myBag, ...) {
>  CloseableIterator iterator = myBag.get().iterator();
>  while(iterator.hasNext() && ... special condition ...) {
>... do stuff ...
>  }
>  iterator.close();
>}
>  }
>
> So it has no impact on users who don't choose to close() since they
> iterate with for ( : ) as usual. And a runner that has the 10x funding to
> try out a ReferenceQueue can be resilient to users that forget. On the
> other hand, I haven't seen this pattern much in the wild, so I think it is
> valuable to discuss other methods.
>
> While Luke's proposal is something like this if I understand his sketch
> (replacing WeakReference with PhantomReference seems to be what you really
> want):
>
> ... in RocksDb state implementation ...
> class RocksDbBagState {
>   static ReferenceQueue rocksDbIteratorQueue = new ReferenceQueue();
>
>   class Iterator {
>  PhantomReference cIter;
>  .next() {
>return cIter.next();
>  }
>   }
>
>  class Iterable {
> .iterator() {
>   return new Iterator(new PhantomReference<>(rocksDbJniIterator,
> rocksDbIteratorQueue));
> }
>   }
> }
>
> ... on another thread ...
> while(true) {
>   RocksDbIterator deadRef = (RocksDbIterator)
> rocksDbIteratorQueue.remove();
>   deadRef.close();
> }
>
> When the iterator is GC'd, the phantom reference will pop onto the queue
> for being closed. This might not be too bad. You'll have delayed resource
> release, and potentially masked errors that are hard to debug. It is less
> error-prone than WeakReference, which is asking for trouble when objects
> are collected en masse. Anecdotally I have heard that performance of this
> kind of approach is poor, but I haven't experienced it myself and I can't
> find good data.
>
> Kenn
>
>
>
> On Thu, May 10, 2018 at 7:41 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>
>> If I understand correctly, using weak references will help clean up the
>> Java objects once GC kicks in. In case of kv-store likes rocksDb, the Java
>> iterator is just a JNI interface to the underlying C iterator, so we need
>> to explicitly invoke close to release the in-memory snapshot data, which
>> can be large and accumulated quickly if it's not released when not in use.
>> Maybe I am missing something as you suggested here, but looks to me using
>> weak references might not help in this case.
>>
>> I understand your concern, and I think you might misunderstood what I
>> mean

Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-10 Thread Xinyu Liu
If I understand correctly, using weak references will help clean up the
Java objects once GC kicks in. In the case of kv-stores like rocksDb, the Java
iterator is just a JNI interface to the underlying C iterator, so we need
to explicitly invoke close to release the in-memory snapshot data, which
can be large and accumulated quickly if it's not released when not in use.
Maybe I am missing something as you suggested here, but looks to me using
weak references might not help in this case.

I understand your concern, and I think you might have misunderstood what I
meant. I am totally for working hard for the best user experience, and I think
the current API provides a good example of that. That's also the reason I
am implementing a runner here. I am just proposing an extra API to expose
an iterator that can be closed when not needed, that way the users can use
this feature to iterate through large state that doesn't fit into memory. I
believe this is also a pretty general use case and it's better to have
support for it. I am actually arguing this will be a better user experience
to add this extra API since more users can benefit from it.

Thanks,
Xinyu

On Thu, May 10, 2018 at 5:25 PM, Lukasz Cwik <lc...@google.com> wrote:

> I don't agree. I believe you can track the iterators/iterables that are
> created and freed by using weak references and reference queues (or other
> methods). Having a few people work 10x as hard to provide a good
> implementation is much better then having 100s or 1000s of users suffering
> through a more complicated API.
>
> On Thu, May 10, 2018 at 3:44 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>
>> Load/evict blocks will help reduce the cache memory footprint, but we
>> still won't be able to release the underlying resources. We can add
>> definitely heuristics to help release the resources as you mentioned, but
>> there is no accurate way to track all the iterators/iterables created and
>> free them up once not needed. I think while the API is aimed at nice user
>> experience, we should have the option to let users optimize their
>> performance if they choose to. Do you agree?
>>
>> Thanks,
>> Xinyu
>>
>> On Thu, May 10, 2018 at 3:25 PM, Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Users won't reliably close/release the resources and forcing them to
>>> will make the user experience worse.
>>> It will make a lot more sense to use a file format which allows random
>>> access and use a cache to load/evict blocks of the state from memory.
>>> If that is not possible, use an iterable which frees the resource after
>>> a certain amount of inactivity or uses weak references.
>>>
>>> On Thu, May 10, 2018 at 3:07 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>
>>>> Hi, folks,
>>>>
>>>> I'm in the middle of implementing the MapState and SetState in our
>>>> Samza runner. We noticed that the state returns the Java Iterable for
>>>> reading entries, keys, etc. For state backed by file-based kv store like
>>>> rocksDb, we need to be able to let users explicitly close iterator/iterable
>>>> to release the resources.Otherwise we have to load the iterable into memory
>>>> so we can safely close the underlying rocksDb iterator, similar to Flink's
>>>> implementation. But this won't work for states that don't fit into
>>>> memory. I chatted with Kenn and he also agrees we need this capability to
>>>> avoid bulk read/write. This seems to be a general use case and I'm
>>>> wondering if we can add the support to it? I am happy to contribute to this
>>>> if needed. Any feedback is highly appreciated.
>>>>
>>>> Thanks,
>>>> Xinyu
>>>>
>>>
>>


Re: Support close of the iterator/iterable created from MapState/SetState

2018-05-10 Thread Xinyu Liu
Load/evict blocks will help reduce the cache memory footprint, but we still
won't be able to release the underlying resources. We can definitely add
heuristics to help release the resources as you mentioned, but there is no
accurate way to track all the iterators/iterables created and free them up
once not needed. I think while the API is aimed at a nice user experience, we
should have the option to let users optimize their performance if they
choose to. Do you agree?

Thanks,
Xinyu

On Thu, May 10, 2018 at 3:25 PM, Lukasz Cwik <lc...@google.com> wrote:

> Users won't reliably close/release the resources and forcing them to will
> make the user experience worse.
> It will make a lot more sense to use a file format which allows random
> access and use a cache to load/evict blocks of the state from memory.
> If that is not possible, use an iterable which frees the resource after a
> certain amount of inactivity or uses weak references.
>
> On Thu, May 10, 2018 at 3:07 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>
>> Hi, folks,
>>
>> I'm in the middle of implementing the MapState and SetState in our Samza
>> runner. We noticed that the state returns the Java Iterable for reading
>> entries, keys, etc. For state backed by file-based kv store like rocksDb,
>> we need to be able to let users explicitly close iterator/iterable to
>> release the resources.Otherwise we have to load the iterable into memory so
>> we can safely close the underlying rocksDb iterator, similar to Flink's
>> implementation. But this won't work for states that don't fit into
>> memory. I chatted with Kenn and he also agrees we need this capability to
>> avoid bulk read/write. This seems to be a general use case and I'm
>> wondering if we can add the support to it? I am happy to contribute to this
>> if needed. Any feedback is highly appreciated.
>>
>> Thanks,
>> Xinyu
>>
>


Support close of the iterator/iterable created from MapState/SetState

2018-05-10 Thread Xinyu Liu
Hi, folks,

I'm in the middle of implementing the MapState and SetState in our Samza
runner. We noticed that the state returns the Java Iterable for reading
entries, keys, etc. For state backed by file-based kv store like rocksDb,
we need to be able to let users explicitly close iterator/iterable to
release the resources. Otherwise we have to load the iterable into memory so
we can safely close the underlying rocksDb iterator, similar to Flink's
implementation. But this won't work for states that don't fit into memory.
I chatted with Kenn and he also agrees we need this capability to avoid
bulk read/write. This seems to be a general use case and I'm wondering if
we can add the support to it? I am happy to contribute to this if needed.
Any feedback is highly appreciated.

Thanks,
Xinyu


Re: Support non-keyed stateful ParDo

2018-04-25 Thread Xinyu Liu
@Robert: for your questions:

1) Side input won't work for us since it returns the whole collection. We
use rocksDb and usually the state is too big to fit in memory.

2) One way to achieve our use cases is to assign a single key to all the
elements so they will be associated with the same keyed state. The state
will belong to the element's window as it is. Kenneth mentioned this solution
too. It does meet our use case, but it's not very convenient to our users
(see the sketch after this list).

3) Sorry if I wasn't clear about the use case. For our usage, it's pretty
common to store the elements in the states, and look them up later and do
some computation. The elements will be in the same window, but don't need
to be of the same key.
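
For reference, here is roughly what the single-key workaround in 2) looks like
(a sketch only; the element type and what we keep in the state are placeholders):

import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

PCollection<String> events = ...; // placeholder input

events
    // Force everything onto one key so all elements share a single state cell
    // (per window), at the cost of serializing the work onto that key.
    .apply(WithKeys.of("singleton"))
    .apply(ParDo.of(new DoFn<KV<String, String>, String>() {
      @StateId("all")
      private final StateSpec<BagState<String>> allSpec = StateSpecs.bag();

      @ProcessElement
      public void process(ProcessContext c, @StateId("all") BagState<String> all) {
        all.add(c.element().getValue());
        // lookups/computation over the whole "task-level" state go here
      }
    }));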

Thanks,
Xinyu

On Wed, Apr 25, 2018 at 6:02 PM, Robert Bradshaw <rober...@google.com>
wrote:

> On Wed, Apr 25, 2018 at 5:45 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>
> > Hi,
>
> > I am working on adding the stateful ParDo to the upcoming BEAM Samza
> runner, and realized that the state for each ParDo processElement() is not
> only associated with the window of the element, but also the key of the
> element. Chatted with Kenneth over email about this design decision, which
> has the following benefits for keyed state:
>
> > 1) No synchronization
> > 2) Simple programming model
> > 3) No communication between works
>
> > The current design doesn't support accessing the state across different
> keys, which seems to be a more general use case. This use case is also very
> common inside LinkedIn where the users have access to the entire state of
> an operator/task, and performing lookups and computations on top of it.
> It's quite hard to make every user here aware that the state is also
> tightly associated with key of the element..
>
> Would side inputs be applicable here? (They're read-only, but other than
> that seem to fit the need.)
>
> >  From the stateful ParDo API the state looks pretty general too. I am
> wondering is it possible to extend the current API to support both keyed
> and non-keyed state? Even internally BEAM assigns a dummy key for to
> associate the state with all the elements. It will be very beneficial to
> existing Samza users and help them adopt BEAM.
>
> Could you clarify how you would use this dummy key? You could manually add
> a random key, but in that case it's unlikely that any state stored would
> get observed again. Across what scope were you thinking state would be
> stored? The lifetime of the bundle? The worker? The job? How are
> conflicting writes resolved?
>
> Perhaps rather than describing the mechanism (state) that you're trying to
> use, it'd be helpful to describe the kinds of computations you're trying to
> perform, to figure out how the model should be adapted/extended if it
> doesn't meet those needs.
>


Support non-keyed stateful ParDo

2018-04-25 Thread Xinyu Liu
Hi,

I am working on adding the stateful ParDo to the upcoming BEAM Samza
runner, and realized that the state for each ParDo processElement() is not
only associated with the window of the element, but also the key of the
element. Chatted with Kenneth over email about this design decision, which
has the following benefits for keyed state:

1) No synchronization
2) Simple programming model
3) No communication between works

The current design doesn't support accessing the state across different
keys, which seems to be a more general use case. This use case is also very
common inside LinkedIn, where users have access to the entire state of
an operator/task and perform lookups and computations on top of it.
It's quite hard to make every user here aware that the state is also
tightly associated with the key of the element. From the stateful ParDo API,
the state looks pretty general too. I am wondering if it is possible to extend
the current API to support both keyed and non-keyed state, even if internally
BEAM just assigns a dummy key to associate the state with all the elements.
It will be very beneficial to existing Samza users and help them adopt BEAM.

Thanks,
Xinyu


Re: ***UNCHECKED*** Re: Samza Runner

2018-01-31 Thread xinyu liu
Thanks, Kenneth, for merging the Samza BEAM runner to the feature branch! We
will work on the other items (docs, examples, capability matrix, ...) to get
it to master.

Thanks,
Xinyu

On Fri, Jan 26, 2018 at 9:28 AM, Kenneth Knowles  wrote:

> Regarding merging directly to master, I agree that the code itself is
> probably just as OK as our other runners were when they joined master. So
> we should watch the rest of the bits from https://beam.apache.org/
> contribute/feature-branches/ here is how I think things are:
>
>  [ ] Have at least 2 contributors interested in maintaining it, and 1
> committer interested in supporting it
>  [ ] Provide both end-user and developer-facing documentation
>  [x] Have at least a basic level of unit test coverage
>  [x] Run all existing applicable integration tests with other Beam
> components and create additional tests as appropriate
>  [~] Be able to handle a subset of the model that addresses a significant
> set of use cases, such as ‘traditional batch’ or ‘processing time
> streaming’.
>  [ ] Update the capability matrix with the current status
>  [ ] Add a webpage under documentation/runners
>
> So I would like to merge to the feature branch while we get docs together,
> identify interested contributors, and make sure of the integration with the
> build/test system, and maybe run through manual verification of the
> streaming game example at medium scale (anything bigger than local
> one-node).
>
> I also want to get it on a feature branch for this reason: we could
> continue to perfect it on the PR, but it is easier to have separate reviews
> for different issues identified on the big PR.
>
> Kenn
>
> On Thu, Jan 25, 2018 at 11:32 PM, Jean-Baptiste Onofré 
> wrote:
>
>> That's awesome ! Happy to see new runner.
>>
>> As the build is OK and the runner contains validation, why not simply
>> merge the
>> PR on master once 2.3.0 release branch is there ?
>>
>> It would give better visibility to this new feature and maybe attract
>> contribution in early stage.
>>
>> Regards
>> JB
>>
>> On 01/26/2018 05:37 AM, Kenneth Knowles wrote:
>> > Hi all,
>> >
>> > In case you haven't noticed or followed, there's a new runner in PR:
>> Samza!
>> >
>> > https://github.com/apache/beam/pull/4340
>> >
>> > It has been under review and revision for some time. In local mode it
>> passes a
>> > solid suite of ValidatesRunner tests (I don't have a Samza deployment
>> handy to
>> > test non-local).
>> >
>> > Given all this, I am ready to put it on a feature branch where it can
>> mature
>> > further, and we can build out our CI for it, etc, until we agree it is
>> ready for
>> > master.
>> >
>> > Kenn
>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>
>