Re: real real-time beam

2019-12-03 Thread Kenneth Knowles
Jan - let's try to defrag the threads on your time sorting proposal. This
thread may have useful ideas but I want to focus on helping Aaron in this
thread. You can link to this thread from other threads or from a design
doc. Does this seem OK to you?

Aaron - do you have the information you need to implement your sink? My
impression is that you have quite a good grasp of the issues even before
you asked.

Kenn

On Wed, Nov 27, 2019 at 3:05 AM Jan Lukavský  wrote:

> > Trigger firings can have decreasing event timestamps w/ the minimum
> timestamp combiner*. I do think the issue at hand is best analyzed in terms
> of the explicit ordering on panes. And I do think we need to have an
> explicit guarantee or annotation strong enough to describe a
> correct-under-all-allowed runners sink. Today an antagonistic runner could
> probably break a lot of things.
>
> Thanks for this insight. I didn't know about the relation between trigger
> firing (event) time - which is always non-decreasing - and the resulting
> timestamp of the output pane - which can be affected by the timestamp
> combiner and decrease in the cases you describe. What actually correlates
> with the pane index at all times is the processing time of the trigger
> firings. Would you say that the "annotation that would guarantee ordering
> of panes" could be viewed as a time-ordering annotation with an additional
> time domain (event time, processing time)? Could these two then be viewed
> as a single one with some distinguishing parameter?
>
> @RequiresTimeSortedInput(Domain.PANE_INDEX | Domain.EVENT_TIME)
>
> ?
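For illustration only, a rough sketch of how such a parameterized annotation might read on a DoFn. The Domain parameter and its values are hypothetical, taken from the proposal above, and are not an existing Beam API:

  // Hypothetical: ask the runner to deliver input to this DoFn ordered by
  // pane index (falling back to event time where no pane index exists).
  static class OrderedSinkFn extends DoFn<KV<String, Long>, Void> {

    @RequiresTimeSortedInput(Domain.PANE_INDEX)  // Domain is part of the proposal, not current Beam
    @ProcessElement
    public void process(@Element KV<String, Long> element) {
      // With the requested ordering, elements for a key arrive here in order,
      // so the sink can emit them without additional buffering.
      emitToExternalSystem(element);
    }

    private void emitToExternalSystem(KV<String, Long> element) {
      // Placeholder for the actual sink logic.
    }
  }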
>
> Event time should probably be made the default, because that information
> is accessible with every WindowedValue, while the pane index is available
> only after a GBK (or, more generally, after any keyed sequential operation,
> but it is missing after a source, for instance).
>
> Jan
> On 11/27/19 1:32 AM, Kenneth Knowles wrote:
>
>
>
> On Tue, Nov 26, 2019 at 1:00 AM Jan Lukavský  wrote:
>
>> > I will not try to formalize this notion in this email. But I will note
>> that since it is universally assured, it would be zero cost and
>> significantly safer to formalize it and add an annotation noting it was
>> required. It has nothing to do with event time ordering, only trigger
>> firing ordering.
>>
>> I cannot agree with the last sentence (and I'm really not doing this on
>> purpose :-)). Panes generally arrive out of order, as mentioned several
>> times in the discussions linked from this thread. If we want to ensure
>> "trigger firing ordering", we can use the pane index, that is correct. But
>> - that is actually equivalent to sorting by event time, because pane index
>> order will be (nearly) the same as event time order. This is due to the
>> fact, that pane index and event time correlate (both are monotonic).
>>
> Trigger firings can have decreasing event timestamps w/ the minimum
> timestamp combiner*. I do think the issue at hand is best analyzed in terms
> of the explicit ordering on panes. And I do think we need to have an
> explicit guarantee or annotation strong enough to describe a
> correct-under-all-allowed runners sink. Today an antagonistic runner could
> probably break a lot of things.
>
> Kenn
>
> *In fact, they can decrease via the "maximum" timestamp combiner, because
> timestamp combiners actually only apply to the elements in that particular
> pane. This is weird, and maybe a design bug, but good to know about.
>
>
>> The pane index "only" solves the issue of preserving ordering even in
>> case where there are multiple firings within the same timestamp (regardless
>> of granularity). This was mentioned in the initial discussion about event
>> time ordering, and is part of the design doc - users should be allowed to
>> provide UDF for extracting time-correlated ordering field (which means
>> ability to choose a preferred, or authoritative, observer which assigns
>> unambiguous ordering to events). Example of this might include Kafka
>> offsets as well, or any queue index for that matter. This is not yet
>> implemented, but could (should) be in the future.
>>
>> The only case where these two things are (somewhat) different is the case
>> mentioned by @Steve - when the output is a stateless ParDo, which will get
>> fused. But that is only because the processing is single-threaded per key,
>> and therefore the ordering is implied by timer ordering (and be careful here:
>> many runners don't have this ordering 100% correct as of now - luckily this
>> problem appears only when there are multiple timers per key).
>> Moreover, if there is a failure, the output might (would) go back in time
>> anyway. If there were a shuffle operation after the GBK/Combine, the
>> ordering would no longer be guaranteed and would have to be
>> explicitly taken care of.
>>
>> Last note, I must agree with @Rui that all these discussions are very
>> much related to retractions (precisely the ability to implement them).
>>
>> Jan
>> On 

Re: Beam Testing Tools FAQ

2019-12-03 Thread Kenneth Knowles
Just still reading through this. It is very helpful. Thanks!

On Tue, Dec 3, 2019 at 6:32 AM Reza Rokni  wrote:

> Thanx!
>
> On Wed, 27 Nov 2019, 02:31 Pablo Estrada,  wrote:
>
>> Very cool. Thanks Lukasz!
>>
>> On Tue, Nov 26, 2019 at 9:41 AM Alan Myrvold  wrote:
>>
>>> Nice, thanks!
>>>
>>> On Tue, Nov 26, 2019 at 8:04 AM Robert Bradshaw 
>>> wrote:
>>>
 Thanks!

 On Tue, Nov 26, 2019 at 7:43 AM Łukasz Gajowy 
 wrote:
 >
 > Hi all,
 >
 > our documentation (either Confluence or the website docs) describes how to
 create various integration and performance tests - there are already core
 operations tests, Nexmark, and IO test documentation pages. However, we are
 lacking some general docs that describe what tools we have and what their
 purpose is.
 >
 > Therefore, I took the liberty of creating the Beam Testing Tools FAQ
 on our confluence:
 >
 https://cwiki.apache.org/confluence/display/BEAM/Beam+Testing+Tools+FAQ
 >
 > Hopefully, this is helpful and sheds some more light on that
 important part of our infrastructure. If you feel that something is missing
 there, feel free to let me know or add it yourself. :)
 >
 > Thanks,
 > Łukasz

>>>


Version Beam Website Documentation

2019-12-03 Thread Ankur Goenka
Hi,

We are constantly adding features to Beam, which makes each new Beam version
more feature-rich and compelling.
This also means that older Beam releases don't have the new features and
might have different ways to do certain things.

(I might be wrong here) - our Beam website only publishes a single version
of the documentation, which is the latest one.
This means that users working with an older SDK don't really have an easy
way to look up documentation for old versions of Beam.

Proposal: Shall we consider publishing a versioned Beam website to help users
on old Beam versions find the relevant information?

Thanks,
Ankur


Re: Jenkins jobs are not being displayed on GitHub

2019-12-03 Thread Yifan Zou
Glanced at the build workers. All Jenkins executors are busy now. Lots of
jobs are in the waiting queue, and the recently completed jobs show a long
wait (around 1h) before getting started.
The Jenkins site also responds very slowly.

On Tue, Dec 3, 2019 at 2:15 PM Kirill Kozlov 
wrote:

> Hello everyone!
>
> It looks like the status of Jenkins jobs is not being displayed for the PRs
> created within the last 30 minutes.
> The seed job appears to be stuck [1]. #5293 is waiting for #5292 to finish,
> but #5293 shows that it is complete.
>
> [1] https://builds.apache.org/job/beam_SeedJob/
>
> --
> Kirill
>


Jenkins jobs are not being displayed on GitHub

2019-12-03 Thread Kirill Kozlov
Hello everyone!

It looks like the status of Jenkins jobs is not being displayed for the PRs
created within the last 30 minutes.
The seed job appears to be stuck [1]. #5293 is waiting for #5292 to finish,
but #5293 shows that it is complete.

[1] https://builds.apache.org/job/beam_SeedJob/

--
Kirill


Re: Request for review of PR [Beam-8564]

2019-12-03 Thread Robert Bradshaw
Is there a way to wrap this up as an optional dependency with multiple
possible providers, if there's no good library satisfying all of the
conditions (in particular (1))?
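For illustration, one possible shape of such a wrapper - a tiny codec SPI plus ServiceLoader discovery, so the core module has no hard LZO dependency and picks up whichever provider is on the classpath. All names here are hypothetical, not an existing Beam API:

  import java.io.IOException;
  import java.io.InputStream;
  import java.io.OutputStream;
  import java.util.ServiceLoader;

  /** Hypothetical SPI: each optional module (aircompressor, lzo-java, ...) ships one implementation. */
  public interface LzoCodecProvider {
    String name();
    OutputStream compress(OutputStream raw) throws IOException;
    InputStream decompress(InputStream raw) throws IOException;
  }

  /** Core-side lookup; fails only when LZO is actually requested and no provider is present. */
  final class LzoCodecs {
    static LzoCodecProvider load() {
      for (LzoCodecProvider provider : ServiceLoader.load(LzoCodecProvider.class)) {
        return provider; // first provider found on the classpath wins
      }
      throw new IllegalStateException(
          "LZO compression requested but no LzoCodecProvider found on the classpath");
    }
  }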

On Tue, Dec 3, 2019 at 9:47 AM Luke Cwik  wrote:
>
> I was hoping that someone in the community would provide some alternatives 
> since there are quite a few implementations.
>
> On Tue, Dec 3, 2019 at 8:20 AM Amogh Tiwari  wrote:
>>
>> Hi Luke,
>>
>> I agree with your thoughts and observations. But airlift:aircompressor is
>> the only implementation of LZO in pure Java. That straight away solves #5.
>> The other implementations that I found either have licensing issues (since
>> LZO natively uses the GNU GPL licence) or are implemented using .c, .h and
>> JNI (which again makes them dependent on the OS). Please refer to these:
>> twitter/hadoop-lzo and shevek/lzo-java.
>> These were the main reasons why we based this on airlift:aircompressor.
>>
>> Thanks and Regards,
>> Amogh
>>
>>
>>
>> On Tue, Dec 3, 2019 at 2:59 AM Luke Cwik  wrote:
>>>
>>> I took a look. My biggest concern is finding a good LZO implementation. 
>>> Looking for one that preferably has:
>>> 1) Apache license
>>> 2) Has zero transitive dependencies
>>> 3) Is small
>>> 4) Is performant
>>> 5) Is native java or supports execution on the three main OSs (Windows, 
>>> Linux, Mac)
>>>
>>> In your PR you suggested using io.airlift:aircompressor:0.16 which doesn't 
>>> meet item #2 and its transitive dependency fails #3.
>>>
>>> On Mon, Dec 2, 2019 at 12:16 PM Amogh Tiwari  wrote:

 Hi,
 I have filed a PR for an extension that will enable Apache Beam to work 
 with LZO/LZOP compression. Please refer.
 I would love it if someone can take this up and review it.
 Please feel free to share your thoughts/suggestions.
 Regards,
 Amogh


Re: Request for review of PR [Beam-8564]

2019-12-03 Thread Luke Cwik
I was hoping that someone in the community would provide some alternatives
since there are quite a few implementations.

On Tue, Dec 3, 2019 at 8:20 AM Amogh Tiwari  wrote:

> Hi Luke,
>
> I agree with your thoughts and observations. But airlift:aircompressor is
> the only implementation of LZO in pure Java. That straight away solves #5.
> The other implementations that I found either have licensing issues (since
> LZO natively uses the GNU GPL licence) or are implemented using .c, .h and
> JNI (which again makes them dependent on the OS). Please refer to these:
> twitter/hadoop-lzo and shevek/lzo-java.
> These were the main reasons why we based this on airlift:aircompressor.
>
> Thanks and Regards,
> Amogh
>
>
>
> On Tue, Dec 3, 2019 at 2:59 AM Luke Cwik  wrote:
>
>> I took a look. My biggest concern is finding a good LZO implementation.
>> Looking for one that preferably has:
>> 1) Apache license
>> 2) Has zero transitive dependencies
>> 3) Is small
>> 4) Is performant
>> 5) Is native java or supports execution on the three main OSs (Windows,
>> Linux, Mac)
>>
>> In your PR you suggested using io.airlift:aircompressor:0.16 which
>> doesn't meet item #2 and its transitive dependency fails #3.
>>
>> On Mon, Dec 2, 2019 at 12:16 PM Amogh Tiwari  wrote:
>>
>>> Hi,
>>> I have filed a PR for an extension that will enable Apache Beam to work
>>> with LZO/LZOP compression. Please refer
>>> .
>>> I would love it if someone can take this up and review it.
>>> Please feel free to share your thoughts/suggestions.
>>> Regards,
>>> Amogh
>>>
>>


Re: Per Element File Output Without writeDynamic

2019-12-03 Thread Eugene Kirpichov
Hi Christopher,

Thanks for clarifying. Then can you just preprocess the PCollection with a
custom FlatMapElements that converts each Document into one or more smaller
documents, small enough to be written into individual files? Then pair each
one with a unique key and follow with FileIO.writeDynamic().by(the unique
key).withNumShards(1) to produce one file per document.
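A rough sketch of that pipeline shape, assuming a Document type with a stable unique id, a text rendering (render()), and a hypothetical splitIfTooLarge helper that returns one or more smaller Documents:

  PCollection<Document> documents = ...;

  documents
      // Hypothetical helper: splits any oversized Document into smaller,
      // self-contained Documents (returns an Iterable<Document>).
      .apply(FlatMapElements.into(TypeDescriptor.of(Document.class))
          .via((Document doc) -> splitIfTooLarge(doc)))
      .apply(FileIO.<String, Document>writeDynamic()
          // A unique key per document gives one destination and, with a
          // single shard, one output file per document.
          .by((Document doc) -> doc.getId())
          .withDestinationCoder(StringUtf8Coder.of())
          .via(Contextful.fn((Document doc) -> doc.render()), TextIO.sink())
          .to("/path/to/output")
          .withNaming(id -> FileIO.Write.defaultNaming(id, ".thrift"))
          .withNumShards(1));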

On Tue, Dec 3, 2019 at 7:55 AM Christopher Larsen <
christopher.lar...@quantiphi.com> wrote:

> Hi Eugene,
>
> Yes I think you've got it correct. In our use case we need to write each
> Document in the PCollection to a separate file as multiple Documents in a
> file will cause compilation errors and/or incorrect code to be generated by
> the Thrift compiler.
>
> Additionally there are some Documents that are so large that we would want
> them to be split.
>
> On Mon, Dec 2, 2019 at 9:45 PM Eugene Kirpichov  wrote:
>
>> Hi Christopher,
>>
>> So, you have a PCollection, and you're writing it to files.
>> FileIO.write/writeDynamic will write several Document's to each file -
>> however, in your use case some of the individual Document's are so large
>> that you want instead each of those large documents to be split into
>> several files.
>>
>> Before we continue, could you confirm whether my understanding is correct?
>>
>> Thanks.
>>
>> On Mon, Dec 2, 2019 at 7:08 PM Christopher Larsen <
>> christopher.lar...@quantiphi.com> wrote:
>>
>>> Ideally each element (document) will be written to a .thrift file so
>>> that it can be compiled without further manipulation.
>>>
>>> But in the case of an extremely large file I think it would be nice to
>>> split into smaller files. As far as splitting points go I think it could be
>>> split at a point in the list of definitions. Thoughts?
>>>
>>> On Mon, Dec 2, 2019 at 4:02 PM Reuven Lax  wrote:
>>>
 What do you mean by shard the output file? Can it be split at any byte
 location, or only at specific points?

 On Mon, Dec 2, 2019 at 2:05 PM Christopher Larsen <
 christopher.lar...@quantiphi.com> wrote:

> Hi Reuven,
>
> We would like to write each element to one file but still allow the
> runner to shard the output file which could yield more than one output 
> file
> per element.
>
> On Mon, Dec 2, 2019 at 11:55 AM Reuven Lax  wrote:
>
>> I'm not sure I completely understand the question. Are you saying
>> that you want each element to write to only one file, guaranteeing that 
>> two
>> elements are never written to the same file?
>>
>> On Mon, Dec 2, 2019 at 11:53 AM Christopher Larsen <
>> christopher.lar...@quantiphi.com> wrote:
>>
>>> Hi All,
>>>
>>> TL/DR: can you extend FileIO.sink to write one or more files per
>>> element instead of one or more elements per file?
>>>
>>> In working with Thrift files we have found that, since a .thrift file
>>> needs to be compiled to generate code, the order of the contents of the
>>> file is important (i.e., the namespace and include statements need to come
>>> before the definitions).
>>>
>>> The issue that we are facing is that by implementing
>>> FileIO.sink we cannot determine how many Document objects are
>>> written to a file since this is determined by the runner. This can 
>>> result
>>> in more than one Document being written to a file which will cause
>>> compilation errors.
>>>
>>> We know that this can be controlled by writeDynamic but since we
>>> believe the default behavior for the connector should be to output a
>>> Document to one or more files (depending on sharding) we were wondering 
>>> how
>>> to best accomplish this.
>>>
>>> Best,
>>> Chris
>>>
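For reference, a minimal sketch of what a FileIO.Sink for Thrift documents could look like - the sink only sees a channel and a stream of elements, which is why the number of Documents per file is decided by the runner rather than by the sink (the Document type and its render() method are assumptions):

  // Minimal sketch of a FileIO.Sink: the runner opens the channel and then
  // calls write() for however many elements it assigns to this file - the
  // sink itself has no way to limit a file to a single Document.
  class ThriftDocumentSink implements FileIO.Sink<Document> {
    private transient OutputStream out;

    @Override
    public void open(WritableByteChannel channel) throws IOException {
      out = Channels.newOutputStream(channel);
    }

    @Override
    public void write(Document document) throws IOException {
      // Assumed helper: render the document as .thrift source text.
      out.write(document.render().getBytes(StandardCharsets.UTF_8));
      out.write('\n');
    }

    @Override
    public void flush() throws IOException {
      out.flush();
    }
  }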
 --
>>> *Regards,*
>>>
>>> 

Re: Request for review of PR [Beam-8564]

2019-12-03 Thread Amogh Tiwari
Hi Luke,

I agree with your thoughts and observations. But airlift:aircompressor is
the only implementation of LZO in pure Java. That straight away solves #5.
The other implementations that I found either have licensing issues (since
LZO natively uses the GNU GPL licence) or are implemented using .c, .h and
JNI (which again makes them dependent on the OS). Please refer to these:
twitter/hadoop-lzo and shevek/lzo-java.
These were the main reasons why we based this on airlift:aircompressor.

Thanks and Regards,
Amogh



On Tue, Dec 3, 2019 at 2:59 AM Luke Cwik  wrote:

> I took a look. My biggest concern is finding a good LZO implementation.
> Looking for one that preferably has:
> 1) Apache license
> 2) Has zero transitive dependencies
> 3) Is small
> 4) Is performant
> 5) Is native java or supports execution on the three main OSs (Windows,
> Linux, Mac)
>
> In your PR you suggested using io.airlift:aircompressor:0.16 which doesn't
> meet item #2 and its transitive dependency fails #3.
>
> On Mon, Dec 2, 2019 at 12:16 PM Amogh Tiwari  wrote:
>
>> Hi,
>> I have filed a PR for an extension that will enable Apache Beam to work
>> with LZO/LZOP compression. Please refer
>> .
>> I would love it if someone can take this up and review it.
>> Please feel free to share your thoughts/suggestions.
>> Regards,
>> Amogh
>>
>


Re: Per Element File Output Without writeDynamic

2019-12-03 Thread Christopher Larsen
Hi Eugene,

Yes I think you've got it correct. In our use case we need to write each
Document in the PCollection to a separate file as multiple Documents in a
file will cause compilation errors and/or incorrect code to be generated by
the Thrift compiler.

Additionally there are some Documents that are so large that we would want
them to be split.

On Mon, Dec 2, 2019 at 9:45 PM Eugene Kirpichov  wrote:

> Hi Christopher,
>
> So, you have a PCollection, and you're writing it to files.
> FileIO.write/writeDynamic will write several Document's to each file -
> however, in your use case some of the individual Document's are so large
> that you want instead each of those large documents to be split into
> several files.
>
> Before we continue, could you confirm whether my understanding is correct?
>
> Thanks.
>
> On Mon, Dec 2, 2019 at 7:08 PM Christopher Larsen <
> christopher.lar...@quantiphi.com> wrote:
>
>> Ideally each element (document) will be written to a .thrift file so that
>> it can be compiled without further manipulation.
>>
>> But in the case of an extremely large file I think it would be nice to
>> split into smaller files. As far as splitting points go I think it could be
>> split at a point in the list of definitions. Thoughts?
>>
>> On Mon, Dec 2, 2019 at 4:02 PM Reuven Lax  wrote:
>>
>>> What do you mean by shard the output file? Can it be split at any byte
>>> location, or only at specific points?
>>>
>>> On Mon, Dec 2, 2019 at 2:05 PM Christopher Larsen <
>>> christopher.lar...@quantiphi.com> wrote:
>>>
 Hi Reuven,

 We would like to write each element to one file but still allow the
 runner to shard the output file which could yield more than one output file
 per element.

 On Mon, Dec 2, 2019 at 11:55 AM Reuven Lax  wrote:

> I'm not sure I completely understand the question. Are you saying that
> you want each element to write to only one file, guaranteeing that two
> elements are never written to the same file?
>
> On Mon, Dec 2, 2019 at 11:53 AM Christopher Larsen <
> christopher.lar...@quantiphi.com> wrote:
>
>> Hi All,
>>
>> TL/DR: can you extend FileIO.sink to write one or more files per
>> element instead of one or more elements per file?
>>
>> In working with Thrift files we have found that, since a .thrift file
>> needs to be compiled to generate code, the order of the contents of the
>> file is important (i.e., the namespace and include statements need to come
>> before the definitions).
>>
>> The issue that we are facing is that by implementing
>> FileIO.sink we cannot determine how many Document objects are
>> written to a file since this is determined by the runner. This can result
>> in more than one Document being written to a file which will cause
>> compilation errors.
>>
>> We know that this can be controlled by writeDynamic but since we
>> believe the default behavior for the connector should be to output a
>> Document to one or more files (depending on sharding) we were wondering 
>> how
>> to best accomplish this.
>>
>> Best,
>> Chris
>>
>

>>> --
>> *Regards,*
>>
>> ___
>>
>> *Chris Larsen*
>>
>> Data Engineer | Quantiphi Inc. | US and India
>>
>> http://www.quantiphi.com | Analytics is in our DNA
>>
>> USA: +1 760 504 8477
>>
>>

Re: Beam Testing Tools FAQ

2019-12-03 Thread Reza Rokni
Thanx!

On Wed, 27 Nov 2019, 02:31 Pablo Estrada,  wrote:

> Very cool. Thanks Lukasz!
>
> On Tue, Nov 26, 2019 at 9:41 AM Alan Myrvold  wrote:
>
>> Nice, thanks!
>>
>> On Tue, Nov 26, 2019 at 8:04 AM Robert Bradshaw 
>> wrote:
>>
>>> Thanks!
>>>
>>> On Tue, Nov 26, 2019 at 7:43 AM Łukasz Gajowy 
>>> wrote:
>>> >
>>> > Hi all,
>>> >
>>> > our documentation (either Confluence or the website docs) describes how
>>> to create various integration and performance tests - there are already
>>> core operations tests, Nexmark, and IO test documentation pages. However,
>>> we are lacking some general docs that describe what tools we have and what
>>> their purpose is.
>>> >
>>> > Therefore, I took the liberty of creating the Beam Testing Tools FAQ
>>> on our confluence:
>>> >
>>> https://cwiki.apache.org/confluence/display/BEAM/Beam+Testing+Tools+FAQ
>>> >
>>> > Hopefully, this is helpful and sheds some more light on that important
>>> part of our infrastructure. If you feel that something is missing there,
>>> feel free to let me know or add it yourself. :)
>>> >
>>> > Thanks,
>>> > Łukasz
>>>
>>