Re: Why not adding all coders into ModelCoderRegistrar?

2018-10-03 Thread Shen Li
as the key seen by the runner (an Integer) is not a Byte array. Thanks, Shen On Fri, Sep 28, 2018 at 2:20 PM Shen Li wrote: > Thank you, Lukasz! > > Best, > Shen > > On Fri, Sep 28, 2018 at 2:11 PM Lukasz Cwik wrote: > >> Runners can never know about every coder th

Re: Why not adding all coders into ModelCoderRegistrar?

2018-09-28 Thread Shen Li
er side and on the SDK side > it should be KVCoder, > LengthPrefixCoder>. More details in [1]. > > 1: > http://doc/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA#heading=h.sh4d5klmtfis > > > > On Fri, Sep 28, 2018 at 11:02 AM Shen Li wrote: > >> Hi, >> >

Why not adding all coders into ModelCoderRegistrar?

2018-09-28 Thread Shen Li
Hi, I noticed that ModelCoderRegistrar only includes 9 out of ~40 coders. May I know the rationale behind this decision? https://github.com/apache/beam/blob/release-2.7.0/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java I think one

Checkpointing and Restoring BoundedSource

2018-05-16 Thread Shen Li
Hi, After recovering from a checkpoint, is it correct to use BoundedSource.BoundedReader#splitAtFraction(double) to resume a BoundedSource? My concern is that the doc says "the new range would contain *approximately* the given fraction of the amount of data in the current range." Does the word

Re: How can runners make use of sink parallelism?

2018-04-04 Thread Shen Li
thod is really a "flush" method, with @ProcessElement >>> perhaps buffering up elements to be written to e.g. the same file shard. It >>> is not this simple in practice but that gives the idea of how even with >>> unrestricted elementwise parallelism you don't get

Re: How can runners make use of sink parallelism?

2018-04-03 Thread Shen Li
> specific issues with a particular connector? That could make this a very > productive discussion. > > Kenn > > On Mon, Apr 2, 2018 at 1:41 PM Shen Li <cs.she...@gmail.com> wrote: > >> Hi, >> >> It seems that there is no Sink base class. Some IO connectors (

Re: Source split consistency in distributed environment

2018-03-26 Thread Shen Li
Thank you! Shen On Mon, Mar 26, 2018 at 5:34 PM, Eugene Kirpichov <kirpic...@google.com> wrote: > > > On Mon, Mar 26, 2018, 2:08 PM Shen Li <cs.she...@gmail.com> wrote: > >> Hi Eugene, >> >> Thanks. Does it mean the application canno

Re: Source split consistency in distributed environment

2018-03-26 Thread Shen Li
tor instance can retrieve its own sub-source and proceed from there. Thanks, Shen On Mon, Mar 26, 2018 at 4:48 PM, Eugene Kirpichov <kirpic...@google.com> wrote: > > > On Mon, Mar 26, 2018 at 1:09 PM Shen Li <cs.she...@gmail.com> wrote: > >> Hi Lukasz, >> >>

Re: Source split consistency in distributed environment

2018-03-26 Thread Shen Li
a8f11 > 3617fd3f27/sdks/java/core/src/main/java/org/apache/beam/sdk/ > io/BoundedSource.java#L387 > 2: https://s.apache.org/splittable-do-fn > 3: https://conferences.oreilly.com/strata/strata-ca/ > public/schedule/detail/63696 > > > > On Mon, Mar 26, 2018 at 11:33 AM S

Source split consistency in distributed environment

2018-03-26 Thread Shen Li
Hi, Does the split API in Bounded/UnboundedSource guarantee to return the same result if invoked in different parallel instances in a distributed environment? For example, assume the original source can split into 3 sub-sources. Say the runner creates 3 parallel source operator instances

Re: Flatten input data streams with skewed watermark progress

2018-03-12 Thread Shen Li
This means that yes, the >> watermark is the minimum of all inputs. >> >> I don't see how a late tuple can become early. Can you explain? >> >> >> On Mon, Mar 12, 2018 at 2:07 PM Shen Li <cs.she...@gmail.com> wrote: >> >>> Hi Reuven, >

Re: Flatten input data streams with skewed watermark progress

2018-03-12 Thread Shen Li
the flatten (you can imagine a model in which Flatten was not > explicit, we just allowed multiple main inputs). This means that yes, the > watermark is the minimum of all inputs. > > I don't see how a late tuple can become early. Can you explain? > > > On Mon, Mar 12, 2018 at 2:0
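As a small illustration of the statement quoted above that the Flatten output watermark is the minimum of all inputs, here is a runner-agnostic sketch. The class and method names (FlattenWatermarkSketch, outputWatermark) are hypothetical and not part of the Beam API, and watermarks are modeled as plain epoch-millisecond longs. The behavior for zero inputs is an assumption made for this sketch only.

```java
import java.util.Arrays;

/** Hypothetical sketch; not a Beam class. */
public class FlattenWatermarkSketch {

    /** Returns the minimum of the given input watermarks (epoch millis). */
    public static long outputWatermark(long... inputWatermarks) {
        if (inputWatermarks.length == 0) {
            // Assumption: with no inputs, nothing can hold the watermark back.
            return Long.MAX_VALUE;
        }
        return Arrays.stream(inputWatermarks).min().getAsLong();
    }

    public static void main(String[] args) {
        // The slowest input (250) determines the output watermark.
        System.out.println(outputWatermark(1_000L, 250L, 4_000L));
    }
}
```

Because the output never runs ahead of the slowest input, no buffering of elements is needed in this model to keep the output watermark consistent.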

Re: Flatten input data streams with skewed watermark progress

2018-03-12 Thread Shen Li
, I don't think it makes sense for the Flatten operator to cache > element. > > > On Mon, Mar 12, 2018 at 11:55 AM Shen Li <cs.she...@gmail.com> wrote: > >> If multiple inputs of Flatten proceed at different speeds, should the >> Flatten operator cache tuples b

Flatten input data streams with skewed watermark progress

2018-03-12 Thread Shen Li
If multiple inputs of Flatten proceed at different speeds, should the Flatten operator cache tuples before emitting output watermarks? This can prevent a late tuple from becoming early. But if the watermark gap (i.e., cache size) becomes too large among inputs, can the application tell Beam/runner

Re: Main input element accesses expired side input windows

2018-03-08 Thread Shen Li
Thank you, Kenn! Shen On Thu, Mar 8, 2018 at 9:58 PM, Kenneth Knowles <k...@google.com> wrote: > > > On Thu, Mar 8, 2018 at 6:50 PM Shen Li <cs.she...@gmail.com> wrote: > >> Hi Kenn, >> >> I just want to confirm that I understand it correctly. >>

Re: Main input element accesses expired side input windows

2018-03-08 Thread Shen Li
*side input* watermark, the runner/engine should still make it ready when it violates maximumLookback and *main input* watermark. Is that correct? Thanks, Shen On Thu, Mar 8, 2018 at 9:31 PM, Shen Li <cs.she...@gmail.com> wrote: > I see. Thank you Kenn and Lukasz. > > Best, >

Re: Main input element accesses expired side input windows

2018-03-08 Thread Shen Li
lds W from being > collected, when W expires on the side input you make it ready, you process > the elements with empty contents on the side input, then you GC the side > input. > > Kenn > > On Thu, Mar 8, 2018 at 4:32 PM Shen Li <cs.she...@gmail.com> wrote: > >> Hi Luka

Re: Main input element accesses expired side input windows

2018-03-08 Thread Shen Li
: > I believe you're missing this point: "and also to not expire the side > input till the main input watermark advances beyond the garbage collection > hold of the side input." > > On Thu, Mar 8, 2018 at 3:33 PM, Shen Li <cs.she...@gmail.com> wrot

Re: Main input element accesses expired side input windows

2018-03-08 Thread Shen Li
Hi Lukasz, Thanks again. > the runner is required to hold back the main input till the side input is ready Yes, I understand these requirements. But what if the side input expires before it becomes ready? Shen

Main input element accesses expired side input windows

2018-03-08 Thread Shen Li
Hi, When a main input element tries to access an expired side input window (violating maximumLookback), should ParDo discard the element or treat it as an error? Besides, what should ParDo do in the following situation: 1. The side input window W is not expired but unready when the main input

Re: When should ParDo advance output watermarks?

2018-03-06 Thread Shen Li
Hi Kenn, Thank you! Shen On Tue, Mar 6, 2018 at 5:21 PM, Kenneth Knowles <k...@google.com> wrote: > On Tue, Mar 6, 2018 at 1:06 PM Shen Li <cs.she...@gmail.com> wrote: > >> Hi, >> >> Should ParDo advance output watermarks based on only main input or all

When should ParDo advance output watermarks?

2018-03-06 Thread Shen Li
Hi, Should ParDo advance output watermarks based on only the main input or on all inputs? Say the watermark from a side input falls behind: should it block the output watermark of the ParDo? If there are pushed-back elements, should the ParDo hold back its output watermarks until corresponding

Re: Can Window PTransform drop tuples that violate allowed lateness?

2018-02-01 Thread Shen Li
31, 2018 at 11:19 PM, Kenneth Knowles <k...@google.com> wrote: > On Mon, Jan 22, 2018 at 11:42 AM, Shen Li <cs.she...@gmail.com> wrote: > >> Hi Kenn, >> >> Thanks for the explanation. >> >> > So now elements are droppable if they belong to an expi

Re: Can Window PTransform drop tuples that violate allowed lateness?

2018-01-22 Thread Shen Li
". Any element that manages to make it to an aggregation before > the accumulator is expired is allowed to be included now and only after the > whole window expires we drop any further incoming elements for that window. > > Kenn > > On Mon, Jan 22, 2018 at 10:52 AM, Shen
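The rule quoted above (elements are dropped only once the whole window has expired) can be sketched as a single comparison. This is a minimal sketch under one assumption: a window counts as expired once the watermark has passed the window's maximum timestamp plus the allowed lateness. The class and method names are hypothetical, and timestamps are plain epoch-millisecond longs rather than Beam types.

```java
/** Hypothetical sketch; not a Beam class. */
public class WindowExpirySketch {

    /**
     * Assumption: a window expires when the watermark is strictly later than
     * windowMaxTimestamp + allowedLateness. Overflow is not handled here, so
     * this does not model windows whose end is close to Long.MAX_VALUE.
     */
    public static boolean isExpired(
            long windowMaxTimestampMillis, long allowedLatenessMillis, long watermarkMillis) {
        long expiry = windowMaxTimestampMillis + allowedLatenessMillis;
        return watermarkMillis > expiry;
    }

    public static void main(String[] args) {
        long windowMax = 3_600_000L - 1; // last millisecond of a one-hour window
        long lateness = 60_000L;         // one minute of allowed lateness

        // Watermark is past the window end but still inside the lateness budget.
        System.out.println(isExpired(windowMax, lateness, 3_630_000L));
        // Watermark is past window end plus allowed lateness.
        System.out.println(isExpired(windowMax, lateness, 3_700_000L));
    }
}
```

Under this model an element that is late but arrives before expiry can still be included in the aggregation; only elements for an already expired window are droppable.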

Can Window PTransform drop tuples that violate allowed lateness?

2018-01-22 Thread Shen Li
Hi, The Window#withAllowedLateness(Duration) doc says "Any elements that are later than this as decided by the system-maintained watermark will be dropped". Can the runner safely discard a tuple that violates the allowed lateness in the Window operator? Or does it have to drop it in the

Re: Is upgrading to Kafka Client 0.10.2.0+ in the roadmap?

2017-10-30 Thread Shen Li
> wrote: > > > > > There has been some discussion about getting Kafka 0.10.x working on > > > BEAM-307[1]. > > > > > > As an immediate way to unblock yourself, modify your local copy of the > > > KafkaIO source to include setting the system property in
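The workaround mentioned above, setting the system property from code instead of with -D, could look roughly like the sketch below. The preview is cut off before it says where in KafkaIO the call should go, so that part is not shown; presumably it has to run in the JVM that creates the Kafka client, before the client is constructed. The class name, method name, and file path are hypothetical. Only System.getProperty and System.setProperty are real JDK calls.

```java
/** Hypothetical helper; not part of KafkaIO. */
public class JaasConfigSketch {

    static final String JAAS_PROPERTY = "java.security.auth.login.config";

    /**
     * Sets the JAAS config property unless it is already set (for example
     * through -Djava.security.auth.login.config=...), then returns the
     * value that is in effect.
     */
    public static String ensureJaasConfig(String jaasPath) {
        if (System.getProperty(JAAS_PROPERTY) == null) {
            System.setProperty(JAAS_PROPERTY, jaasPath);
        }
        return System.getProperty(JAAS_PROPERTY);
    }

    public static void main(String[] args) {
        // Hypothetical path, for illustration only.
        System.out.println(ensureJaasConfig("/etc/kafka/client_jaas.conf"));
    }
}
```

Leaving an existing value untouched keeps local runs that already pass -D working unchanged.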

Is upgrading to Kafka Client 0.10.2.0+ in the roadmap?

2017-10-30 Thread Shen Li
Hi, To use KafkaIO in secure mode, I need to set -Djava.security.auth.login.config to point to a JAAS configuration file. It works fine for local execution. But how can I configure the "java.security.auth.login.config" property in the Beam app when the pipeline is submitted to a

Re: Adding back PipelineRunner#apply method

2017-08-15 Thread Shen Li
Thank you! Shen On Tue, Aug 15, 2017 at 1:29 PM, Robert Bradshaw < rober...@google.com.invalid> wrote: > On Tue, Aug 15, 2017 at 10:21 AM, Shen Li <cs.she...@gmail.com> wrote: > > Hi Thomas, > > > > Does it mean future Pipeline implementations would allow a

Re: Adding back PipelineRunner#apply method

2017-08-15 Thread Shen Li
ote: > > > > > In general, no - but the implementation of PAssertionSite exemplifies > the > > > approach. I guess it could be useful to make this a general beam > feature > > > and remember it for all transforms. It would probably be best to > > i

Re: Adding back PipelineRunner#apply method

2017-08-15 Thread Shen Li
lid> wrote: > Hi Shen, > Responding just to one part of your message - "remember the line at which > the PTransform was added": take a look at > https://github.com/apache/beam/pull/2247 which does this for PAssert. > > On Mon, Aug 14, 2017 at 7:32 PM Shen Li <cs.sh

Rewind back tuple timestamp in DoFn

2017-06-14 Thread Shen Li
Hi, I saw that DoFn#getAllowedTimestampSkew has been marked as deprecated. What if a user does want to rewind the timestamp without violating the watermark? Consider the case where there is a GroupByKey followed by a ParDo. The GroupByKey transform groups tuples into one-hour windows. Say,

Re: How to create a custom trigger?

2017-06-07 Thread Shen Li
r-specific concept not part of the > model (a runner doesn't have to actually reify such events) maybe you just > want to test your runner details directly. > > Kenn > > On Wed, Jun 7, 2017 at 10:22 AM, Shen Li <cs.she...@gmail.com> wrote: > > > Hi Lukasz, > >

Re: How to create a custom trigger?

2017-06-07 Thread Shen Li
y? It's not clear how this trigger would be useful to > that understanding. > > On Wed, Jun 7, 2017, 10:22 AM Shen Li <cs.she...@gmail.com> wrote: > > > Hi Lukasz, > > > > Thanks again for the suggestion. Is there any reason for not allowing > users > > create

Re: How to create a custom trigger?

2017-06-07 Thread Shen Li
> it instead. > > There are already several tests which validate TestStream compatible > runners to make sure their trigger evaluations are correct. > > On Wed, Jun 7, 2017 at 10:10 AM, Shen Li <cs.she...@gmail.com> wrote: > > > Hi Lukasz, > > > > Thanks for the su

Re: How to create a custom trigger?

2017-06-07 Thread Shen Li
AfterPane#elementCountAtLeast trigger as it seems to be the > closest to your description. It fires as soon as any data is available. > > Are you sure you don't want some kind of watermark-based trigger with just > a small interval size? > > > > > On Wed, Jun 7, 2017 at 8

Re: How to create a custom trigger?

2017-06-07 Thread Shen Li
to create custom trigger implementations. If you tell us > what you want your trigger to do, we may be able to suggest an alternative. > > On Wed, Jun 7, 2017 at 7:41 AM, Shen Li <cs.she...@gmail.com> wrote: > > > Hi, > > > > I created a custom trigger class

How to create a custom trigger?

2017-06-07 Thread Shen Li
Hi, I created a custom trigger class (XYZ) by extending the OnceTrigger. During execution, I got this error "Cannot translate trigger class XYZ to a runner-API proto." It seems that the Triggers.ProtoConverter class needs to declare a convertSpecific method for my trigger XYZ. How can I use my

Re: What is the use case and the expected behavior for a Flatten transform with no input?

2017-05-11 Thread Shen Li
Thu, May 11, 2017 at 11:24 AM, Shen Li <cs.she...@gmail.com> wrote: > > > Hi, > > > > The FlattenTest enforces that Flatten transform can handle empty input. > Is > > there any use case for this feature? > > > > https://github.com/apache/beam/

Re: How to create an unbounded PCollection?

2017-04-30 Thread Shen Li
Sorry about the duplication. Please ignore the above email. Shen On Sat, Apr 29, 2017 at 3:43 PM, Shen LI <cs.she...@gmail.com> wrote: > It seems that Create.of(Iterable) can only create a BoundedSource. Is there > a convenient way to read from an unbounded Iterable object wit

Re: Can application specify how watermarks should be generated?

2017-04-25 Thread Shen Li
amps" transform or a DoFn that uses "outputWithTimestamp"). It's > not safe to output elements at negative infinity when there is a watermark > that may drop elements, as is the case for unbounded sources. > > On Fri, Apr 21, 2017 at 8:44 AM, Shen Li <cs.she...@

Can application specify how watermarks should be generated?

2017-04-20 Thread Shen Li
Hi, Can application developers provide classes/methods to specify how to generate watermarks from sources, and how to aggregate watermarks from multiple input PCollections? Say, emit at most 1 watermark per second, or create watermarks that are 5 seconds older than the latest tuple's timestamp?
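To make the two policies in the question concrete (a watermark that trails the latest element timestamp by 5 seconds, emitted at most once per second), here is a minimal sketch of the arithmetic only. It is not a Beam API: the class LaggingWatermarkSketch and its methods are hypothetical, timestamps are epoch-millisecond longs, and whether a given source or runner exposes such a hook is not answered by this excerpt.

```java
import java.util.OptionalLong;

/** Hypothetical sketch; not a Beam class. */
public class LaggingWatermarkSketch {
    private final long lagMillis;
    private final long minEmitIntervalMillis;
    private long maxTimestampSeen = Long.MIN_VALUE;
    private long lastEmitTimeMillis = 0L;
    private boolean emittedOnce = false;

    public LaggingWatermarkSketch(long lagMillis, long minEmitIntervalMillis) {
        this.lagMillis = lagMillis;
        this.minEmitIntervalMillis = minEmitIntervalMillis;
    }

    /** Records the event timestamp of an element read from the source. */
    public void observe(long elementTimestampMillis) {
        maxTimestampSeen = Math.max(maxTimestampSeen, elementTimestampMillis);
    }

    /**
     * Returns the watermark to emit at processing time nowMillis, or empty if
     * no element has been seen yet or the last emission was too recent.
     */
    public OptionalLong maybeEmit(long nowMillis) {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return OptionalLong.empty();
        }
        if (emittedOnce && nowMillis - lastEmitTimeMillis < minEmitIntervalMillis) {
            return OptionalLong.empty();
        }
        emittedOnce = true;
        lastEmitTimeMillis = nowMillis;
        // The maximum timestamp only grows, so emitted watermarks never go backwards.
        return OptionalLong.of(maxTimestampSeen - lagMillis);
    }

    public static void main(String[] args) {
        LaggingWatermarkSketch policy = new LaggingWatermarkSketch(5_000L, 1_000L);
        policy.observe(20_000L);
        System.out.println(policy.maybeEmit(0L));     // watermark 15000
        policy.observe(30_000L);
        System.out.println(policy.maybeEmit(500L));   // throttled: only 500 ms since last emit
        System.out.println(policy.maybeEmit(1_000L)); // watermark 25000
    }
}
```

Any element whose timestamp is more than 5 seconds behind the newest one seen would be late relative to such a watermark, which is the trade-off the lag parameter controls.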

Re: HDFS and Google Cloud Storage

2017-04-11 Thread Shen Li
Thanks! Shen On Tue, Apr 11, 2017 at 11:10 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote: > Yes, FileSystem "plugins" will use a scheme. Other connectors will use (as > it's already the case) DoFn/Source transforms. > > Regards > JB > > > On 04/11/

Re: HDFS and Google Cloud Storage

2017-04-11 Thread Shen Li
> > It means that HdfsIO will be deprecated (to be removed at some point). I'm > working on a couple of PRs to leverage the new file IO layer. > > Regards > JB > > > On 04/11/2017 03:56 PM, Shen Li wrote: > >> Hi, >> >> Is there any reason why HDFS IO is implem

HDFS and Google Cloud Storage

2017-04-11 Thread Shen Li
Hi, Is there any reason why HDFS IO is implemented as a BoundedSource while Google Cloud Storage is implemented as a scheme ("gs://") for TextIO? To contribute a new IO connector, how can I determine whether it should be implemented as a source transform or as a scheme for the TextIO? Thanks,

Re: Expected behavior of PipelineResult#waitUntilFinish()

2017-03-29 Thread Shen Li
lid> wrote: > Hi! Please see discussion on https://issues.apache.org/ > jira/browse/BEAM-849 . > A pipeline terminates when all watermarks reach infinity - regardless of > boundedness. > > On Wed, Mar 29, 2017 at 4:54 PM Shen Li <cs.she...@gmail.com> wrote: > > > H

Re: How does SimpleDoFnRunner supports stateful ParDo?

2017-02-24 Thread Shen Li
Hi Kenn, Thanks a lot! Shen On Fri, Feb 24, 2017 at 6:37 PM, Kenneth Knowles <k...@google.com.invalid> wrote: > On Thu, Feb 23, 2017 at 9:02 PM, Shen Li <cs.she...@gmail.com> wrote: > > > Thanks a lot for explaining. As the SimpleDoFnRunner only takes one >

Re: How does SimpleDoFnRunner supports stateful ParDo?

2017-02-23 Thread Shen Li
> Hi Shen, > > The way that this is done is that the StepContext.stateInternals() is > specialized to be per-key by the runner, before you create the > SimpleDoFnRunner. Does this help? > > Kenn > > On Thu, Feb 23, 2017 at 3:03 PM, Shen Li <cs.she...@gmail.com>

How does SimpleDoFnRunner supports stateful ParDo?

2017-02-23 Thread Shen Li
Hi, I am trying to understand how a runner can support the Stateful ParDo. Currently our runner relies on the SimpleDoFnRunner (Beam-0.5.0). But it cannot pass ParDoTest#testValueStateSideOutput (

How to fire the global window when using GroupAlsoByWindowViaWindowSetDoFn?

2017-01-31 Thread Shen Li
Hi, My runner is translating GroupByKey using GroupAlsoByWindowViaWindowSetDoFn. Say I have a BoundedSource with five tuples all placed into a global window. When the source is depleted, how should the runner notify the downstream GroupByKey(GroupAlsoByWindowViaWindowSetDoFn) that it should fire

Re: Doesn't PAssertTest.runExpectingAssertionFailure need to call waitUntilFinish?

2017-01-31 Thread Shen Li
licate writing tests, which we don't really want to > do... so it's a tradeoff that may be okay as-is. > > Dan > > [0] > https://github.com/apache/beam/blob/master/sdks/java/ > core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java#L64 > > > > On Tue, J

Default Timestamp and Watermark

2017-01-25 Thread Shen Li
Hi, When reading from a source with no timestamp specified on elements, what should be the default timestamp? I presume that it should be 0 as I saw PAssertTest trying to set timestamps to very small values with 0 allowed timestamp skew. Is that right? What about the default watermark policy?