Re: Python Beam pipelines on Flink on Kubernetes

2019-07-24 Thread Pablo Estrada
I am very happy to see this. I'll take a look, and leave my comments.

I think this is something we'd been needing, and it's great that you guys
are putting thought into it. Thanks!<3

On Wed, Jul 24, 2019 at 9:01 PM Thomas Weise  wrote:

> Hi,
>
> Recently Lyft open sourced *FlinkK8sOperator,* a Kubernetes operator to
> manage Flink deployments on Kubernetes:
>
> https://github.com/lyft/flinkk8soperator/
>
> We are now discussing how to extend this operator to also support
> deployment of Python Beam pipelines with the Flink runner. I would like to
> share the proposal with the Beam community to enlist feedback as well as
> explore opportunities for collaboration:
>
>
> https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/
>
> Looking forward to your comments and suggestions!
>
> Thomas
>
>


Python Beam pipelines on Flink on Kubernetes

2019-07-24 Thread Thomas Weise
Hi,

Recently Lyft open sourced *FlinkK8sOperator,* a Kubernetes operator to
manage Flink deployments on Kubernetes:

https://github.com/lyft/flinkk8soperator/

We are now discussing how to extend this operator to also support
deployment of Python Beam pipelines with the Flink runner. I would like to
share the proposal with the Beam community to enlist feedback as well as
explore opportunities for collaboration:

https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/

Looking forward to your comments and suggestions!

Thomas


Re: Enhancement for Joining Unbounded PCollections of different WindowFns

2019-07-24 Thread Kenneth Knowles
I think the best way to approach this is probably to have an example SQL
statement and to discuss what the relational semantics should be.

Windowing is not really part of SQL (yet) and in a way it just needs very
minimal extensions. See https://arxiv.org/abs/1905.12133. In this proposal
for SQL, windowed aggregation is explicitly part of the GROUP BY
operation, where you GROUP BY window columns that were added. So it is more
explicit than in Beam. Relations do not have a WindowFn so there is no
problem of them being incompatible.

With Beam SQL there are basically two ways of windowing that work totally
differently:

1. SQL-style windowing, where you GROUP BY windows. This does not use the
input PCollection's WindowFn.
2. PCollection windowing, where the SQL does not do any windowing - this
should apply the SQL expression to each window independently.

In order to support a hybrid of these, it might be:

3. SQL-style windowing where, when a PCollection has windowing assigned, the
window columns are added before the SQL is applied. It is a bit strange but
might enable your use case.
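
To make (1) and (2) concrete, here is a minimal Java sketch; the input
PCollection<Row> and the column names f_key / f_timestamp are assumptions
for illustration, not something from this thread:

import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Duration;

// (1) SQL-style windowing: the window lives in the GROUP BY itself.
PCollection<Row> sqlWindowed =
    input.apply(
        SqlTransform.query(
            "SELECT f_key, COUNT(*) AS cnt "
                + "FROM PCOLLECTION "
                + "GROUP BY f_key, TUMBLE(f_timestamp, INTERVAL '1' MINUTE)"));

// (2) PCollection windowing: window the input first, then run a plain query
//     that is applied to each window independently.
PCollection<Row> perWindow =
    input
        .apply(Window.<Row>into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(SqlTransform.query(
            "SELECT f_key, COUNT(*) AS cnt FROM PCOLLECTION GROUP BY f_key"));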

Kenn

On Mon, Jul 22, 2019 at 10:39 AM rahul patwari 
wrote:

> Hi,
>
> Beam currently doesn't support Join of Unbounded PCollections of different
> WindowFns (
> https://beam.apache.org/documentation/programming-guide/#groupbykey-and-unbounded-pcollections
> ).
>
> BeamSql performs [Unbounded PCollection] JOIN [Bounded PCollection], by
> performing 'SideInputJoin' with Bounded PCollection as a SideInput.
>
> Can we support [Unbounded PCollection] JOIN [Unbounded PCollection], when
> one of the Unbounded PCollection has [GlobalWindows Applied with
> Non-Default Trigger(probably a slow-changing lookup cache
> https://beam.apache.org/documentation/patterns/side-input-patterns/#slowly-updating-global-window-side-inputs)]
> by performing 'SideInputJoin'?
>
> Regards,
> Rahul
>
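
For concreteness, the 'SideInputJoin' mentioned above is roughly the following
pattern on the Java SDK. This is only a sketch: lookupStream and mainStream are
placeholders, and the lookup side is assumed to be globally windowed with a
non-default trigger and small enough to fit in a map side input with one value
per key.

import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

// Lookup side: the slowly-updating stream, materialized as a map side input.
PCollectionView<Map<String, String>> lookup =
    lookupStream.apply(View.<String, String>asMap());

// Main side: the unbounded stream, joined against the latest lookup snapshot.
PCollection<KV<String, String>> joined =
    mainStream.apply(
        ParDo.of(
                new DoFn<KV<String, String>, KV<String, String>>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    Map<String, String> side = c.sideInput(lookup);
                    String match = side.get(c.element().getKey());
                    if (match != null) {
                      c.output(KV.of(c.element().getKey(), match));
                    }
                  }
                })
            .withSideInputs(lookup));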


Re: [DISCUSS] Turn `WindowedValue<T>` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-07-24 Thread Thomas Weise
Hi Jincheng,

It is very exciting to see this follow-up, that you have done your research
on the current state and that there is the intention to join forces on the
portability effort!

I have added a few pointers inline.

Several of the issues you identified affect our usage of Beam as well.
These present an opportunity for collaboration.

Thanks,
Thomas


On Wed, Jul 24, 2019 at 2:53 AM jincheng sun 
wrote:

> Hi all,
>
> Thanks Max and all of your kind words. :)
>
> Sorry for the late reply as I'm busy working on the Flink 1.9 release. For
> the next major release of Flink, we plan to add Python user defined
> functions (UDF, UDTF, UDAF) support in Flink and I have gone over the Beam
> portability framework and think that it is perfect for our requirements.
> However, we also found some improvements needed in Beam:
>
> Must Have:
> 
> 1) Currently only BagState is supported in gRPC protocol and I think we
> should support more kinds of state types, such as MapState, ValueState,
> ReducingState, CombiningState(AggregatingState in Flink), etc. That's
> because these kinds of state will be used in both user-defined functions and
> the Flink Python DataStream API.
>

There has been discussion about the need for different state types, and to
support those efficiently on the runner side there may be a need to look at
the over-the-wire representation as well.

https://lists.apache.org/thread.html/ccc0d548e440b63897b6784cd7896c266498df64c9c63ce6c52ae098@%3Cdev.beam.apache.org%3E
https://lists.apache.org/thread.html/ccf8529a49003a7be622b4d3403eba2c633caeaf5ced033e25d4c2e2@%3Cdev.beam.apache.org%3E


2) There are warnings that Python 3 is not fully supported in Beam
> (beam/sdks/python/setup.py). We should support Python 3.x for the Beam
> portability framework since Python 2 will not be supported officially.
>

This must be obsolete per latest comments on:
https://issues.apache.org/jira/browse/BEAM-1251


>
> 3) The configuration "semi_persist_dir" is not set in EnvironmentFactory
> at the runner side. The reason I think it's a must-have is that when the
> environment type is "PROCESS", the default value "/tmp" may become a big
> problem.
>
> 4) The buffer size configuration policy should be improved, such as:
>    At the runner side, the buffer limit in BeamFnDataBufferingOutboundObserver
> is size based. We should also support a time-based limit, especially for the
> streaming case.
>    At the Python SDK Harness, the buffer size is not configurable in
> GrpcDataService. The input queue of the input buffer in the Python SDK
> Harness is not size limited.
>    The flush threshold of the output buffer in the Python SDK Harness is 10 MB
> by default (_DEFAULT_FLUSH_THRESHOLD=10MB). My suggestion is: make the
> threshold configurable and also support a time-based threshold.
>
> Nice To Have:
> ---
> 1) Improve the interfaces of FnDataService, BundleProcessor,
> ActiveBundle, etc., to change the parameter type from WindowedValue<T> to T.
> (We have already discussed this in the previous mails)
>
> 2) Refactor the code to avoid pulling in unnecessary dependencies. For
> example, beam-sdks-java-core (11MB) is a package for Java SDK users and it
> is pulled in because a few classes in beam-sdks-java-core are used
> in beam-runners-java-fn-execution, such as:
> PipelineOptions used in DefaultJobBundleFactory and FileSystems used in
> BeamFileSystemArtifactRetrievalService.
> It means maybe we can add a new module such as beam-sdks-java-common to
> hold the classes used by both the runner and the SDK.
>
> 3) State cache is not shared between bundles which is performance critical
> for streaming jobs.
>

This is rather important to address:

https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E


>
> 4) The coder of WindowedValue cannot be configured, and most of the time we
> don't need to serialize and deserialize the timestamp, window and pane
> properties in Flink. But currently FullWindowedValueCoder is used by
> default in WireCoders.addWireCoder; I suggest making the coder
> configurable (i.e. allowing the use of ValueOnlyWindowedValueCoder).
>
> 5) Currently if a coder is not defined in StandardCoders, it will be
> wrapped with LengthPrefixCoder (WireCoders.addWireCoder ->
> LengthPrefixUnknownCoders.addLengthPrefixedCoder). However, only a few
> coders are defined in StandardCoders. It means that for most coders, a
> length will be added to the serialized bytes, which I think is not
> necessary. My suggestion is maybe we can add some interfaces or tags for
> the coder which indicate whether the coder needs a length prefix or not.
>
> 6) Set the log level according to PipelineOptions in the Python SDK Harness.
> Currently the log level is set to INFO by default.
>

https://issues.apache.org/jira/browse/BEAM-5468


>
> 7) Allow starting up the StatusServer according to PipelineOptions in the
> Python SDK Harness. Currently the StatusServer is started up by default.
>
> Although I put 3) 4) 5) 

Re: contributor permission for Beam Jira tickets

2019-07-24 Thread Ahmet Altay
Welcome. I added ningk@ as a JIRA contributor.

On Wed, Jul 24, 2019 at 4:19 PM Ning Kang  wrote:

> Bump this thread as a friendly ping!
>
> On Tue, Jul 16, 2019 at 5:08 PM Ning Kang  wrote:
>
>> Hi,
>>
>> This is Ning Kang from Google. I'm working on the interactive beam
>> .
>> Could someone please add me as a contributor for Beam's Jira issue tracker?
>> I would like to create/assign tickets for my work.
>>
>> Thanks!
>> Ning.
>>
>


Re: contributor permission for Beam Jira tickets

2019-07-24 Thread Ning Kang
Bump this thread as a friendly ping!

On Tue, Jul 16, 2019 at 5:08 PM Ning Kang  wrote:

> Hi,
>
> This is Ning Kang from Google. I'm working on the interactive beam
> .
> Could someone please add me as a contributor for Beam's Jira issue tracker?
> I would like to create/assign tickets for my work.
>
> Thanks!
> Ning.
>


Re: Sort Merge Bucket - Action Items

2019-07-24 Thread Kenneth Knowles
From the peanut gallery, keeping a separate implementation for SMB seems
fine. Dependencies are serious liabilities for both upstream and
downstream. It seems like the reuse angle is generating extra work, and
potentially making already-complex implementations more complex, instead of
helping things.

Kenn

On Wed, Jul 24, 2019 at 11:59 AM Neville Li  wrote:

> I spoke too soon. Turns out for unsharded writes, numShards can't be
> determined until the last finalize transform, which is again different from
> the current SMB proposal (static number of buckets & shards).
> I'll end up with more code specialized for SMB in order to generalize
> existing sink code, which I think we all want to avoid.
>
> Seems the only option is duplicating some logic like temp file handling,
> which is exactly what we did in the original PR.
> I can reuse Compression & Sink for file level writes but that seems
> about the most I can reuse right now.
>
> On Tue, Jul 23, 2019 at 6:36 PM Neville Li  wrote:
>
>> So I spent one afternoon trying some ideas for reusing the last few
>> transforms WriteFiles.
>>
>> WriteShardsIntoTempFilesFn extends DoFn*,
>> Iterable>, *FileResult*>
>> => GatherResults extends PTransform,
>> PCollection>>
>> => FinalizeTempFileBundles extends PTransform> *FileResult*>>, WriteFilesResult>
>>
>> I replaced FileResult with KV so
>> I can use pre-compute SMB destination file names for the transforms.
>> I'm also thinking of parameterizing ShardedKey for SMB's
>> bucket/shard to reuse WriteShardsIntoTempFilesFn. These transforms are
>> private and easy to change/pull out.
>>
>> OTOH they are somewhat coupled with the package private
>> {Avro,Text,TFRecord}Sink and their WriteOperation impl (where the bulk of
>> temp file handling logic lives). It might be hard to decouple, whether by
>> modifying existing code or by creating new transforms, unless we re-write most of
>> FileBasedSink from scratch.
>>
>> Let me know if I'm on the wrong track.
>>
>> WIP Branch https://github.com/spotify/beam/tree/neville/write-files
>>
>> On Tue, Jul 23, 2019 at 4:22 PM Chamikara Jayalath 
>> wrote:
>>
>>>
>>>
>>> On Mon, Jul 22, 2019 at 1:41 PM Robert Bradshaw 
>>> wrote:
>>>
 On Mon, Jul 22, 2019 at 7:39 PM Eugene Kirpichov 
 wrote:
 >
 > On Mon, Jul 22, 2019 at 7:49 AM Robert Bradshaw 
 wrote:
 >>
 >> On Mon, Jul 22, 2019 at 4:04 PM Neville Li 
 wrote:
 >> >
 >> > Thanks Robert. Agree with the FileIO point. I'll look into it and
 see what needs to be done.
 >> >
 >> > Eugene pointed out that we shouldn't build on
 FileBased{Source,Sink}. So for writes I'll probably build on top of
 WriteFiles.
 >>
 >> Meaning it could be parameterized by FileIO.Sink, right?
 >>
 >>
 https://github.com/apache/beam/blob/release-2.13.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L779
 >
 > Yeah if possible, parameterize FileIO.Sink.
 > I would recommend against building on top of WriteFiles either.
 FileIO being implemented on top of WriteFiles was supposed to be a
 temporary measure - the longer-term plan was to rewrite it from scratch
 (albeit with a similar structure) and throw away WriteFiles.
 > If possible, I would recommend to pursue this path: if there are
 parts of WriteFiles you want to reuse, I would recommend to implement them
 as new transforms, not at all tied to FileBasedSink (but ok if tied to
 FileIO.Sink), with the goal in mind that FileIO could be rewritten on top
 of these new transforms, or maybe parts of WriteFiles could be swapped out
 for them incrementally.

 Thanks for the feedback. There's a lot that was done, but looking at
 the code it feels like there's a lot that was not yet done either, and
 the longer-term plan wasn't clear (though perhaps I'm just not finding
 the right docs).

>>>
>>> I'm also a bit unfamiliar with original plans for WriteFiles and for
>>> updating source interfaces, but I prefer not significantly modifying
>>> existing IO transforms to suit the SMB use-case. If there are existing
>>> pieces of code that can be easily re-used that is fine, but existing
>>> sources/sinks are designed to perform a PCollection -> file transformation
>>> and vice versa with (usually) runner determined sharding. Things specific
>>> to SMB such as sharding restrictions, writing metadata to a separate file,
>>> reading multiple files from the same abstraction, do not sound like
>>> features that should be included in our usual file read/write transforms.
>>>
>>>
 >> > Read might be a bigger change w.r.t. collocating ordered elements
 across files within a bucket and TBH I'm not even sure where to start.
 >>
 >> Yeah, here we need an interface that gives us ReadableFile ->
 >> Iterable. There are existing PTransform,
 >> PCollection> but such an interface is insufficient to extract
 >> ordered records per shard. It seems 

Re: [PROPOSAL] Revised streaming extensions for Beam SQL

2019-07-24 Thread Mingmin Xu
+1 to remove those magic words in Calcite streaming SQL, just because
they're not SQL standard. The idea to replace HOP/TUMBLE with
table-valued functions makes it concise; my only question is, is it (or will
it be) part of the SQL standard? --I'm a big fan of aligning with standards :lol

Ps, although the concept of `window` used here is different from window
functions in SQL, the syntax gives some insight. Take the example of
`ROW_NUMBER() OVER (PARTITION BY COL1 ORDER BY COL2) AS row_number`:
`ROW_NUMBER()` assigns a sequence value to records in the subgroup with key
'COL1'. We can introduce another function, like TUMBLE(), which would assign
a window instance (more instances for HOP()) to each record.

Mingmin


On Sun, Jul 21, 2019 at 9:42 PM Manu Zhang  wrote:

> Thanks Kenn,
> great paper and left some newbie questions on the proposal.
>
> Manu
>
> On Fri, Jul 19, 2019 at 1:51 AM Kenneth Knowles  wrote:
>
>> Hi all,
>>
>> I recently had the great privilege to work with others from Beam plus
>> Calcite and Flink SQL contributors to build a new and minimal proposal for
>> adding streaming extensions to standard SQL: event time, watermarks,
>> windowing, triggers, stream materialization.
>>
>> We hope this will influence the standard body and also Calcite and Flink
>> and other projects working on the streaming SQL.
>>
>> I would like to start implementing these extensions in Beam, moving from
>> our current streaming extensions to the new proposal.
>>
>>The whole paper is https://arxiv.org/abs/1905.12133
>>
>>My small proposal to start in Beam:
>> https://s.apache.org/streaming-beam-sql
>>
>> TL;DR: replace `GROUP BY Tumble/Hop/Session` with table functions that do
>> Tumble, Hop, Session. The details of why to make this change are explained
>> in the appendix to my proposal. For the big picture of how it fits in, the
>> full paper is best.
>>
>> Kenn
>>
>

-- 

Mingmin


Re: [Discuss] Retractions in Beam

2019-07-24 Thread Rui Wang
Hello!

In case you are not aware, I have added a modified streaming wordcount
example at the end of the doc to illustrate retractions.


-Rui

On Wed, Jul 10, 2019 at 10:58 AM Rui Wang  wrote:

> Hi Community,
>
> Retractions are a part of the core Beam model [1]. I have come up with a doc
> to discuss retractions: use cases, model and API (see the link below).
> This is a very early-stage discussion on retractions, but I do hope we can
> reach a consensus and get retractions implemented in a useful way
> eventually.
>
>
> doc link:
> https://docs.google.com/document/d/14WRfxwk_iLUHGPty3C6ZenddPsp_d6jhmx0vuafXqmE/edit?usp=sharing
>
>
> [1]: https://issues.apache.org/jira/browse/BEAM-91
>
>
> -Rui
>


Re: How to expose/use the External transform on Java SDK

2019-07-24 Thread Heejong Lee
I think it depends on how we define "the core" part of the SDK. If we define
the core as only the (abstract) data types which describe the Beam pipeline
model, then it would be more sensible to put the external transform into a
separate extension module (option 4). Otherwise, option 1 makes sense.

On Wed, Jul 24, 2019 at 11:56 AM Chamikara Jayalath 
wrote:

> The idea of 'ExternalTransform' is to allow users to use transforms in SDK
> X from SDK Y. I think this should be a core part of each SDK and
> corresponding external transforms ([a] for Java, [b] for Python) should be
> released with each SDK. This will also allow us to add core external
> transforms to some of the critical transforms that are not available in
> certain SDKs. So I prefer option (1).
>
> Rebo, I didn't realize there's an external transform in Go SDK. Looking at
> it, seems like it's more of an interface for native transforms implemented
> in each runner, not for cross-language use-cases. Is that correct? Maybe
> we can reuse it for the latter as well.
>
> Thanks,
> Cham
>
> [a]
> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java
> [b]
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py
>
> On Wed, Jul 24, 2019 at 10:25 AM Robert Burke  wrote:
>
>> Ideas inline.
>>
>> On Wed, Jul 24, 2019, 9:56 AM Ismaël Mejía  wrote:
>>
>>> After Beam Summit EU I was curious about the External transform. I was
>>> interested on the scenario of using it to call python code in the
>>> middle of a Java pipeline. This is a potentially useful scenario for
>>> example to evaluate models from python ML frameworks on Java
>>> pipelines. In my example I did a transform to classify elements in a
>>> simple Python ParDo and tried to connect it via the Java External
>>> transform.
>>>
>>> I found that the ExternalTransform code was added into
>>> `runners/core-construction-java` as part of BEAM-6747 [1]. However
>>> this code is not exposed currently as part of the Beam Java SDK, so
>>> end users won’t be able to find it easily. I found this weird and
>>> thought well it will be as simple as to move it into the Java SDK and
>>> voila!
>>>
>>> But of course this could not be so easy because this transform calls
>>> the Expansion service via gRPC and Java SDK does not have (and
>>> probably should not have) gRPC in its dependencies.
>>> So my second reflex was to add it into the Java SDK and translate it to a
>>> generic expansion in all the runners, but this may not make sense because
>>> the External transform is not part of the runner translation since
>>> this is part of the Pipeline construction process (as pointed to me by
>>> Max in a slack discussion).
>>>
>>> So the question is: How do you think this should be exposed to the end
>>> users?
>>>
>>> 1. Should we add gRPC with all its deps to SDKs Java core? (this of
>>> course it is not nice because we will leak our vendored gRPC and
>>> friends into users classpath).
>>>
>> If there's separation between the SDK and the Harness then this makes
>> sense. Otherwise the portable harness depends on GRPC at present, doesn't
>> it? Presently the Go SDK kicks off the harness, and then carries the GRPC
>> dependency (Though that's separable if necessary.)
>>
>>> 2. Should we do the dynamic loading of classes only at runtime if the
>>> transform is used to avoid the big extra compile dependency (and add
>>> runners/core-construction-java) as a runtime dependency.
>>> 3. Should we create a ‘shim’ module to hide the gRPC dependency and
>>> load the gRPC classes dynamically on it when the External transform is
>>> part of the pipeline.
>>> 4. Should we pack it as an extension (with the same issue of needing
>>> to leak the dependencies, but with less impact for users who do not
>>> use External) ?
>>> 5. Other?
>>>
>>> The ‘purist’ me thinks we should have External in sdks/java/core but
>>> maybe it is better not to. Any other opinions or ideas?
>>>
>>
>> The Go SDK supports External in its core transforms set. However, it
>> would be up to the callers to populate the data field however they need
>> to, whether that's some "known" configuration object or something sourced
>> from another service (eg the expansion service). The important part on the
>> other side is that the runner knows what to do with it.
>>
>> The non-portable pubsubio in the Go SDK is an example [1] using External
>> currently. The Dataflow runner recognizes it, and makes the substitution.
>> Eventually once the SDK supports SDF that can generate unbounded
>> PCollections, this will likely be replaced with that kind of
>> implementation, and the existing "External" version will be moved to
>> part of the Go SDKs Dataflow runner package.
>>
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/io/pubsubio/pubsubio.go#L65
>>
>>>
>>> Thanks,
>>> Ismaël
>>>
>>> [1] https://issues.apache.org/jira/browse/BEAM-6747
>>>
>>

Re: Sort Merge Bucket - Action Items

2019-07-24 Thread Neville Li
I spoke too soon. Turns out for unsharded writes, numShards can't be
determined until the last finalize transform, which is again different from
the current SMB proposal (static number of buckets & shards).
I'll end up with more code specialized for SMB in order to generalize
existing sink code, which I think we all want to avoid.

Seems the only option is duplicating some logic like temp file handling,
which is exactly what we did in the original PR.
I can reuse Compression & Sink for file level writes but that seems
about the most I can reuse right now.
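
For context, the file-level reuse mentioned above (Compression plus an existing
FileIO.Sink, with WriteFiles out of the picture) looks roughly like the sketch
below. The path and the `elements` iterable are placeholders and checked
IOExceptions are elided.

import java.nio.channels.WritableByteChannel;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResourceId;

// Write one bucket/shard file directly through an existing FileIO.Sink,
// with compression handled by Compression and no WriteFiles involved.
ResourceId file = FileSystems.matchNewResource(
    "gs://my-bucket/smb/bucket-00000-shard-00000.json.gz", /* isDirectory */ false);
FileIO.Sink<String> sink = TextIO.sink();
try (WritableByteChannel channel =
    Compression.GZIP.writeCompressed(FileSystems.create(file, "application/octet-stream"))) {
  sink.open(channel);
  for (String element : elements) { // `elements` = the sorted records for this bucket/shard
    sink.write(element);
  }
  sink.flush();
}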

On Tue, Jul 23, 2019 at 6:36 PM Neville Li  wrote:

> So I spent one afternoon trying some ideas for reusing the last few
> transforms WriteFiles.
>
> WriteShardsIntoTempFilesFn extends DoFn*,
> Iterable>, *FileResult*>
> => GatherResults extends PTransform,
> PCollection>>
> => FinalizeTempFileBundles extends PTransform *FileResult*>>, WriteFilesResult>
>
> I replaced FileResult with KV so I
> can use pre-compute SMB destination file names for the transforms.
> I'm also thinking of parameterizing ShardedKey for SMB's
> bucket/shard to reuse WriteShardsIntoTempFilesFn. These transforms are
> private and easy to change/pull out.
>
> OTOH they are somewhat coupled with the package private
> {Avro,Text,TFRecord}Sink and their WriteOperation impl (where the bulk of
> temp file handling logic lives). It might be hard to decouple, whether by
> modifying existing code or by creating new transforms, unless we re-write most of
> FileBasedSink from scratch.
>
> Let me know if I'm on the wrong track.
>
> WIP Branch https://github.com/spotify/beam/tree/neville/write-files
>
> On Tue, Jul 23, 2019 at 4:22 PM Chamikara Jayalath 
> wrote:
>
>>
>>
>> On Mon, Jul 22, 2019 at 1:41 PM Robert Bradshaw 
>> wrote:
>>
>>> On Mon, Jul 22, 2019 at 7:39 PM Eugene Kirpichov 
>>> wrote:
>>> >
>>> > On Mon, Jul 22, 2019 at 7:49 AM Robert Bradshaw 
>>> wrote:
>>> >>
>>> >> On Mon, Jul 22, 2019 at 4:04 PM Neville Li 
>>> wrote:
>>> >> >
>>> >> > Thanks Robert. Agree with the FileIO point. I'll look into it and
>>> see what needs to be done.
>>> >> >
>>> >> > Eugene pointed out that we shouldn't build on
>>> FileBased{Source,Sink}. So for writes I'll probably build on top of
>>> WriteFiles.
>>> >>
>>> >> Meaning it could be parameterized by FileIO.Sink, right?
>>> >>
>>> >>
>>> https://github.com/apache/beam/blob/release-2.13.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L779
>>> >
>>> > Yeah if possible, parameterize FileIO.Sink.
>>> > I would recommend against building on top of WriteFiles either. FileIO
>>> being implemented on top of WriteFiles was supposed to be a temporary
>>> measure - the longer-term plan was to rewrite it from scratch (albeit with
>>> a similar structure) and throw away WriteFiles.
>>> > If possible, I would recommend to pursue this path: if there are parts
>>> of WriteFiles you want to reuse, I would recommend to implement them as new
>>> transforms, not at all tied to FileBasedSink (but ok if tied to
>>> FileIO.Sink), with the goal in mind that FileIO could be rewritten on top
>>> of these new transforms, or maybe parts of WriteFiles could be swapped out
>>> for them incrementally.
>>>
>>> Thanks for the feedback. There's a lot that was done, but looking at
>>> the code it feels like there's a lot that was not yet done either, and
>>> the longer-term plan wasn't clear (though perhaps I'm just not finding
>>> the right docs).
>>>
>>
>> I'm also a bit unfamiliar with original plans for WriteFiles and for
>> updating source interfaces, but I prefer not significantly modifying
>> existing IO transforms to suit the SMB use-case. If there are existing
>> pieces of code that can be easily re-used that is fine, but existing
>> sources/sinks are designed to perform a PCollection -> file transformation
>> and vice versa with (usually) runner determined sharding. Things specific
>> to SMB such as sharding restrictions, writing metadata to a separate file,
>> reading multiple files from the same abstraction, do not sound like
>> features that should be included in our usual file read/write transforms.
>>
>>
>>> >> > Read might be a bigger change w.r.t. collocating ordered elements
>>> across files within a bucket and TBH I'm not even sure where to start.
>>> >>
>>> >> Yeah, here we need an interface that gives us ReadableFile ->
>>> >> Iterable. There are existing PTransform,
>>> >> PCollection> but such an interface is insufficient to extract
>>> >> ordered records per shard. It seems the only concrete implementations
>>> >> are based on FileBasedSource, which we'd like to avoid, but there's no
>>> >> alternative. An SDF, if exposed, would likely be overkill and
>>> >> cumbersome to call (given the reflection machinery involved in
>>> >> invoking DoFns).
>>> >
>>> > Seems easiest to just define a new regular Java interface for this.
>>> > Could be either, indeed, ReadableFile -> Iterable, or something
>>> analogous, e.g. (ReadableFile, OutputReceiver) -> 

Re: How to expose/use the External transform on Java SDK

2019-07-24 Thread Chamikara Jayalath
The idea of 'ExternalTransform' is to allow users to use transforms in SDK
X from SDK Y. I think this should be a core part of each SDK and
corresponding external transforms ([a] for Java, [b] for Python) should be
released with each SDK. This will also allow us to add core external
transforms to some of the critical transforms that are not available in
certain SDKs. So I prefer option (1).

Rebo, I didn't realize there's an external transform in Go SDK. Looking at
it, seems like it's more of an interface for native transforms implemented
in each runner, not for cross-language use-cases. Is that correct? Maybe
we can reuse it for the latter as well.

Thanks,
Cham

[a]
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java
[b]
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py

On Wed, Jul 24, 2019 at 10:25 AM Robert Burke  wrote:

> Ideas inline.
>
> On Wed, Jul 24, 2019, 9:56 AM Ismaël Mejía  wrote:
>
>> After Beam Summit EU I was curious about the External transform. I was
>> interested on the scenario of using it to call python code in the
>> middle of a Java pipeline. This is a potentially useful scenario for
>> example to evaluate models from python ML frameworks on Java
>> pipelines. In my example I did a transform to classify elements in a
>> simple Python ParDo and tried to connect it via the Java External
>> transform.
>>
>> I found that the ExternalTransform code was added into
>> `runners/core-construction-java` as part of BEAM-6747 [1]. However
>> this code is not exposed currently as part of the Beam Java SDK, so
>> end users won’t be able to find it easily. I found this weird and
>> thought well it will be as simple as to move it into the Java SDK and
>> voila!
>>
>> But of course this could not be so easy because this transform calls
>> the Expansion service via gRPC and Java SDK does not have (and
>> probably should not have) gRPC in its dependencies.
>> So my second reflex was to add it into the Java SDK and translate it to a
>> generic expansion in all the runners, but this may not make sense because
>> the External transform is not part of the runner translation since
>> this is part of the Pipeline construction process (as pointed to me by
>> Max in a slack discussion).
>>
>> So the question is: How do you think this should be exposed to the end
>> users?
>>
>> 1. Should we add gRPC with all its deps to SDKs Java core? (this of
>> course it is not nice because we will leak our vendored gRPC and
>> friends into users classpath).
>>
> If there's separation between the SDK and the Harness then this makes
> sense. Otherwise the portable harness depends on GRPC at present, doesn't
> it? Presently the Go SDK kicks off the harness, and then carries the GRPC
> dependency (Though that's separable if necessary.)
>
>> 2. Should we do the dynamic loading of classes only at runtime if the
>> transform is used to avoid the big extra compile dependency (and add
>> runners/core-construction-java) as a runtime dependency.
>> 3. Should we create a ‘shim’ module to hide the gRPC dependency and
>> load the gRPC classes dynamically on it when the External transform is
>> part of the pipeline.
>> 4. Should we pack it as an extension (with the same issue of needing
>> to leak the dependencies, but with less impact for users who do not
>> use External) ?
>> 5. Other?
>>
>> The ‘purist’ me thinks we should have External in sdks/java/core but
>> maybe it is better not to. Any other opinions or ideas?
>>
>
> The Go SDK supports External in its core transforms set. However, it would
> be up to the callers to populate the data field however they need to,
> whether that's some "known" configuration object or something sourced from
> another service (eg the expansion service). The important part on the other
> side is that the runner knows what to do with it.
>
> The non-portable pubsubio in the Go SDK is an example [1] using External
> currently. The Dataflow runner recognizes it, and makes the substitution.
> Eventually once the SDK supports SDF that can generate unbounded
> PCollections, this will likely be replaced with that kind of
> implementation, and the existing "External" version will be moved to
> part of the Go SDKs Dataflow runner package.
>
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/io/pubsubio/pubsubio.go#L65
>
>>
>> Thanks,
>> Ismaël
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-6747
>>
>


Talk Beam Go at GopherCon?

2019-07-24 Thread Robert Burke
If anyone wants to talk about the Apache Beam Go SDK, I'm at GopherCon in
San Diego this week. Please say hi to the moustached, blue haired gentleman
(me).

There's no official Beam content on the program, but why should that stop
us?

Robert Burke (@lostluck on Twitter)


Re: How to expose/use the External transform on Java SDK

2019-07-24 Thread Robert Burke
Ideas inline.

On Wed, Jul 24, 2019, 9:56 AM Ismaël Mejía  wrote:

> After Beam Summit EU I was curious about the External transform. I was
> interested on the scenario of using it to call python code in the
> middle of a Java pipeline. This is a potentially useful scenario for
> example to evaluate models from python ML frameworks on Java
> pipelines. In my example I did a transform to classify elements in a
> simple Python ParDo and tried to connect it via the Java External
> transform.
>
> I found that the ExternalTransform code was added into
> `runners/core-construction-java` as part of BEAM-6747 [1]. However
> this code is not exposed currently as part of the Beam Java SDK, so
> end users won’t be able to find it easily. I found this weird and
> thought well it will be as simple as to move it into the Java SDK and
> voila!
>
> But of course this could not be so easy because this transform calls
> the Expansion service via gRPC and Java SDK does not have (and
> probably should not have) gRPC in its dependencies.
> So my second reflex was to add it into the Java SDK and translate it to a
> generic expansion in all the runners, but this may not make sense because
> the External transform is not part of the runner translation since
> this is part of the Pipeline construction process (as pointed to me by
> Max in a slack discussion).
>
> So the question is: How do you think this should be exposed to the end
> users?
>
> 1. Should we add gRPC with all its deps to SDKs Java core? (this of
> course it is not nice because we will leak our vendored gRPC and
> friends into users classpath).
>
If there's separation between the SDK and the Harness then this makes
sense. Otherwise the portable harness depends on GRPC at present, doesn't
it? Presently the Go SDK kicks off the harness, and then carries the GRPC
dependency (Though that's separable if necessary.)

> 2. Should we do the dynamic loading of classes only at runtime if the
> transform is used to avoid the big extra compile dependency (and add
> runners/core-construction-java) as a runtime dependency.
> 3. Should we create a ‘shim’ module to hide the gRPC dependency and
> load the gRPC classes dynamically on it when the External transform is
> part of the pipeline.
> 4. Should we pack it as an extension (with the same issue of needing
> to leak the dependencies, but with less impact for users who do not
> use External) ?
> 5. Other?
>
> The ‘purist’ me thinks we should have External in sdks/java/core but
> maybe it is better not to. Any other opinions or ideas?
>

The Go SDK supports External in its core transforms set. However, it would
be up to the callers to populate the data field however they need to,
whether that's some "known" configuration object or something sourced from
another service (eg the expansion service). The important part on the other
side is that the runner knows what to do with it.

The non-portable pubsubio in the Go SDK is an example [1] using External
currently. The Dataflow runner recognizes it, and makes the substitution.
Eventually once the SDK supports SDF that can generate unbounded
PCollections, this will likely be replaced with that kind of
implementation, and the existing "External" version will be moved to
part of the Go SDKs Dataflow runner package.


[1]
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/io/pubsubio/pubsubio.go#L65

>
> Thanks,
> Ismaël
>
> [1] https://issues.apache.org/jira/browse/BEAM-6747
>


Re: [BEAM-7755] fixing how repeated records are handled

2019-07-24 Thread Rui Wang
Thanks for your contribution. Left a comment on the PR.


-Rui

On Wed, Jul 24, 2019 at 8:56 AM  wrote:

> Hello,
>
> I opened the ticket BEAM-7755, and also suggested a fix at
> https://github.com/apache/beam/pull/9089.
> I would like feedback or thoughts on this issue as we are looking to use
> BeamSQL and are testing it out on some use cases we have. This issue has
> been one of the bigger ones we have encountered and wanted to know if we are
> doing something wrong on our side or not. Thank you for your help!
>
> Thanks,
>
> Sahith
>


How to expose/use the External transform on Java SDK

2019-07-24 Thread Ismaël Mejía
After Beam Summit EU I was curious about the External transform. I was
interested in the scenario of using it to call Python code in the
middle of a Java pipeline. This is a potentially useful scenario, for
example to evaluate models from Python ML frameworks in Java
pipelines. In my example I did a transform to classify elements in a
simple Python ParDo and tried to connect it via the Java External
transform.

I found that the ExternalTransform code was added into
`runners/core-construction-java` as part of BEAM-6747 [1]. However
this code is not exposed currently as part of the Beam Java SDK, so
end users won’t be able to find it easily. I found this weird and
thought well it will be as simple as to move it into the Java SDK and
voila!

But of course this could not be so easy because this transform calls
the Expansion service via gRPC and Java SDK does not have (and
probably should not have) gRPC in its dependencies.
So my second reflex was to add it into the Java SDK and translate it to a
generic expansion in all the runners, but this may not make sense because
the External transform is not part of the runner translation since
this is part of the Pipeline construction process (as pointed out to me by
Max in a Slack discussion).

So the question is: How do you think this should be exposed to the end users?

1. Should we add gRPC with all its deps to SDKs Java core? (this of
course it is not nice because we will leak our vendored gRPC and
friends into users classpath).
2. Should we do the dynamic loading of classes only at runtime if the
transform is used, to avoid the big extra compile dependency (and add
runners/core-construction-java as a runtime dependency)?
3. Should we create a ‘shim’ module to hide the gRPC dependency and
load the gRPC classes dynamically on it when the External transform is
part of the pipeline.
4. Should we pack it as an extension (with the same issue of needing
to leak the dependencies, but with less impact for users who do not
use External) ?
5. Other?

The ‘purist’ me thinks we should have External in sdks/java/core but
maybe it is better not to. Any other opinions or ideas?

Thanks,
Ismaël

[1] https://issues.apache.org/jira/browse/BEAM-6747
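
For anyone who wants to experiment with what is already there, applying the
transform from runners/core-construction-java looks roughly like the sketch
below. The URN, payload and endpoint are placeholders, and the exact
generics/signature may differ between Beam versions.

import org.apache.beam.runners.core.construction.External;
import org.apache.beam.sdk.values.PCollection;

// Apply a transform that a running expansion service knows under `urn`; the
// payload bytes are whatever the remote (e.g. Python) builder expects.
PCollection<String> classified =
    input.apply(
        "ClassifyWithPython",
        External.<PCollection<String>, String>of(
            "beam:transforms:mycompany:classify:v1", // hypothetical URN
            configPayload,                           // byte[] placeholder
            "localhost:8097"));                      // expansion service address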


[BEAM-7755] fixing how repeated records are handled

2019-07-24 Thread sahith . reddy
Hello,

I opened the ticket BEAM-7755, and also suggested a fix at 
https://github.com/apache/beam/pull/9089. 
I would like feedback or thoughts on this issue as we are looking to use 
BeamSQL and are testing it out on some use cases we have. This issue has been 
one of the bigger ones we have encountered and wanted to know if we are doing
something wrong on our side or not. Thank you for your help!

Thanks,

Sahith

Re: Choosing a coder for a class that contains a Row?

2019-07-24 Thread Ryan Skraba
I'm also really interested in the question of evolving schemas... It's
something I've also put off figuring out :D

With all its warts, the LazyAvroCoder technique (a coder backed by
some sort of schema registry) _could_ work with "homogeneish" data
(i.e. if the number of schemas in play for a single coder is much,
much smaller than the number of elements), even if none of the the
schemas are known at Pipeline construction.  The portability job
server (which already stores and serves artifacts for running jobs)
might be the right place to put a schema registry... but I'm not
entirely convinced it's the right way to go either.

At the same time, "simply" bumping a known schema to a new version is
roughly equivalent to updating a pipeline in place.

Sending the data as Java-serialized Rows will be equivalent to sending
the entire schema with every record, so it _would_ work without
involving a new, distributed state between one coder's encode and
another's decode (at the cost of message size, of course).
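
For completeness, the Java-serialization fallback boils down to something like
the sketch below, trading message size for not needing the schema up front
(Row carries its Schema and is Serializable; checked exceptions are elided).

import java.io.ByteArrayOutputStream;
import java.io.Serializable;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.values.Row;

class MyData implements Serializable {
  String someId;
  Row someRow;      // the Row's schema travels with the serialized value
  Row someOtherRow;
}

SerializableCoder<MyData> coder = SerializableCoder.of(MyData.class);

ByteArrayOutputStream os = new ByteArrayOutputStream();
coder.encode(pushingData, os); // `pushingData` is the MyData instance being published
byte[] payload = os.toByteArray();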

Ryan


On Wed, Jul 24, 2019 at 1:40 AM Pablo Estrada  wrote:
>
> +dev
> Thanks Ryan! This is quite helpful. Still not what I need : ) - but useful.
>
> The data is change data capture from databases, and I'm putting it into a 
> Beam Row. The schema for the Row is generally homogeneous, but subject to 
> change at some point in the future if the schema in the database changes. 
> It's unusual and unlikely, but possible. I have no idea how Beam deals with 
> evolving schemas. +Reuven Lax is there documentation / examples / anything 
> around this? : )
>
> I think evolving schemas is an interesting question
>
> For now, I am going to Java-serialize the objects, and delay figuring this 
> out. But I reckon I'll have to come back to this...
>
> Best
> -P.
>
> On Tue, Jul 23, 2019 at 1:07 AM Ryan Skraba  wrote:
>>
>> Hello Pablo!  Just to clarify -- the Row schemas aren't known at
>> pipeline construction time, but can be discovered from the instance of
>> MyData?
>>
>> Once discovered, is the schema "homogeneous" for all instance of
>> MyData?  (i.e. someRow will always have the same schema for all
>> instances afterwards, and there won't be another someRow with a
>> different schema).
>>
>> We've encountered a parallel "problem" with pure Avro data, where the
>> instance is a GenericRecord containing its own Avro schema but
>> *without* knowing the schema until the pipeline is run.  The solution
>> that we've been using is a bit hacky, but we're using an ad hoc
>> per-job schema registry and a custom coder where each worker saves the
>> schema in the `encode` before writing the record, and loads it lazily
>> in the `decode` before reading.
>>
>> The original code is available[1] (be gentle, it was written with Beam
>> 0.4.0-incubating... and has continued to work until now).
>>
>> In practice, the ad hoc schema registry is just a server socket in the
>> Spark driver, in-memory for DirectRunner / local mode, and a
>> read/write to a known location in other runners.  There are definitely
>> other solutions with side-inputs and providers, and the job server in
>> portability looks like an exciting candidate for per-job schema
>> registry story...
>>
>> I'm super eager to see if there are other ideas or a contribution we
>> can make in this area that's "Beam Row" oriented!
>>
>> Ryan
>>
>> [1] 
>> https://github.com/Talend/components/blob/master/core/components-adapter-beam/src/main/java/org/talend/components/adapter/beam/coders/LazyAvroCoder.java
>>
>> On Tue, Jul 23, 2019 at 12:49 AM Pablo Estrada  wrote:
>> >
>> > Hello all,
>> > I am writing a utility to push data to PubSub. My data class looks 
>> > something like so:
>> > ==
>> > class MyData {
>> >   String someId;
>> >   Row someRow;
>> >   Row someOtherRow;
>> > }
>> > ==
>> > The schema for the Rows is not known a-priori. It is contained by the Row. 
>> > I am then pushing this data to pubsub:
>> > ===
>> > MyData pushingData = 
>> > WhatCoder? coder = 
>> >
>> > ByteArrayOutputStream os = new ByteArrayOutputStream();
>> > coder.encode(this, os);
>> >
>> > pubsubClient.connect();
>> > pubsubClient.push(PubSubMessage.newBuilder().setData(os.toByteArray()).build());
>> > pubsubClient.close();
>> > =
>> > What's the right coder to use in this case? I don't know if SchemaCoder 
>> > will work, because it seems that it requires the Row's schema a priori. I 
>> > have not been able to make AvroCoder work.
>> >
>> > Any tips?
>> > Best
>> > -P.


Re: [DISCUSS] Turn `WindowedValue<T>` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-07-24 Thread jincheng sun
Hi all,

Thanks Max and all of your kind words. :)

Sorry for the late reply as I'm busy working on the Flink 1.9 release. For
the next major release of Flink, we plan to add Python user defined
functions (UDF, UDTF, UDAF) support in Flink and I have gone over the Beam
portability framework and think that it is perfect for our requirements.
However, we also found some improvements needed in Beam:

Must Have:

1) Currently only BagState is supported in gRPC protocol and I think we
should support more kinds of state types, such as MapState, ValueState,
ReducingState, CombiningState (AggregatingState in Flink), etc. (see the
sketch just after this list). That's because these kinds of state will be
used in both user-defined functions and the Flink Python DataStream API.

2) There are warnings that Python 3 is not fully supported in Beam
(beam/sdks/python/setup.py). We should support Python 3.x for the Beam
portability framework since Python 2 will not be supported officially.

3) The configuration "semi_persist_dir" is not set in EnvironmentFactory at
the runner side. The reason I think it's a must-have is that when the
environment type is "PROCESS", the default value "/tmp" may become a big
problem.

4) The buffer size configuration policy should be improved, such as:
   At the runner side, the buffer limit in BeamFnDataBufferingOutboundObserver
is size based. We should also support a time-based limit, especially for the
streaming case.
   At the Python SDK Harness, the buffer size is not configurable in
GrpcDataService. The input queue of the input buffer in the Python SDK
Harness is not size limited.
   The flush threshold of the output buffer in the Python SDK Harness is 10 MB
by default (_DEFAULT_FLUSH_THRESHOLD=10MB). My suggestion is: make the
threshold configurable and also support a time-based threshold.
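
To illustrate 1), these are the model-level state kinds a user function can
already declare on the Java SDK side, which the portability state channel
would eventually need to serve; a minimal sketch of the declarations only
(the coders and ids are arbitrary examples):

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;

// Today only the bag-style access is carried over the gRPC state protocol.
class StatefulFn extends DoFn<KV<String, Integer>, Void> {

  @StateId("latest")
  private final StateSpec<ValueState<Integer>> latest = StateSpecs.value(VarIntCoder.of());

  @StateId("all")
  private final StateSpec<BagState<Integer>> all = StateSpecs.bag(VarIntCoder.of());

  @StateId("byName")
  private final StateSpec<MapState<String, Integer>> byName =
      StateSpecs.map(StringUtf8Coder.of(), VarIntCoder.of());

  @StateId("total")
  private final StateSpec<CombiningState<Integer, int[], Integer>> total =
      StateSpecs.combining(Sum.ofIntegers());

  @ProcessElement
  public void process(ProcessContext c) {
    // read and write the declared state cells here
  }
}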

Nice To Have:
---
1) Improve the interfaces of FnDataService, BundleProcessor, ActiveBundle,
etc., to change the parameter type from WindowedValue<T> to T. (We have
already discussed this in the previous mails)

2) Refactor the code to avoid pulling in unnecessary dependencies. For
example, beam-sdks-java-core (11MB) is a package for Java SDK users and it
is pulled in because a few classes in beam-sdks-java-core are used
in beam-runners-java-fn-execution, such as:
PipelineOptions used in DefaultJobBundleFactory and FileSystems used in
BeamFileSystemArtifactRetrievalService.
It means maybe we can add a new module such as beam-sdks-java-common to
hold the classes used by both the runner and the SDK.

3) State cache is not shared between bundles which is performance critical
for streaming jobs.

4) The coder of WindowedValue cannot be configured, and most of the time we
don't need to serialize and deserialize the timestamp, window and pane
properties in Flink. But currently FullWindowedValueCoder is used by
default in WireCoders.addWireCoder; I suggest making the coder
configurable (i.e. allowing the use of ValueOnlyWindowedValueCoder).

5) Currently if a coder is not defined in StandardCoders, it will be
wrapped with LengthPrefixCoder (WireCoders.addWireCoder ->
LengthPrefixUnknownCoders.addLengthPrefixedCoder; see the sketch after this
list). However, only a few coders are defined in StandardCoders. It means
that for most coders, a length will be added to the serialized bytes, which
I think is not necessary. My suggestion is maybe we can add some interfaces
or tags for the coder which indicate whether the coder needs a length prefix
or not.

6) Set the log level according to PipelineOptions in the Python SDK Harness.
Currently the log level is set to INFO by default.

7) Allow starting up the StatusServer according to PipelineOptions in the
Python SDK Harness. Currently the StatusServer is started up by default.
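
For reference, the wrapping mentioned in 5) amounts to something like this
trivial sketch: every element gets a length prefix on the wire, even when the
inner coder could already delimit its own values.

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.coders.VarLongCoder;

// What runners fall back to for coders that are not in StandardCoders.
Coder<Long> wireCoder = LengthPrefixCoder.of(VarLongCoder.of());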

Although I put 3) 4) 5) into the "Nice to Have" as they are performance
related, I still think they are very critical for Python UDF execution
performance.

Open questions:
-
1) Which coders should be / can be defined in StandardCoders?

Currently we are preparing the design of how to support Python UDFs in Flink
based on the Beam portability framework and we will bring up the discussion
in the Flink community. We may propose more changes for Beam during that time
and may need more support from the Beam community.

To be honest, I'm not an expert on Beam, so please feel free to correct
me if my understanding is wrong. Any feedback is welcome.

Best,
Jincheng

Maximilian Michels  于2019年4月25日周四 上午12:14写道:

> Fully agree that this is an effort that goes beyond changing a type
> parameter but I think we have a chance here to cooperate between the two
> projects. I would be happy to help out where I can.
>
> I'm not sure at this point what exactly is feasible for reuse but I
> would imagine the Runner-related code to be useful as well for the
> interaction with the SDK Harness. There are some fundamental differences
> in the model, e.g. how windowing works, which might be challenging to
> work around.
>
> Thanks,
> Max
>
> On 24.04.19 12:03, jincheng sun wrote:
> >
> > Hi Kenn, I think you are 

Re: Jenkins nodes disconnected?

2019-07-24 Thread Łukasz Gajowy
Thanks Yifan!

Best,
Łukasz

On Tue, Jul 23, 2019 at 6:50 PM Yifan Zou  wrote:

> That was a known issue, BEAM-7650
> . Basically, the disk
> was full. We should either fix this problem in the Python precommit or, as
> Udi suggested, have a cron job to do periodic disk space releases.
> I'll try to restore those broken agents.
>
> Thanks.
>
> Yifan
>
> On Tue, Jul 23, 2019 at 5:47 AM Łukasz Gajowy  wrote:
>
>> Hi,
>>
>> I noticed that 5 Jenkins nodes are disconnected[1]. This results in a
>> very long task queue and requires long waiting for a job to be completed.
>> I'm currently waiting 42 minutes for seed job to be started (and still
>> counting). Is anyone currently working on reconnecting the nodes? Why is
>> this happening? Can I help in any way?
>>
>> [1] https://builds.apache.org/computer/
>>
>