Re: New Edit button on beam.apache.org pages

2018-10-24 Thread Kenneth Knowles
This is a genius way to involve everyone who lands on the site! My first PR
is about to open... :-)

Kenn

On Wed, Oct 24, 2018 at 8:47 PM Jean-Baptiste Onofré 
wrote:

> Sweet !!
>
> Thanks !
>
> Regards
> JB
>
> On 24/10/2018 23:24, Alan Myrvold wrote:
> > To make small documentation changes easier, there is now an Edit button
> > at the top right of the pages on https://beam.apache.org. This button
> > opens the source .md file on the master branch of the beam repository in
> > the github web editor. After making changes you can create a pull
> > request to ask to have it merged.
> >
> > Thanks to Scott for the suggestion to add this in [BEAM-4431]
> > 
> >
> > Let me know if you run into any issues.
> >
> > Alan
> >
> >
>


Re: New Edit button on beam.apache.org pages

2018-10-24 Thread Jean-Baptiste Onofré
Sweet !!

Thanks !

Regards
JB

On 24/10/2018 23:24, Alan Myrvold wrote:
> To make small documentation changes easier, there is now an Edit button
> at the top right of the pages on https://beam.apache.org. This button
> opens the source .md file on the master branch of the beam repository in
> the github web editor. After making changes you can create a pull
> request to ask to have it merged.
> 
> Thanks to Scott for the suggestion to add this in [BEAM-4431]
> 
> 
> Let me know if you run into any issues.
> 
> Alan
> 
> 


Re: KafkaIO - Deadletter output

2018-10-24 Thread Kenneth Knowles
Forgive me if this is naive or missing something, but here are my thoughts
on these alternatives:

(0) Timestamp has to be pulled out in the source to control the watermark.
Luke's point is important.

(1) If bad records get min_timestamp, and they occur infrequently enough,
then the watermark will advance and they will all be dropped. That will not
allow output to a dead-letter queue.

(2) If you always have min_timestamp records, or if bad records are
frequent, the watermark will never advance. So windows/aggregations would
never be considered complete. Triggers could be used to get output anyhow,
but it would never be a final answer. I think it is not in the spirit of
Beam to work this way. Pragmatically, no state could ever be freed by a
runner.

In SQL there is an actual "dead letter" option when creating a table that
parses from a bytes source. If, for example, a JSON record cannot be parsed
to the expected schema - like maybe an avro record got in the stream, or
the JSON doesn't match the expected schema - it is output as-is to a
user-specified dead letter queue. I think this same level of support is
also required for records that cannot have timestamps extracted in an
unbounded source.

In an SDF I think the function has enough control to do it all in
"userland", so Cham is right on here.

Kenn

On Wed, Oct 24, 2018 at 6:54 PM Lukasz Cwik  wrote:

> That depends on the users pipeline and how watermark advancement of the
> source may impact elements becoming droppably late if they are emitted with
> the minimum timestamp.
>
> On Wed, Oct 24, 2018 at 4:42 PM Raghu Angadi  wrote:
>
>> I see.
>>
>> What I meant was to return min_timestamp for bad records in the timestamp
>> handler passed to KafkaIO itself, and correct timestamp for parsable
>> records. That should work too, right?
>>
>> On Wed, Oct 24, 2018 at 4:21 PM Lukasz Cwik  wrote:
>>
>>> Yes, that would be fine.
>>>
>>> The user could then use a ParDo which outputs to a DLQ for things it
>>> can't parse the timestamp for and use outputWithTimestamp[1] for everything
>>> else.
>>>
>>> 1:
>>> https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-
>>>
>>> On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi  wrote:
>>>
 Thanks. So returning  min timestamp is OK, right (assuming application
 fine is with what it means)?

 On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik  wrote:

> All records in Apache Beam have a timestamp. The default timestamp is
> the min timestamp defined here:
> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
>
>
> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi 
> wrote:
>
>>
>>
>> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik 
>> wrote:
>>
>>> You would have to return min timestamp for all records otherwise the
>>> watermark may have advanced and you would be outputting records that are
>>> droppably late.
>>>
>>
>> That would be fine I guess. What’s the timestamp for a record that
>> doesn’t have one?
>>
>>
>>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi 
>>> wrote:
>>>
 To be clear, returning min_timestamp for unparsable records shound
 not affect the watermark.

 On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi 
 wrote:

> How about returning min_timestamp? The would be dropped or
> redirected by the ParDo after that.
> Btw, TimestampPolicyFactory.withTimestampFn() is not a public
> API, is this pipeline defined under kafkaio package?
>
> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik 
> wrote:
>
>> In this case, the user is attempting to handle errors when
>> parsing the timestamp. The timestamp controls the watermark for the
>> UnboundedSource, how would they control the watermark in a 
>> downstream ParDo?
>>
>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi 
>> wrote:
>>
>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
>>> chamik...@google.com> wrote:
>>>
 Ah nice. Yeah, if user can return full bytes instead of
 applying a function that would result in an exception,  this can be
 extracted by a ParDo down the line.

>>>
>>> KafkaIO does return bytes, and I think most sources should,
>>> unless there is a good reason not to.
>>> Given that, do we think Beam should provide a tranform that
>>> makes to simpler to handle deadletter output? I think there was a 
>>> thread
>>> about it in the past.
>>>
>>>

 On Tue, Oct 23, 2018 at 11:14 

Re: [SQL] Investigation of missing/wrong session_end implementation in BeamSQL

2018-10-24 Thread Kenneth Knowles
This is some very cool digging, especially the forays into neighboring
Apache projects. We (and Flink) are clearly pushing at the edges of what
the original Calcite design foresaw. The naive insertion of Beam/Flink
style "group by window(s)" into SQL is showing a bit of wear.

Kenn

On Tue, Oct 23, 2018 at 3:46 PM Rui Wang  wrote:

> Hi community,
>
> In BeamSQL, SESSION window is supported in GROUP BY. Example query:
>
> "SELECT f_int2, COUNT(*) AS `getFieldCount`,"
> + " SESSION_START(f_timestamp, INTERVAL '5' MINUTE) AS 
> `window_start`, "
> + " SESSION_END(f_timestamp, INTERVAL '5' MINUTE) AS `window_end` "
> + " FROM TABLE_A"
> + " GROUP BY f_int2, SESSION(f_timestamp, INTERVAL '5' MINUTE)";
>
>
> However, I observed SESSION_END (window_end) always returns the same
> timestamp as what SESSION_START (window_start) returns, so BeamSQL misses
> the implementation of SESSION_END. Here is a summary of the
> root-cause investigation and the proposed fix:
>
> *Why are we not missing tumble_end and hop_end?*
> Because when generating logical plan, Calcite replaces tumble_start and
> hop_start with a reference to GROUP BY's TUMBLE/HOP. The GROUP BY's
> TUMBLE/HOP is supposed to return a timestamp. Then Calcite replaces
> tumble_end and hop_end with a PLUS(timestamp reference, window_size as a
> constant). As tumble and hop have fixed window sizes as constants in their
> function signatures, Calcite generates the PLUS in the logical plan, which
> means for tumble and hop, we only need a timestamp (which represents
> window_start in our implementation) to generate both window_start and
> window_end in Projection.
>
> We are emitting window_start timestamp as the result of TUMBLE/HOP/SESSION
> functions:
> https://github.com/amaliujia/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java#L84
>
>
>
> *Why are we missing session_end?* Because Calcite does not know the window
> size of a session window, so in the logical plan Calcite generates a
> reference to GROUP BY's SESSION for session_end, the same as the
> reference generated for session_start. So in the logical plan, session_start =
> session_end. Because BeamSQL does not differentiate session from tumble
> and hop, we return the window start as the result of the SESSION function, and
> then in the final result we see session_start = session_end.
>
> *Is this a Calcite bug?*
> Yes and No.
>
> Clearly Calcite shouldn't hide window_end by creating a wrong reference in
> the logical plan. If Calcite does not know what session_end is, it should at
> least keep it. Ideally Calcite should keep window_end in the logical plan and
> let us decide what it means: either a reference, a PLUS, or something else.
>
> However, Calcite leaves space for us to add the window_end back in
> physical plan nodes. For example, we can add window_end back in
> BeamAggregationRel. We can probably change the reference of session_end to
> a reference to our window_end in BeamAggregationRel.
>
> *What is the fix?*
> In BeamAggregationRel, we should add a window_end right after window
> functions. We can emit the window_end timestamp for the added field. And in
> Projection, we should change window_end from a PLUS (for tumble and hop)
> or a wrong reference (for session) to a correct reference to the newly added
> window_end in Aggregation.
>
> Jira: https://issues.apache.org/jira/browse/BEAM-5843
>
>
> -Rui
>
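As a non-SQL reference point for the fix above: once elements have been grouped
under Sessions windowing, the merged IntervalWindow itself carries both bounds,
which is exactly the information BeamAggregationRel would need to surface as
window_end. A minimal Java sketch (the element types here are illustrative, not
taken from the query above):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;

class EmitSessionBoundsFn extends DoFn<KV<String, Long>, String> {
  @ProcessElement
  public void process(ProcessContext c, BoundedWindow window) {
    // For Sessions-windowed data the merged window is an IntervalWindow,
    // so both the start and the end of the session are available here.
    IntervalWindow session = (IntervalWindow) window;
    Instant windowStart = session.start();
    Instant windowEnd = session.end();
    c.output(c.element().getKey() + "," + windowStart + "," + windowEnd);
  }
}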


Re: [DISCUSS] Beam public roadmap

2018-10-24 Thread Kenneth Knowles
OK. I have taken everyone's feedback into account. Preview at
http://apache-beam-website-pull-requests.storage.googleapis.com/6718/roadmap/index.html

Summary:

 - Rephrased the highlights to be more dignified
 - Filled out everything I could think of to get specific roadmaps started
 - Moved portability roadmap to the new roadmap
 - Moved portability design docs (and others) to
https://cwiki.apache.org/confluence/display/BEAM/Apache+Beam
 - Moved "ongoing projects" to
https://cwiki.apache.org/confluence/display/BEAM/Apache+Beam

Kenn

On Sat, Oct 20, 2018 at 9:08 AM Thomas Weise  wrote:

> +1
>
> I would suggest to also modify https://beam.apache.org/contribute/ to
> point to the new structure and remove duplicate content such as
> https://beam.apache.org/contribute/portability/
>
> Thanks
>
>
> On Sat, Oct 20, 2018 at 6:09 AM Jean-Baptiste Onofré 
> wrote:
>
>> +1
>>
>> it sounds good to me. I have some more long term ideas as well.
>>
>> Regards
>> JB
>>
>> On 20/10/2018 03:09, Ahmet Altay wrote:
>> > I looked at #6718, I think this is great as a starting point and not
>> > just a mock. I particularly like that:
>> > - It divides the roadmap along major component areas (SDKs, runners,
>> > portability). This is good because (a) it provides a complete top down
>> > picture and (b) allows groups of people working in these areas to build
>> > their own roadmaps. This division would empower people working in those
>> > components to build mini-roadmaps. This make sense to me because people
>> > with most context in those components would likely to already have some
>> > vision somewhere about the future of those components and they are
>> > already working towards realizing those. Now, they can share it with
>> > rest of the community and users in a structured way.
>> > - The other good bit is that, there is a index page that pulls major
>> > bits from each individual roadmap and provides a coherent list of where
>> > the project is going. It would be very easy for users to just look at
>> > this page and get a sense of the where the project is going.
>> >
>> > I believe this breakdown makes it easier for most folks in the
>> > community to participate in the process of building a roadmap. In my
>> > opinion, we can merge Kenn's _mock_ and ask people to start filling in
>> > the areas they care about.
>> >
>> > Ahmet
>> >
>> > On Wed, Oct 17, 2018 at 7:23 AM, Kenneth Knowles > > > wrote:
>> >
>> > I mocked up a little something
>> > on https://github.com/apache/beam/pull/6718
>> > .
>> >
>> > Kenn
>> >
>> > On Sun, Oct 14, 2018 at 5:33 PM Thomas Weise > > > wrote:
>> >
>> > Indeed, our current in-progress subsection isn't visible enough.
>> > It is also too coarse grained. Perhaps we can replace it with a
>> > list of current and proposed initiatives?
>> >
>> > I could see the index live on the web site, but would prefer
>> > individual, per-initiative pages to live on the wiki. That way
>> > they are easy to maintain by respective contributors.
>> >
>> > Thanks
>> >
>> > On Fri, Oct 12, 2018 at 8:06 PM Kenneth Knowles <
>> k...@apache.org
>> > > wrote:
>> >
>> > I think we can easily steer clear of those concerns. It
>> > should not look like a company's roadmap. This is just a
>> > term that users search for and ask for. It might be an
>> > incremental improvement
>> > on https://beam.apache.org/contribute/#works-in-progress
>> >  to
>> > present it more for users, to just give them a picture of
>> > the trajectory. For example, Beam Python on Flink would
>> > probably be of considerable interest but it is buried at
>> > https://beam.apache.org/contribute/portability/#status
>> > .
>> >
>> > Kenn
>> >
>> > On Fri, Oct 12, 2018 at 6:49 PM Thomas Weise <
>> t...@apache.org
>> > > wrote:
>> >
>> > As I understand it the term "roadmap" is not favored. It
>> > may convey the impression of an outside entity that
>> > controls what is being worked on and when. At least in
>> > theory contributions are volunteer work and individuals
>> > decide what they take up. There are projects that have a
>> > "list of initiatives" or "improvement proposals" that
>> > are either in idea phase or ongoing. Those provide an
>> > idea of what is on the radar and perhaps that is
>> > sufficient for those looking for the overall direction?
>> >
>> >
>> > On Fri, 

Re: KafkaIO - Deadletter output

2018-10-24 Thread Lukasz Cwik
That depends on the user's pipeline and how watermark advancement of the
source may impact elements becoming droppably late if they are emitted with
the minimum timestamp.

On Wed, Oct 24, 2018 at 4:42 PM Raghu Angadi  wrote:

> I see.
>
> What I meant was to return min_timestamp for bad records in the timestamp
> handler passed to KafkaIO itself, and correct timestamp for parsable
> records. That should work too, right?
>
> On Wed, Oct 24, 2018 at 4:21 PM Lukasz Cwik  wrote:
>
>> Yes, that would be fine.
>>
>> The user could then use a ParDo which outputs to a DLQ for things it
>> can't parse the timestamp for and use outputWithTimestamp[1] for everything
>> else.
>>
>> 1:
>> https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-
>>
>> On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi  wrote:
>>
>>> Thanks. So returning  min timestamp is OK, right (assuming application
>>> fine is with what it means)?
>>>
>>> On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik  wrote:
>>>
 All records in Apache Beam have a timestamp. The default timestamp is
 the min timestamp defined here:
 https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48


 On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi 
 wrote:

>
>
> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik  wrote:
>
>> You would have to return min timestamp for all records otherwise the
>> watermark may have advanced and you would be outputting records that are
>> droppably late.
>>
>
> That would be fine I guess. What’s the timestamp for a record that
> doesn’t have one?
>
>
>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi 
>> wrote:
>>
>>> To be clear, returning min_timestamp for unparsable records shound
>>> not affect the watermark.
>>>
>>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi 
>>> wrote:
>>>
 How about returning min_timestamp? The would be dropped or
 redirected by the ParDo after that.
 Btw, TimestampPolicyFactory.withTimestampFn() is not a public API,
 is this pipeline defined under kafkaio package?

 On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik 
 wrote:

> In this case, the user is attempting to handle errors when parsing
> the timestamp. The timestamp controls the watermark for the
> UnboundedSource, how would they control the watermark in a downstream 
> ParDo?
>
> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi 
> wrote:
>
>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
>> chamik...@google.com> wrote:
>>
>>> Ah nice. Yeah, if user can return full bytes instead of applying
>>> a function that would result in an exception,  this can be 
>>> extracted by a
>>> ParDo down the line.
>>>
>>
>> KafkaIO does return bytes, and I think most sources should,
>> unless there is a good reason not to.
>> Given that, do we think Beam should provide a tranform that makes
>> to simpler to handle deadletter output? I think there was a thread 
>> about it
>> in the past.
>>
>>
>>>
>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>>> jcgarc...@gmail.com> wrote:
>>>
 As Raghu said,

 Just apply a regular ParDo and return a PCollectionTuple afert
 that you can extract your Success Records (TupleTag) and your 
 DeadLetter
 records(TupleTag) and do whatever you want with them.


 Raghu Angadi  schrieb am Mi., 24. Okt.
 2018, 05:18:

> User can read serialized bytes from KafkaIO and deserialize
> explicitly in a ParDo, which gives complete control on how to 
> handle record
> errors. This is I would do if I need to in my pipeline.
>
> If there is a transform in Beam that does this, it could be
> convenient for users in many such scenarios. This is simpler than 
> each
> source supporting it explicitly.
>
> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
> chamik...@google.com> wrote:
>
>> Given that KafkaIO uses UnboundeSource framework, this is
>> probably not something that can easily be supported. We might be 
>> able to
>> support similar features when we have Kafka on top of Splittable 
>> DoFn
>> though.
>>
> So feel free to create a feature 

Re: [PROPOSAL] ParquetIO support for Python SDK

2018-10-24 Thread Chamikara Jayalath
Thanks Heejong. Added some comments. +1 for summarizing the doc in the
email thread.

- Cham

On Wed, Oct 24, 2018 at 4:45 PM Ahmet Altay  wrote:

> Thank you Heejong. Could you also share a summary of the design document
> (major points/decisions) in the mailing list?
>
> On Wed, Oct 24, 2018 at 4:08 PM, Heejong Lee  wrote:
>
>> Hi,
>>
>> I'm working on BEAM-: Parquet IO for Python SDK.
>>
>> Issue: https://issues.apache.org/jira/browse/BEAM-
>> Design doc:
>> https://docs.google.com/document/d/1-FT6zmjYhYFWXL8aDM5mNeiUnZdKnnB021zTo4S-0Wg
>> WIP PR: https://github.com/apache/beam/pull/6763
>>
>> Any feedback is appreciated. Thanks!
>>
>>
>


Re: [PROPOSAL] ParquetIO support for Python SDK

2018-10-24 Thread Ahmet Altay
Thank you Heejong. Could you also share a summary of the design document
(major points/decisions) in the mailing list?

On Wed, Oct 24, 2018 at 4:08 PM, Heejong Lee  wrote:

> Hi,
>
> I'm working on BEAM-: Parquet IO for Python SDK.
>
> Issue: https://issues.apache.org/jira/browse/BEAM-
> Design doc: https://docs.google.com/document/d/1-
> FT6zmjYhYFWXL8aDM5mNeiUnZdKnnB021zTo4S-0Wg
> WIP PR: https://github.com/apache/beam/pull/6763
>
> Any feedback is appreciated. Thanks!
>
>


Re: KafkaIO - Deadletter output

2018-10-24 Thread Raghu Angadi
I see.

What I meant was to return min_timestamp for bad records in the timestamp
handler passed to KafkaIO itself, and the correct timestamp for parsable
records. That should work too, right?

On Wed, Oct 24, 2018 at 4:21 PM Lukasz Cwik  wrote:

> Yes, that would be fine.
>
> The user could then use a ParDo which outputs to a DLQ for things it can't
> parse the timestamp for and use outputWithTimestamp[1] for everything else.
>
> 1:
> https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-
>
> On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi  wrote:
>
>> Thanks. So returning  min timestamp is OK, right (assuming application
>> fine is with what it means)?
>>
>> On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik  wrote:
>>
>>> All records in Apache Beam have a timestamp. The default timestamp is
>>> the min timestamp defined here:
>>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
>>>
>>>
>>> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi 
>>> wrote:
>>>


 On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik  wrote:

> You would have to return min timestamp for all records otherwise the
> watermark may have advanced and you would be outputting records that are
> droppably late.
>

 That would be fine I guess. What’s the timestamp for a record that
 doesn’t have one?


> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi 
> wrote:
>
>> To be clear, returning min_timestamp for unparsable records shound
>> not affect the watermark.
>>
>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi 
>> wrote:
>>
>>> How about returning min_timestamp? The would be dropped or
>>> redirected by the ParDo after that.
>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API,
>>> is this pipeline defined under kafkaio package?
>>>
>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik 
>>> wrote:
>>>
 In this case, the user is attempting to handle errors when parsing
 the timestamp. The timestamp controls the watermark for the
 UnboundedSource, how would they control the watermark in a downstream 
 ParDo?

 On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi 
 wrote:

> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
> chamik...@google.com> wrote:
>
>> Ah nice. Yeah, if user can return full bytes instead of applying
>> a function that would result in an exception,  this can be extracted 
>> by a
>> ParDo down the line.
>>
>
> KafkaIO does return bytes, and I think most sources should, unless
> there is a good reason not to.
> Given that, do we think Beam should provide a tranform that makes
> to simpler to handle deadletter output? I think there was a thread 
> about it
> in the past.
>
>
>>
>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>> jcgarc...@gmail.com> wrote:
>>
>>> As Raghu said,
>>>
>>> Just apply a regular ParDo and return a PCollectionTuple afert
>>> that you can extract your Success Records (TupleTag) and your 
>>> DeadLetter
>>> records(TupleTag) and do whatever you want with them.
>>>
>>>
>>> Raghu Angadi  schrieb am Mi., 24. Okt.
>>> 2018, 05:18:
>>>
 User can read serialized bytes from KafkaIO and deserialize
 explicitly in a ParDo, which gives complete control on how to 
 handle record
 errors. This is I would do if I need to in my pipeline.

 If there is a transform in Beam that does this, it could be
 convenient for users in many such scenarios. This is simpler than 
 each
 source supporting it explicitly.

 On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
 chamik...@google.com> wrote:

> Given that KafkaIO uses UnboundeSource framework, this is
> probably not something that can easily be supported. We might be 
> able to
> support similar features when we have Kafka on top of Splittable 
> DoFn
> though.
>
 So feel free to create a feature request JIRA for this.
>
> Thanks,
> Cham
>
> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <
> k...@google.com> wrote:
>
>> This is a great question. I've added the dev list to be sure
>> it gets noticed by whoever may know best.
>>

Re: KafkaIO - Deadletter output

2018-10-24 Thread Lukasz Cwik
Yes, that would be fine.

The user could then use a ParDo which outputs to a DLQ for things it can't
parse the timestamp for and use outputWithTimestamp[1] for everything else.

1:
https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-
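A minimal sketch of that suggestion, assuming the Kafka source assigned the
minimum timestamp to records it could not parse; MyEvent and its
getEventTimeMillis() accessor are placeholders for the user's actual value type:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Instant;

class AssignTimestampOrDeadLetterFn extends DoFn<MyEvent, MyEvent> {
  static final TupleTag<MyEvent> MAIN_TAG = new TupleTag<MyEvent>() {};
  static final TupleTag<MyEvent> DLQ_TAG = new TupleTag<MyEvent>() {};

  @ProcessElement
  public void process(ProcessContext c) {
    MyEvent event = c.element();
    try {
      // Hypothetical parse; assumed to throw if no usable timestamp is present.
      Instant eventTime = new Instant(event.getEventTimeMillis());
      // The source assigned the minimum timestamp, so moving it forward is allowed.
      c.outputWithTimestamp(event, eventTime);
    } catch (Exception e) {
      // Records without a parsable timestamp go to the dead-letter output.
      c.output(DLQ_TAG, event);
    }
  }
}

It would be applied with ParDo.of(new AssignTimestampOrDeadLetterFn())
.withOutputTags(AssignTimestampOrDeadLetterFn.MAIN_TAG,
TupleTagList.of(AssignTimestampOrDeadLetterFn.DLQ_TAG)).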

On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi  wrote:

> Thanks. So returning  min timestamp is OK, right (assuming application
> fine is with what it means)?
>
> On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik  wrote:
>
>> All records in Apache Beam have a timestamp. The default timestamp is the
>> min timestamp defined here:
>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
>>
>>
>> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi  wrote:
>>
>>>
>>>
>>> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik  wrote:
>>>
 You would have to return min timestamp for all records otherwise the
 watermark may have advanced and you would be outputting records that are
 droppably late.

>>>
>>> That would be fine I guess. What’s the timestamp for a record that
>>> doesn’t have one?
>>>
>>>
 On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi 
 wrote:

> To be clear, returning min_timestamp for unparsable records shound not
> affect the watermark.
>
> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi 
> wrote:
>
>> How about returning min_timestamp? The would be dropped or redirected
>> by the ParDo after that.
>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API,
>> is this pipeline defined under kafkaio package?
>>
>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik 
>> wrote:
>>
>>> In this case, the user is attempting to handle errors when parsing
>>> the timestamp. The timestamp controls the watermark for the
>>> UnboundedSource, how would they control the watermark in a downstream 
>>> ParDo?
>>>
>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi 
>>> wrote:
>>>
 On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
 chamik...@google.com> wrote:

> Ah nice. Yeah, if user can return full bytes instead of applying a
> function that would result in an exception,  this can be extracted by 
> a
> ParDo down the line.
>

 KafkaIO does return bytes, and I think most sources should, unless
 there is a good reason not to.
 Given that, do we think Beam should provide a tranform that makes
 to simpler to handle deadletter output? I think there was a thread 
 about it
 in the past.


>
> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
> jcgarc...@gmail.com> wrote:
>
>> As Raghu said,
>>
>> Just apply a regular ParDo and return a PCollectionTuple afert
>> that you can extract your Success Records (TupleTag) and your 
>> DeadLetter
>> records(TupleTag) and do whatever you want with them.
>>
>>
>> Raghu Angadi  schrieb am Mi., 24. Okt. 2018,
>> 05:18:
>>
>>> User can read serialized bytes from KafkaIO and deserialize
>>> explicitly in a ParDo, which gives complete control on how to 
>>> handle record
>>> errors. This is I would do if I need to in my pipeline.
>>>
>>> If there is a transform in Beam that does this, it could be
>>> convenient for users in many such scenarios. This is simpler than 
>>> each
>>> source supporting it explicitly.
>>>
>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>> chamik...@google.com> wrote:
>>>
 Given that KafkaIO uses UnboundeSource framework, this is
 probably not something that can easily be supported. We might be 
 able to
 support similar features when we have Kafka on top of Splittable 
 DoFn
 though.

>>> So feel free to create a feature request JIRA for this.

 Thanks,
 Cham

 On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles 
 wrote:

> This is a great question. I've added the dev list to be sure
> it gets noticed by whoever may know best.
>
> Kenn
>
> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
> tobias.kay...@ricardo.ch> wrote:
>
>>
>> Hi,
>>
>> Is there a way to get a Deadletter Output from a pipeline
>> that uses a KafkaIO
>> connector for it's input? As
>> 

[PROPOSAL] ParquetIO support for Python SDK

2018-10-24 Thread Heejong Lee
Hi,

I'm working on BEAM-: Parquet IO for Python SDK.

Issue: https://issues.apache.org/jira/browse/BEAM-
Design doc:
https://docs.google.com/document/d/1-FT6zmjYhYFWXL8aDM5mNeiUnZdKnnB021zTo4S-0Wg
WIP PR: https://github.com/apache/beam/pull/6763

Any feedback is appreciated. Thanks!


Re: New Edit button on beam.apache.org pages

2018-10-24 Thread Ankur Goenka
Great addition to the website 

On Wed, Oct 24, 2018 at 2:51 PM Ruoyun Huang  wrote:

> Looks awesome!
>
> On Wed, Oct 24, 2018 at 2:24 PM Alan Myrvold  wrote:
>
>> To make small documentation changes easier, there is now an Edit button
>> at the top right of the pages on https://beam.apache.org. This button
>> opens the source .md file on the master branch of the beam repository in
>> the github web editor. After making changes you can create a pull request
>> to ask to have it merged.
>>
>> Thanks to Scott for the suggestion to add this in [BEAM-4431]
>> 
>>
>> Let me know if you run into any issues.
>>
>> Alan
>>
>>
>>
>
> --
> 
> Ruoyun  Huang
>
>


Re: New Edit button on beam.apache.org pages

2018-10-24 Thread Ruoyun Huang
Looks awesome!

On Wed, Oct 24, 2018 at 2:24 PM Alan Myrvold  wrote:

> To make small documentation changes easier, there is now an Edit button at
> the top right of the pages on https://beam.apache.org. This button opens
> the source .md file on the master branch of the beam repository in the
> github web editor. After making changes you can create a pull request to
> ask to have it merged.
>
> Thanks to Scott for the suggestion to add this in [BEAM-4431]
> 
>
> Let me know if you run into any issues.
>
> Alan
>
>
>

-- 

Ruoyun  Huang


[PROPOSAL] Bundle Finalization

2018-10-24 Thread Lukasz Cwik
I have been working on the protocol for splitting/checkpointing of bundles
for usage with SplittableDoFn, but in the meantime I wanted to share a
proposal for bundle finalization[1].

Bundle finalization is used to solve a problem where integration with
external systems which require acknowledgement (such as queue-based
sources) should only be done when the output of a bundle is durably
persisted. The idea is that after a bundle is completed and the runner
durably persists the output, a best-effort finalization call is made back to
the same SDK harness instance. This allows the SDK harness to send any
"acknowledgements" to the external system. Any failures during finalization
require the external system to be able to restore anything which wasn't
acknowledged.

I also discuss why I don't believe we gain much by providing "guaranteed"
finalization. Please take a look at the doc I shared and feel free to
comment.

1: https://s.apache.org/beam-finalizing-bundles
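For illustration only, one possible user-facing shape of this pattern. The
BundleFinalizer hook and the QueueClient/QueueMessage types below are
assumptions made for this sketch, not details taken from the linked doc:

import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Duration;
import org.joda.time.Instant;

class AckOnFinalizeFn extends DoFn<QueueMessage, QueueMessage> {
  private final QueueClient queue;  // assumed serializable client for the external queue

  AckOnFinalizeFn(QueueClient queue) {
    this.queue = queue;
  }

  @ProcessElement
  public void process(ProcessContext c, BundleFinalizer finalizer) {
    QueueMessage message = c.element();
    c.output(message);
    // Best-effort callback after the runner durably persists this bundle's
    // output; if it never fires, the un-acked message is simply redelivered.
    finalizer.afterBundleCommit(
        Instant.now().plus(Duration.standardMinutes(10)),
        () -> queue.ack(message.getAckId()));
  }
}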


Re: New Edit button on beam.apache.org pages

2018-10-24 Thread Charles Chen
This is great!  Thanks!

On Wed, Oct 24, 2018 at 2:26 PM Ahmet Altay  wrote:

> Really cool! Thank you!
>
> On Wed, Oct 24, 2018 at 2:24 PM, Alan Myrvold  wrote:
>
>> To make small documentation changes easier, there is now an Edit button
>> at the top right of the pages on https://beam.apache.org. This button
>> opens the source .md file on the master branch of the beam repository in
>> the github web editor. After making changes you can create a pull request
>> to ask to have it merged.
>>
>> Thanks to Scott for the suggestion to add this in [BEAM-4431]
>> 
>>
>> Let me know if you run into any issues.
>>
>> Alan
>>
>>
>>
>


Re: New Edit button on beam.apache.org pages

2018-10-24 Thread Ahmet Altay
Really cool! Thank you!

On Wed, Oct 24, 2018 at 2:24 PM, Alan Myrvold  wrote:

> To make small documentation changes easier, there is now an Edit button at
> the top right of the pages on https://beam.apache.org. This button opens
> the source .md file on the master branch of the beam repository in the
> github web editor. After making changes you can create a pull request to
> ask to have it merged.
>
> Thanks to Scott for the suggestion to add this in [BEAM-4431]
> 
>
> Let me know if you run into any issues.
>
> Alan
>
>
>


New Edit button on beam.apache.org pages

2018-10-24 Thread Alan Myrvold
To make small documentation changes easier, there is now an Edit button at
the top right of the pages on https://beam.apache.org. This button opens
the source .md file on the master branch of the beam repository in the
github web editor. After making changes you can create a pull request to
ask to have it merged.

Thanks to Scott for the suggestion to add this in [BEAM-4431]


Let me know if you run into any issues.

Alan


Re: KafkaIO - Deadletter output

2018-10-24 Thread Raghu Angadi
Thanks. So returning min timestamp is OK, right (assuming the application
is fine with what it means)?

On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik  wrote:

> All records in Apache Beam have a timestamp. The default timestamp is the
> min timestamp defined here:
> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
>
>
> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi  wrote:
>
>>
>>
>> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik  wrote:
>>
>>> You would have to return min timestamp for all records otherwise the
>>> watermark may have advanced and you would be outputting records that are
>>> droppably late.
>>>
>>
>> That would be fine I guess. What’s the timestamp for a record that
>> doesn’t have one?
>>
>>
>>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi 
>>> wrote:
>>>
 To be clear, returning min_timestamp for unparsable records shound not
 affect the watermark.

 On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi 
 wrote:

> How about returning min_timestamp? The would be dropped or redirected
> by the ParDo after that.
> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API, is
> this pipeline defined under kafkaio package?
>
> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik  wrote:
>
>> In this case, the user is attempting to handle errors when parsing
>> the timestamp. The timestamp controls the watermark for the
>> UnboundedSource, how would they control the watermark in a downstream 
>> ParDo?
>>
>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi 
>> wrote:
>>
>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
>>> chamik...@google.com> wrote:
>>>
 Ah nice. Yeah, if user can return full bytes instead of applying a
 function that would result in an exception,  this can be extracted by a
 ParDo down the line.

>>>
>>> KafkaIO does return bytes, and I think most sources should, unless
>>> there is a good reason not to.
>>> Given that, do we think Beam should provide a tranform that makes to
>>> simpler to handle deadletter output? I think there was a thread about 
>>> it in
>>> the past.
>>>
>>>

 On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
 jcgarc...@gmail.com> wrote:

> As Raghu said,
>
> Just apply a regular ParDo and return a PCollectionTuple afert
> that you can extract your Success Records (TupleTag) and your 
> DeadLetter
> records(TupleTag) and do whatever you want with them.
>
>
> Raghu Angadi  schrieb am Mi., 24. Okt. 2018,
> 05:18:
>
>> User can read serialized bytes from KafkaIO and deserialize
>> explicitly in a ParDo, which gives complete control on how to handle 
>> record
>> errors. This is I would do if I need to in my pipeline.
>>
>> If there is a transform in Beam that does this, it could be
>> convenient for users in many such scenarios. This is simpler than 
>> each
>> source supporting it explicitly.
>>
>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>> chamik...@google.com> wrote:
>>
>>> Given that KafkaIO uses UnboundeSource framework, this is
>>> probably not something that can easily be supported. We might be 
>>> able to
>>> support similar features when we have Kafka on top of Splittable 
>>> DoFn
>>> though.
>>>
>> So feel free to create a feature request JIRA for this.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles 
>>> wrote:
>>>
 This is a great question. I've added the dev list to be sure it
 gets noticed by whoever may know best.

 Kenn

 On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
 tobias.kay...@ricardo.ch> wrote:

>
> Hi,
>
> Is there a way to get a Deadletter Output from a pipeline that
> uses a KafkaIO
> connector for it's input? As
> `TimestampPolicyFactory.withTimestampFn()` takes
> only a SerializableFunction and not a ParDo, how would I be
> able to produce a
> Deadletter output from it?
>
> I have the following pipeline defined that reads from a
> KafkaIO input:
>
> pipeline.apply(
>   KafkaIO.read()
> .withBootstrapServers(bootstrap)
> .withTopics(topics)
> .withKeyDeserializer(StringDeserializer.class)
> 

Re: KafkaIO - Deadletter output

2018-10-24 Thread Lukasz Cwik
All records in Apache Beam have a timestamp. The default timestamp is the
min timestamp defined here:
https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
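For reference, that constant is exposed as BoundedWindow.TIMESTAMP_MIN_VALUE,
so a downstream ParDo can detect records the source stamped with it. A small
sketch of the "redirected by the ParDo after that" idea from earlier in this
thread, where MyEvent and DLQ_TAG (a TupleTag<MyEvent> registered via
withOutputTags) are placeholders:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

class RedirectMinTimestampFn extends DoFn<MyEvent, MyEvent> {
  @ProcessElement
  public void process(ProcessContext c) {
    if (BoundedWindow.TIMESTAMP_MIN_VALUE.equals(c.timestamp())) {
      // Records carrying the default minimum timestamp are treated as unparsable.
      c.output(DLQ_TAG, c.element());
    } else {
      c.output(c.element());
    }
  }
}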


On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi  wrote:

>
>
> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik  wrote:
>
>> You would have to return min timestamp for all records otherwise the
>> watermark may have advanced and you would be outputting records that are
>> droppably late.
>>
>
> That would be fine I guess. What’s the timestamp for a record that doesn’t
> have one?
>
>
>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi  wrote:
>>
>>> To be clear, returning min_timestamp for unparsable records shound not
>>> affect the watermark.
>>>
>>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi 
>>> wrote:
>>>
 How about returning min_timestamp? The would be dropped or redirected
 by the ParDo after that.
 Btw, TimestampPolicyFactory.withTimestampFn() is not a public API, is
 this pipeline defined under kafkaio package?

 On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik  wrote:

> In this case, the user is attempting to handle errors when parsing the
> timestamp. The timestamp controls the watermark for the UnboundedSource,
> how would they control the watermark in a downstream ParDo?
>
> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi 
> wrote:
>
>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
>> chamik...@google.com> wrote:
>>
>>> Ah nice. Yeah, if user can return full bytes instead of applying a
>>> function that would result in an exception,  this can be extracted by a
>>> ParDo down the line.
>>>
>>
>> KafkaIO does return bytes, and I think most sources should, unless
>> there is a good reason not to.
>> Given that, do we think Beam should provide a tranform that makes to
>> simpler to handle deadletter output? I think there was a thread about it 
>> in
>> the past.
>>
>>
>>>
>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>>> jcgarc...@gmail.com> wrote:
>>>
 As Raghu said,

 Just apply a regular ParDo and return a PCollectionTuple afert that
 you can extract your Success Records (TupleTag) and your DeadLetter
 records(TupleTag) and do whatever you want with them.


 Raghu Angadi  schrieb am Mi., 24. Okt. 2018,
 05:18:

> User can read serialized bytes from KafkaIO and deserialize
> explicitly in a ParDo, which gives complete control on how to handle 
> record
> errors. This is I would do if I need to in my pipeline.
>
> If there is a transform in Beam that does this, it could be
> convenient for users in many such scenarios. This is simpler than each
> source supporting it explicitly.
>
> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
> chamik...@google.com> wrote:
>
>> Given that KafkaIO uses UnboundeSource framework, this is
>> probably not something that can easily be supported. We might be 
>> able to
>> support similar features when we have Kafka on top of Splittable DoFn
>> though.
>>
> So feel free to create a feature request JIRA for this.
>>
>> Thanks,
>> Cham
>>
>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles 
>> wrote:
>>
>>> This is a great question. I've added the dev list to be sure it
>>> gets noticed by whoever may know best.
>>>
>>> Kenn
>>>
>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>> tobias.kay...@ricardo.ch> wrote:
>>>

 Hi,

 Is there a way to get a Deadletter Output from a pipeline that
 uses a KafkaIO
 connector for it's input? As
 `TimestampPolicyFactory.withTimestampFn()` takes
 only a SerializableFunction and not a ParDo, how would I be
 able to produce a
 Deadletter output from it?

 I have the following pipeline defined that reads from a KafkaIO
 input:

 pipeline.apply(
   KafkaIO.read()
 .withBootstrapServers(bootstrap)
 .withTopics(topics)
 .withKeyDeserializer(StringDeserializer.class)
 .withValueDeserializer(ConfigurableDeserializer.class)
 .updateConsumerProperties(

 ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
 inputMessagesConfig))

 .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", 
 "earliest"))
 

Jenkins build is back to normal : beam_SeedJob_Standalone #1807

2018-10-24 Thread Apache Jenkins Server
See 




Re: [DISCUSS] Publish vendored dependencies independently

2018-10-24 Thread Lukasz Cwik
On Wed, Oct 24, 2018 at 11:31 AM Kenneth Knowles  wrote:

> OK. I just opened https://github.com/apache/beam/pull/6809 to push Guava
> through. I made some comments there, and also I agree with Luke that full
> version string makes sense. For this purpose it seems easy and fine to do a
> search/replace to swap 20.0 for 20.1, and compatibility between them should
> not be a concern.
>
> I have minor suggestions and clarifications:
>
>  - Is there value to `beam` in the artifactId? I would leave it off unless
> there's a special need
>
It would only provide consistency with all our other artifactIds that we
publish but there isn't a special need that I'm aware of.


>  - Users should never use these and we make it extremely clear they are
> not supported for any reasons
>
>  - Use 0.x versions indicating no intention of semantic versioning
>
I like this idea a lot.


>
> Bringing my comments and Luke's together, here's the proposal:
>
> groupId: org.apache.beam
> artifactId: vendored-guava-20_0
> namespace: org.apache.beam.vendored.guava.v20_0
> version: 0.1
>
> Alternatively it could be
>
> groupId: org.apache.beam-vendored
> artifactid: guava-20_0
> namespace: org.apache.beam.vendored.guava.v20_0
> version: 0.1
>
> I like the latter but I haven't gone through the process of establishing a
> new groupId.
>
Based on
https://maven.apache.org/guides/mini/guide-naming-conventions.html, the
alternative groupId should be org.apache.beam.vendored and not
org.apache.beam-vendored.
I slightly prefer org.apache.beam over org.apache.beam.vendored but not
enough to object to either choice as long as we maintain consistency for
all vendored dependencies we produce going forward.
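To make the coordinates above concrete, a hypothetical sketch of what user code
against such a vendored artifact would look like, assuming the original
com.google.common packages are kept verbatim under the proposed prefix:

import org.apache.beam.vendored.guava.v20_0.com.google.common.collect.ImmutableList;

public class VendoredGuavaExample {
  public static void main(String[] args) {
    // The relocated class behaves exactly like the original Guava ImmutableList;
    // only the package prefix changes when the vendored version is bumped.
    ImmutableList<String> vendored = ImmutableList.of("guava", "grpc", "calcite");
    System.out.println(vendored);
  }
}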


> And for now we do not publish source jars. A couple of TODOs to get the
> build in good shape (classifiers, jars, interaction with plugins)
>
> Kenn
>
>
> On Wed, Oct 24, 2018 at 10:13 AM Lukasz Cwik  wrote:
>
>> It looks like we are agreeing to make each vendored dependency self
>> contained and have all their own internal dependencies packaged. For
>> example, gRPC and all its transitive dependencies would use
>> org.apache.beam.vendored.grpc.vYYY and Calcite and all its transitive
>> dependencies would use org.apache.beam.vendored.calcite.vZZZ.
>>
>> I also wanted to circle back on this question I had earlier that didn't
>> have any follow-up:
>> Currently we are relocating code depending on the version string. If the
>> major version is >= 1, we use only the major version within the package
>> string and rely on semantic versioning provided by the dependency to not
>> break people. If the major version is 0, we assume the dependency is
>> unstable and use the full version as part of the package string during
>> relocation.
>>
>> The downside of using the full version string for relocated packages:
>> 1) Users will end up with multiple copies of dependencies that differ
>> only by the minor or patch version, increasing the size of their application.
>> 2) Bumping up the version of a dependency now requires the import
>> statement in all java files to be updated (not too difficult with some
>> sed/grep skills)
>>
>> The upside of using the full version string in the relocated package:
>> 1) We don't have to worry about whether a dependency maintains semantic
>> versioning which means our users won't have to worry about that either.
>> 2) This increases the odds that a user will load multiple slightly
>> different versions of the same dependency which is known to be incompatible
>> in certain situations (e.g. Netty 4.1.25 can't be on the classpath with
>> Netty 4.1.28 even though they are both shaded due to issues of how JNI with
>> tcnative works).
>>
>> My preference would be to use the full version string for import
>> statements (so org.apache.beam.vendor.grpc.v1_13_1...) since this would
>> allow multiple copies to not conflict with each other since in my opinion
>> it is a lot more difficult to help a user debug a dependency issue then to
>> use string replacement during dependency upgrades to fix import statements.
>> Also I would suggest we name the artifacts in Maven as follows:
>> groupId: org.apache.beam
>> artifactId: beam-vendor-grpc-v1_13_1
>> version: 1.0.0 (first version and subsequent versions such as 1.0.1 are
>> only for patch upgrades that fix any shading issues we may have had when
>> producing the vendored jar)
>>
>>
>> On Wed, Oct 24, 2018 at 6:01 AM Maximilian Michels 
>> wrote:
>>
>>> Would also keep it simple and optimize for the JAR size only if
>>> necessary.
>>>
>>> On 24.10.18 00:06, Kenneth Knowles wrote:
>>> > I think it makes sense for each vendored dependency to be
>>> self-contained
>>> > as much as possible. It should keep it fairly simple. Things that
>>> cross
>>> > their API surface cannot be hidden, of course. Jar size is not a
>>> concern
>>> > IMO.
>>> >
>>> > Kenn
>>> >
>>> > On Tue, Oct 23, 2018 at 9:05 AM Lukasz Cwik >> > > wrote:
>>> >
>>> 

Re: KafkaIO - Deadletter output

2018-10-24 Thread Raghu Angadi
On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik  wrote:

> You would have to return min timestamp for all records otherwise the
> watermark may have advanced and you would be outputting records that are
> droppably late.
>

That would be fine I guess. What’s the timestamp for a record that doesn’t
have one?


> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi  wrote:
>
>> To be clear, returning min_timestamp for unparsable records shound not
>> affect the watermark.
>>
>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi  wrote:
>>
>>> How about returning min_timestamp? The would be dropped or redirected by
>>> the ParDo after that.
>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API, is
>>> this pipeline defined under kafkaio package?
>>>
>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik  wrote:
>>>
 In this case, the user is attempting to handle errors when parsing the
 timestamp. The timestamp controls the watermark for the UnboundedSource,
 how would they control the watermark in a downstream ParDo?

 On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi 
 wrote:

> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
> chamik...@google.com> wrote:
>
>> Ah nice. Yeah, if user can return full bytes instead of applying a
>> function that would result in an exception,  this can be extracted by a
>> ParDo down the line.
>>
>
> KafkaIO does return bytes, and I think most sources should, unless
> there is a good reason not to.
> Given that, do we think Beam should provide a tranform that makes to
> simpler to handle deadletter output? I think there was a thread about it 
> in
> the past.
>
>
>>
>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>> jcgarc...@gmail.com> wrote:
>>
>>> As Raghu said,
>>>
>>> Just apply a regular ParDo and return a PCollectionTuple afert that
>>> you can extract your Success Records (TupleTag) and your DeadLetter
>>> records(TupleTag) and do whatever you want with them.
>>>
>>>
>>> Raghu Angadi  schrieb am Mi., 24. Okt. 2018,
>>> 05:18:
>>>
 User can read serialized bytes from KafkaIO and deserialize
 explicitly in a ParDo, which gives complete control on how to handle 
 record
 errors. This is I would do if I need to in my pipeline.

 If there is a transform in Beam that does this, it could be
 convenient for users in many such scenarios. This is simpler than each
 source supporting it explicitly.

 On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
 chamik...@google.com> wrote:

> Given that KafkaIO uses UnboundeSource framework, this is probably
> not something that can easily be supported. We might be able to 
> support
> similar features when we have Kafka on top of Splittable DoFn though.
>
 So feel free to create a feature request JIRA for this.
>
> Thanks,
> Cham
>
> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles 
> wrote:
>
>> This is a great question. I've added the dev list to be sure it
>> gets noticed by whoever may know best.
>>
>> Kenn
>>
>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>> tobias.kay...@ricardo.ch> wrote:
>>
>>>
>>> Hi,
>>>
>>> Is there a way to get a Deadletter Output from a pipeline that
>>> uses a KafkaIO
>>> connector for it's input? As
>>> `TimestampPolicyFactory.withTimestampFn()` takes
>>> only a SerializableFunction and not a ParDo, how would I be able
>>> to produce a
>>> Deadletter output from it?
>>>
>>> I have the following pipeline defined that reads from a KafkaIO
>>> input:
>>>
>>> pipeline.apply(
>>>   KafkaIO.read()
>>> .withBootstrapServers(bootstrap)
>>> .withTopics(topics)
>>> .withKeyDeserializer(StringDeserializer.class)
>>> .withValueDeserializer(ConfigurableDeserializer.class)
>>> .updateConsumerProperties(
>>>
>>> ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>> inputMessagesConfig))
>>>
>>> .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", 
>>> "earliest"))
>>> .updateConsumerProperties(ImmutableMap.of("group.id",
>>> "beam-consumers"))
>>>
>>> .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", 
>>> "true"))
>>> .withTimestampPolicyFactory(
>>> TimestampPolicyFactory.withTimestampFn(
>>> new MessageTimestampExtractor(inputMessagesConfig)))
>>> .withReadCommitted()
>>> .commitOffsetsInFinalize())
>>>
>>>

Re: KafkaIO - Deadletter output

2018-10-24 Thread Lukasz Cwik
You would have to return min timestamp for all records; otherwise the
watermark may have advanced and you would be outputting records that are
droppably late.

On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi  wrote:

> To be clear, returning min_timestamp for unparsable records shound not
> affect the watermark.
>
> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi  wrote:
>
>> How about returning min_timestamp? The would be dropped or redirected by
>> the ParDo after that.
>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API, is
>> this pipeline defined under kafkaio package?
>>
>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik  wrote:
>>
>>> In this case, the user is attempting to handle errors when parsing the
>>> timestamp. The timestamp controls the watermark for the UnboundedSource,
>>> how would they control the watermark in a downstream ParDo?
>>>
>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi  wrote:
>>>
 On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
 chamik...@google.com> wrote:

> Ah nice. Yeah, if user can return full bytes instead of applying a
> function that would result in an exception,  this can be extracted by a
> ParDo down the line.
>

 KafkaIO does return bytes, and I think most sources should, unless
 there is a good reason not to.
 Given that, do we think Beam should provide a tranform that makes to
 simpler to handle deadletter output? I think there was a thread about it in
 the past.


>
> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
> jcgarc...@gmail.com> wrote:
>
>> As Raghu said,
>>
>> Just apply a regular ParDo and return a PCollectionTuple afert that
>> you can extract your Success Records (TupleTag) and your DeadLetter
>> records(TupleTag) and do whatever you want with them.
>>
>>
>> Raghu Angadi  schrieb am Mi., 24. Okt. 2018,
>> 05:18:
>>
>>> User can read serialized bytes from KafkaIO and deserialize
>>> explicitly in a ParDo, which gives complete control on how to handle 
>>> record
>>> errors. This is I would do if I need to in my pipeline.
>>>
>>> If there is a transform in Beam that does this, it could be
>>> convenient for users in many such scenarios. This is simpler than each
>>> source supporting it explicitly.
>>>
>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>> chamik...@google.com> wrote:
>>>
 Given that KafkaIO uses UnboundeSource framework, this is probably
 not something that can easily be supported. We might be able to support
 similar features when we have Kafka on top of Splittable DoFn though.

>>> So feel free to create a feature request JIRA for this.

 Thanks,
 Cham

 On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles 
 wrote:

> This is a great question. I've added the dev list to be sure it
> gets noticed by whoever may know best.
>
> Kenn
>
> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
> tobias.kay...@ricardo.ch> wrote:
>
>>
>> Hi,
>>
>> Is there a way to get a Deadletter Output from a pipeline that
>> uses a KafkaIO
>> connector for it's input? As
>> `TimestampPolicyFactory.withTimestampFn()` takes
>> only a SerializableFunction and not a ParDo, how would I be able
>> to produce a
>> Deadletter output from it?
>>
>> I have the following pipeline defined that reads from a KafkaIO
>> input:
>>
>> pipeline.apply(
>>   KafkaIO.read()
>> .withBootstrapServers(bootstrap)
>> .withTopics(topics)
>> .withKeyDeserializer(StringDeserializer.class)
>> .withValueDeserializer(ConfigurableDeserializer.class)
>> .updateConsumerProperties(
>> ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>> inputMessagesConfig))
>>
>> .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", 
>> "earliest"))
>> .updateConsumerProperties(ImmutableMap.of("group.id",
>> "beam-consumers"))
>>
>> .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", 
>> "true"))
>> .withTimestampPolicyFactory(
>> TimestampPolicyFactory.withTimestampFn(
>> new MessageTimestampExtractor(inputMessagesConfig)))
>> .withReadCommitted()
>> .commitOffsetsInFinalize())
>>
>>
>> and I like to get deadletter outputs when my timestamp extraction
>> fails.
>>
>> Best,
>> Tobi
>>
>>


Re: KafkaIO - Deadletter output

2018-10-24 Thread Raghu Angadi
To be clear, returning min_timestamp for unparsable records should not
affect the watermark.

On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi  wrote:

> How about returning min_timestamp? The would be dropped or redirected by
> the ParDo after that.
> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API, is
> this pipeline defined under kafkaio package?
>
> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik  wrote:
>
>> In this case, the user is attempting to handle errors when parsing the
>> timestamp. The timestamp controls the watermark for the UnboundedSource,
>> how would they control the watermark in a downstream ParDo?
>>
>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi  wrote:
>>
>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath 
>>> wrote:
>>>
 Ah nice. Yeah, if user can return full bytes instead of applying a
 function that would result in an exception,  this can be extracted by a
 ParDo down the line.

>>>
>>> KafkaIO does return bytes, and I think most sources should, unless there
>>> is a good reason not to.
>>> Given that, do we think Beam should provide a tranform that makes to
>>> simpler to handle deadletter output? I think there was a thread about it in
>>> the past.
>>>
>>>

 On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
 jcgarc...@gmail.com> wrote:

> As Raghu said,
>
> Just apply a regular ParDo and return a PCollectionTuple afert that
> you can extract your Success Records (TupleTag) and your DeadLetter
> records(TupleTag) and do whatever you want with them.
>
>
> Raghu Angadi  schrieb am Mi., 24. Okt. 2018,
> 05:18:
>
>> User can read serialized bytes from KafkaIO and deserialize
>> explicitly in a ParDo, which gives complete control on how to handle 
>> record
>> errors. This is I would do if I need to in my pipeline.
>>
>> If there is a transform in Beam that does this, it could be
>> convenient for users in many such scenarios. This is simpler than each
>> source supporting it explicitly.
>>
>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>> chamik...@google.com> wrote:
>>
>>> Given that KafkaIO uses UnboundeSource framework, this is probably
>>> not something that can easily be supported. We might be able to support
>>> similar features when we have Kafka on top of Splittable DoFn though.
>>>
>> So feel free to create a feature request JIRA for this.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles 
>>> wrote:
>>>
 This is a great question. I've added the dev list to be sure it
 gets noticed by whoever may know best.

 Kenn

 On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
 tobias.kay...@ricardo.ch> wrote:

>
> Hi,
>
> Is there a way to get a Deadletter Output from a pipeline that
> uses a KafkaIO
> connector for it's input? As
> `TimestampPolicyFactory.withTimestampFn()` takes
> only a SerializableFunction and not a ParDo, how would I be able
> to produce a
> Deadletter output from it?
>
> I have the following pipeline defined that reads from a KafkaIO
> input:
>
> pipeline.apply(
>   KafkaIO.read()
> .withBootstrapServers(bootstrap)
> .withTopics(topics)
> .withKeyDeserializer(StringDeserializer.class)
> .withValueDeserializer(ConfigurableDeserializer.class)
> .updateConsumerProperties(
> ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
> inputMessagesConfig))
> .updateConsumerProperties(ImmutableMap.of("auto.offset.reset",
> "earliest"))
> .updateConsumerProperties(ImmutableMap.of("group.id",
> "beam-consumers"))
>
> .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", 
> "true"))
> .withTimestampPolicyFactory(
> TimestampPolicyFactory.withTimestampFn(
> new MessageTimestampExtractor(inputMessagesConfig)))
> .withReadCommitted()
> .commitOffsetsInFinalize())
>
>
> and I like to get deadletter outputs when my timestamp extraction
> fails.
>
> Best,
> Tobi
>
>


Re: KafkaIO - Deadletter output

2018-10-24 Thread Juan Carlos Garcia
As Raghu said,

Just apply a regular ParDo and return a PCollectionTuple; after that you can
extract your Success Records (TupleTag) and your DeadLetter
records (TupleTag) and do whatever you want with them.
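
For reference, a minimal sketch of that pattern could look like the following
(the Message type, the parseMessage helper and the tag names are placeholders,
not anything from Tobi's pipeline):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

// Hypothetical record type produced by successful parsing.
class Message {}

class ParseWithDeadLetter {
  // Static tags so they can be referenced both when applying the ParDo
  // and when extracting the resulting collections.
  static final TupleTag<Message> PARSED = new TupleTag<Message>() {};
  static final TupleTag<KV<String, byte[]>> DEAD_LETTER =
      new TupleTag<KV<String, byte[]>>() {};

  static PCollectionTuple parse(PCollection<KV<String, byte[]>> raw) {
    return raw.apply(
        "ParseOrDeadLetter",
        ParDo.of(
                new DoFn<KV<String, byte[]>, Message>() {
                  @ProcessElement
                  public void process(ProcessContext c) {
                    try {
                      c.output(parseMessage(c.element().getValue()));
                    } catch (Exception e) {
                      // Anything that fails to parse goes to the dead-letter output.
                      c.output(DEAD_LETTER, c.element());
                    }
                  }
                })
            .withOutputTags(PARSED, TupleTagList.of(DEAD_LETTER)));
  }

  static Message parseMessage(byte[] bytes) {
    // Placeholder for the real deserialization / timestamp-extraction logic.
    return new Message();
  }
}

// Usage:
// PCollectionTuple results = ParseWithDeadLetter.parse(rawRecords);
// PCollection<Message> ok = results.get(ParseWithDeadLetter.PARSED);
// PCollection<KV<String, byte[]>> failed = results.get(ParseWithDeadLetter.DEAD_LETTER);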


Raghu Angadi  schrieb am Mi., 24. Okt. 2018, 05:18:

> User can read serialized bytes from KafkaIO and deserialize explicitly in
> a ParDo, which gives complete control on how to handle record errors. This
> is I would do if I need to in my pipeline.
>
> If there is a transform in Beam that does this, it could be convenient for
> users in many such scenarios. This is simpler than each source supporting
> it explicitly.
>
> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath 
> wrote:
>
>> Given that KafkaIO uses UnboundeSource framework, this is probably not
>> something that can easily be supported. We might be able to support similar
>> features when we have Kafka on top of Splittable DoFn though.
>>
> So feel free to create a feature request JIRA for this.
>>
>> Thanks,
>> Cham
>>
>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles  wrote:
>>
>>> This is a great question. I've added the dev list to be sure it gets
>>> noticed by whoever may know best.
>>>
>>> Kenn
>>>
>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias 
>>> wrote:
>>>

 Hi,

 Is there a way to get a Deadletter Output from a pipeline that uses a
 KafkaIO
 connector for it's input? As `TimestampPolicyFactory.withTimestampFn()`
 takes
 only a SerializableFunction and not a ParDo, how would I be able to
 produce a
 Deadletter output from it?

 I have the following pipeline defined that reads from a KafkaIO input:

 pipeline.apply(
   KafkaIO.read()
 .withBootstrapServers(bootstrap)
 .withTopics(topics)
 .withKeyDeserializer(StringDeserializer.class)
 .withValueDeserializer(ConfigurableDeserializer.class)
 .updateConsumerProperties(
 ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
 inputMessagesConfig))
 .updateConsumerProperties(ImmutableMap.of("auto.offset.reset",
 "earliest"))
 .updateConsumerProperties(ImmutableMap.of("group.id",
 "beam-consumers"))
 .updateConsumerProperties(ImmutableMap.of("enable.auto.commit",
 "true"))
 .withTimestampPolicyFactory(
 TimestampPolicyFactory.withTimestampFn(
 new MessageTimestampExtractor(inputMessagesConfig)))
 .withReadCommitted()
 .commitOffsetsInFinalize())


 and I like to get deadletter outputs when my timestamp extraction fails.

 Best,
 Tobi




Re: [DISCUSS] Publish vendored dependencies independently

2018-10-24 Thread Kenneth Knowles
OK. I just opened https://github.com/apache/beam/pull/6809 to push Guava
through. I made some comments there, and also I agree with Luke that full
version string makes sense. For this purpose it seems easy and fine to do a
search/replace to swap 20.0 for 20.1, and compatibility between them should
not be a concern.

I have minor suggestions and clarifications:

 - Is there value to `beam` in the artifactId? I would leave it off unless
there's a special need
 - Users should never use these, and we make it extremely clear they are not
supported for any reason
 - Use 0.x versions indicating no intention of semantic versioning

Bringing my comments and Luke's together, here's the proposal:

groupId: org.apache.beam
artifactId: vendored-guava-20_0
namespace: org.apache.beam.vendored.guava.v20_0
version: 0.1

Alternatively it could be

groupId: org.apache.beam-vendored
artifactid: guava-20_0
namespace: org.apache.beam.vendored.guava.v20_0
version: 0.1

I like the latter but I haven't gone through the process of establishing a
new groupId.

And for now we do not publish source jars. A couple of TODOs to get the
build in good shape (classifiers, jars, interaction with plugins)

Kenn


On Wed, Oct 24, 2018 at 10:13 AM Lukasz Cwik  wrote:

> It looks like we are agreeing to make each vendored dependency self
> contained and have all their own internal dependencies packaged. For
> example, gRPC and all its transitive dependencies would use
> org.apache.beam.vendored.grpc.vYYY and Calcite and all its transitive
> dependencies would use org.apache.beam.vendored.calcite.vZZZ.
>
> I also wanted to circle back on this question I had earlier that didn't
> have any follow-up:
> Currently we are relocating code depending on the version string. If the
> major version is >= 1, we use only the major version within the package
> string and rely on semantic versioning provided by the dependency to not
> break people. If the major version is 0, we assume the dependency is
> unstable and use the full version as part of the package string during
> relocation.
>
> The downside of using the full version string for relocated packages:
> 1) Users will end up with multiple copies of dependencies that differ only
> by the minor or patch version increasing the size of their application.
> 2) Bumping up the version of a dependency now requires the import
> statement in all java files to be updated (not too difficult with some
> sed/grep skills)
>
> The upside of using the full version string in the relocated package:
> 1) We don't have to worry about whether a dependency maintains semantic
> versioning which means our users won't have to worry about that either.
> 2) This increases the odds that a user will load multiple slightly
> different versions of the same dependency which is known to be incompatible
> in certain situations (e.g. Netty 4.1.25 can't be on the classpath with
> Netty 4.1.28 even though they are both shaded due to issues of how JNI with
> tcnative works).
>
> My preference would be to use the full version string for import
> statements (so org.apache.beam.vendor.grpc.v1_13_1...) since this would
> allow multiple copies to not conflict with each other since in my opinion
> it is a lot more difficult to help a user debug a dependency issue then to
> use string replacement during dependency upgrades to fix import statements.
> Also I would suggest we name the artifacts in Maven as follows:
> groupId: org.apache.beam
> artifactId: beam-vendor-grpc-v1_13_1
> version: 1.0.0 (first version and subsequent versions such as 1.0.1 are
> only for patch upgrades that fix any shading issues we may have had when
> producing the vendored jar)
>
>
> On Wed, Oct 24, 2018 at 6:01 AM Maximilian Michels  wrote:
>
>> Would also keep it simple and optimize for the JAR size only if necessary.
>>
>> On 24.10.18 00:06, Kenneth Knowles wrote:
>> > I think it makes sense for each vendored dependency to be
>> self-contained
>> > as much as possible. It should keep it fairly simple. Things that cross
>> > their API surface cannot be hidden, of course. Jar size is not a
>> concern
>> > IMO.
>> >
>> > Kenn
>> >
>> > On Tue, Oct 23, 2018 at 9:05 AM Lukasz Cwik > > > wrote:
>> >
>> > How should we handle the transitive dependencies of the things we
>> > want to vendor?
>> >
>> > For example we use gRPC which depends on Guava 20 and we also use
>> > Calcite which depends on Guava 19.
>> >
>> > Should the vendored gRPC/Calcite/... be self-contained so it
>> > contains all its dependencies, hence vendored gRPC would contain
>> > Guava 20 and vendored Calcite would contain Guava 19 (both under
>> > different namespaces)?
>> > This leads to larger jars but less vendored dependencies to
>> maintain.
>> >
>> > Or should we produce a vendored library for those that we want to
>> > share, e.g. Guava 20 that could be reused across multiple vendored
>> > 

Re: KafkaIO - Deadletter output

2018-10-24 Thread Raghu Angadi
How about returning min_timestamp? They would be dropped or redirected by
the ParDo after that.
Btw, TimestampPolicyFactory.withTimestampFn() is not a public API; is this
pipeline defined under the kafkaio package?
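
A rough sketch of that idea (the record types and the extractTimestamp helper
are placeholders, not the classes from Tobi's pipeline); the resulting function
could then be handed to TimestampPolicyFactory.withTimestampFn() in place of an
extractor that throws:

import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

class LenientTimestamps {
  // Returns the extracted event timestamp, or the minimum timestamp when
  // extraction fails, so the bad record can be redirected (or dropped) by a
  // downstream ParDo instead of failing inside the unbounded source.
  static SerializableFunction<KafkaRecord<String, byte[]>, Instant> timestampOrMin() {
    return record -> {
      try {
        return extractTimestamp(record.getKV().getValue());
      } catch (Exception e) {
        return BoundedWindow.TIMESTAMP_MIN_VALUE;
      }
    };
  }

  // Hypothetical extractor: here the payload is assumed to be epoch millis as text,
  // so malformed payloads naturally throw and take the fallback path above.
  static Instant extractTimestamp(byte[] value) {
    return new Instant(Long.parseLong(new String(value, StandardCharsets.UTF_8)));
  }
}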

On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik  wrote:

> In this case, the user is attempting to handle errors when parsing the
> timestamp. The timestamp controls the watermark for the UnboundedSource,
> how would they control the watermark in a downstream ParDo?
>
> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi  wrote:
>
>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath 
>> wrote:
>>
>>> Ah nice. Yeah, if user can return full bytes instead of applying a
>>> function that would result in an exception,  this can be extracted by a
>>> ParDo down the line.
>>>
>>
>> KafkaIO does return bytes, and I think most sources should, unless there
>> is a good reason not to.
>> Given that, do we think Beam should provide a tranform that makes to
>> simpler to handle deadletter output? I think there was a thread about it in
>> the past.
>>
>>
>>>
>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia 
>>> wrote:
>>>
 As Raghu said,

 Just apply a regular ParDo and return a PCollectionTuple afert that you
 can extract your Success Records (TupleTag) and your DeadLetter
 records(TupleTag) and do whatever you want with them.


 Raghu Angadi  schrieb am Mi., 24. Okt. 2018, 05:18:

> User can read serialized bytes from KafkaIO and deserialize explicitly
> in a ParDo, which gives complete control on how to handle record errors.
> This is I would do if I need to in my pipeline.
>
> If there is a transform in Beam that does this, it could be convenient
> for users in many such scenarios. This is simpler than each source
> supporting it explicitly.
>
> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
> chamik...@google.com> wrote:
>
>> Given that KafkaIO uses UnboundeSource framework, this is probably
>> not something that can easily be supported. We might be able to support
>> similar features when we have Kafka on top of Splittable DoFn though.
>>
> So feel free to create a feature request JIRA for this.
>>
>> Thanks,
>> Cham
>>
>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles 
>> wrote:
>>
>>> This is a great question. I've added the dev list to be sure it gets
>>> noticed by whoever may know best.
>>>
>>> Kenn
>>>
>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>> tobias.kay...@ricardo.ch> wrote:
>>>

 Hi,

 Is there a way to get a Deadletter Output from a pipeline that uses
 a KafkaIO
 connector for it's input? As
 `TimestampPolicyFactory.withTimestampFn()` takes
 only a SerializableFunction and not a ParDo, how would I be able to
 produce a
 Deadletter output from it?

 I have the following pipeline defined that reads from a KafkaIO
 input:

 pipeline.apply(
   KafkaIO.read()
 .withBootstrapServers(bootstrap)
 .withTopics(topics)
 .withKeyDeserializer(StringDeserializer.class)
 .withValueDeserializer(ConfigurableDeserializer.class)
 .updateConsumerProperties(
 ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
 inputMessagesConfig))
 .updateConsumerProperties(ImmutableMap.of("auto.offset.reset",
 "earliest"))
 .updateConsumerProperties(ImmutableMap.of("group.id",
 "beam-consumers"))
 .updateConsumerProperties(ImmutableMap.of("enable.auto.commit",
 "true"))
 .withTimestampPolicyFactory(
 TimestampPolicyFactory.withTimestampFn(
 new MessageTimestampExtractor(inputMessagesConfig)))
 .withReadCommitted()
 .commitOffsetsInFinalize())


 and I like to get deadletter outputs when my timestamp extraction
 fails.

 Best,
 Tobi




Re: KafkaIO - Deadletter output

2018-10-24 Thread Lukasz Cwik
In this case, the user is attempting to handle errors when parsing the
timestamp. The timestamp controls the watermark for the UnboundedSource;
how would they control the watermark in a downstream ParDo?

On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi  wrote:

> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath 
> wrote:
>
>> Ah nice. Yeah, if user can return full bytes instead of applying a
>> function that would result in an exception,  this can be extracted by a
>> ParDo down the line.
>>
>
> KafkaIO does return bytes, and I think most sources should, unless there
> is a good reason not to.
> Given that, do we think Beam should provide a tranform that makes to
> simpler to handle deadletter output? I think there was a thread about it in
> the past.
>
>
>>
>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia 
>> wrote:
>>
>>> As Raghu said,
>>>
>>> Just apply a regular ParDo and return a PCollectionTuple afert that you
>>> can extract your Success Records (TupleTag) and your DeadLetter
>>> records(TupleTag) and do whatever you want with them.
>>>
>>>
>>> Raghu Angadi  schrieb am Mi., 24. Okt. 2018, 05:18:
>>>
 User can read serialized bytes from KafkaIO and deserialize explicitly
 in a ParDo, which gives complete control on how to handle record errors.
 This is I would do if I need to in my pipeline.

 If there is a transform in Beam that does this, it could be convenient
 for users in many such scenarios. This is simpler than each source
 supporting it explicitly.

 On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
 chamik...@google.com> wrote:

> Given that KafkaIO uses UnboundeSource framework, this is probably not
> something that can easily be supported. We might be able to support 
> similar
> features when we have Kafka on top of Splittable DoFn though.
>
 So feel free to create a feature request JIRA for this.
>
> Thanks,
> Cham
>
> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles 
> wrote:
>
>> This is a great question. I've added the dev list to be sure it gets
>> noticed by whoever may know best.
>>
>> Kenn
>>
>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>> tobias.kay...@ricardo.ch> wrote:
>>
>>>
>>> Hi,
>>>
>>> Is there a way to get a Deadletter Output from a pipeline that uses
>>> a KafkaIO
>>> connector for it's input? As
>>> `TimestampPolicyFactory.withTimestampFn()` takes
>>> only a SerializableFunction and not a ParDo, how would I be able to
>>> produce a
>>> Deadletter output from it?
>>>
>>> I have the following pipeline defined that reads from a KafkaIO
>>> input:
>>>
>>> pipeline.apply(
>>>   KafkaIO.read()
>>> .withBootstrapServers(bootstrap)
>>> .withTopics(topics)
>>> .withKeyDeserializer(StringDeserializer.class)
>>> .withValueDeserializer(ConfigurableDeserializer.class)
>>> .updateConsumerProperties(
>>> ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>> inputMessagesConfig))
>>> .updateConsumerProperties(ImmutableMap.of("auto.offset.reset",
>>> "earliest"))
>>> .updateConsumerProperties(ImmutableMap.of("group.id",
>>> "beam-consumers"))
>>> .updateConsumerProperties(ImmutableMap.of("enable.auto.commit",
>>> "true"))
>>> .withTimestampPolicyFactory(
>>> TimestampPolicyFactory.withTimestampFn(
>>> new MessageTimestampExtractor(inputMessagesConfig)))
>>> .withReadCommitted()
>>> .commitOffsetsInFinalize())
>>>
>>>
>>> and I like to get deadletter outputs when my timestamp extraction
>>> fails.
>>>
>>> Best,
>>> Tobi
>>>
>>>


Re: [DISCUSS] Publish vendored dependencies independently

2018-10-24 Thread Lukasz Cwik
It looks like we are agreeing to make each vendored dependency self
contained and have all their own internal dependencies packaged. For
example, gRPC and all its transitive dependencies would use
org.apache.beam.vendored.grpc.vYYY and Calcite and all its transitive
dependencies would use org.apache.beam.vendored.calcite.vZZZ.

I also wanted to circle back on this question I had earlier that didn't
have any follow-up:
Currently we are relocating code depending on the version string. If the
major version is >= 1, we use only the major version within the package
string and rely on semantic versioning provided by the dependency to not
break people. If the major version is 0, we assume the dependency is
unstable and use the full version as part of the package string during
relocation.

The downside of using the full version string for relocated packages:
1) Users will end up with multiple copies of dependencies that differ only
by the minor or patch version increasing the size of their application.
2) Bumping up the version of a dependency now requires the import statement
in all java files to be updated (not too difficult with some sed/grep
skills)

The upside of using the full version string in the relocated package:
1) We don't have to worry about whether a dependency maintains semantic
versioning which means our users won't have to worry about that either.
2) This increases the odds that a user will load multiple slightly
different versions of the same dependency which is known to be incompatible
in certain situations (e.g. Netty 4.1.25 can't be on the classpath with
Netty 4.1.28 even though they are both shaded due to issues of how JNI with
tcnative works).

My preference would be to use the full version string for import statements
(so org.apache.beam.vendor.grpc.v1_13_1...) since this would allow multiple
copies to not conflict with each other; in my opinion it is a lot more
difficult to help a user debug a dependency issue than to use string
replacement during dependency upgrades to fix import statements. Also I
would suggest we name the artifacts in Maven as follows:
groupId: org.apache.beam
artifactId: beam-vendor-grpc-v1_13_1
version: 1.0.0 (first version and subsequent versions such as 1.0.1 are
only for patch upgrades that fix any shading issues we may have had when
producing the vendored jar)
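
To make that concrete, code depending on the vendored artifact would reference
the relocated classes roughly like this (illustrative only; the exact relocated
package layout is whatever the shading configuration produces):

// Guava's ImmutableList as relocated inside the vendored gRPC 1.13.1 artifact.
// With the full version string in the package, a hypothetical future v1_13_2
// copy could coexist on the classpath without clashing.
import org.apache.beam.vendor.grpc.v1_13_1.com.google.common.collect.ImmutableList;

class VendoredImportExample {
  static ImmutableList<String> example() {
    return ImmutableList.of("uses", "the", "vendored", "copy");
  }
}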


On Wed, Oct 24, 2018 at 6:01 AM Maximilian Michels  wrote:

> Would also keep it simple and optimize for the JAR size only if necessary.
>
> On 24.10.18 00:06, Kenneth Knowles wrote:
> > I think it makes sense for each vendored dependency to be self-contained
> > as much as possible. It should keep it fairly simple. Things that cross
> > their API surface cannot be hidden, of course. Jar size is not a concern
> > IMO.
> >
> > Kenn
> >
> > On Tue, Oct 23, 2018 at 9:05 AM Lukasz Cwik  > > wrote:
> >
> > How should we handle the transitive dependencies of the things we
> > want to vendor?
> >
> > For example we use gRPC which depends on Guava 20 and we also use
> > Calcite which depends on Guava 19.
> >
> > Should the vendored gRPC/Calcite/... be self-contained so it
> > contains all its dependencies, hence vendored gRPC would contain
> > Guava 20 and vendored Calcite would contain Guava 19 (both under
> > different namespaces)?
> > This leads to larger jars but less vendored dependencies to maintain.
> >
> > Or should we produce a vendored library for those that we want to
> > share, e.g. Guava 20 that could be reused across multiple vendored
> > libraries?
> > Makes the vendoring process slightly more complicated, more
> > dependencies to maintain, smaller jars.
> >
> > Or should we produce a vendored library for each dependency?
> > Lots of vendoring needed, likely tooling required to be built to
> > maintain this.
> >
> >
> >
> >
> > On Tue, Oct 23, 2018 at 8:46 AM Kenneth Knowles  > > wrote:
> >
> > I actually created the subtasks by finding things shaded by at
> > least one module. I think each one should definitely have an on
> > list discussion that clarifies the target artifact, namespace,
> > version, possible complications, etc.
> >
> > My impression is that many many modules shade only Guava. So for
> > build time and simplification that is a big win.
> >
> > Kenn
> >
> > On Tue, Oct 23, 2018, 08:16 Thomas Weise  > > wrote:
> >
> > +1 for separate artifacts
> >
> > I would request that we explicitly discuss and agree which
> > dependencies we vendor though.
> >
> > Not everything listed in the JIRA subtasks is currently
> > relocated.
> >
> > Thomas
> >
> >
> > On Tue, Oct 23, 2018 at 8:04 AM David Morávek
> > mailto:david.mora...@gmail.com>>
> > wrote:
> >
> > 

Re: Follow up ideas, to simplify creating MonitoringInfos.

2018-10-24 Thread Alex Amato
Okay. That makes sense. Using runtime validation and protos is what I was
thinking as well.
I'll include you as a reviewer in my PRs.

As for the choice of a builder/constructor/factory. If we go with factory
methods/constructor then we will need a method for each metric type
(intCounter, latestInt64, ...). Plus, then I don't think we can have any
constructor parameters for labels; we will just need to accept a dictionary
for label keys+values like you say. This is because we cannot offer a
method for each URN and its combination of labels, and we should avoid such
static detection, as you say.

The builder, however, allows us to add a method for setting each label.
Since there are a small number of labels I think this should be fine. A
specific metric URN will have a specific set of labels which we can
validate being set. Any variant of this should use a different label (or a
new version in the label). Thus, the builder can give an advantage, making
it easier to set label values without needing to provide the correct key
for the label when it is set. You just need to call the correct method.

Perhaps it might be best to leave this open to each SDK based on its
language style (Builder, Factory, etc.) , but make sure we use the
protos+runtime validation.
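
To illustrate, a hypothetical sketch of such a builder (not the actual Beam
classes; the URN, type string and label keys below are made up for the example,
and a real version would build the MonitoringInfo proto and validate against
the URN's registered spec):

import java.util.HashMap;
import java.util.Map;

// Hypothetical builder: one typed setter per metric type, one setter per known
// label, and validation at build() time.
class MonitoringInfoBuilder {
  private String urn;
  private String type;
  private Object value;
  private final Map<String, String> labels = new HashMap<>();

  MonitoringInfoBuilder setUrn(String urn) {
    this.urn = urn;
    return this;
  }

  MonitoringInfoBuilder setInt64CounterValue(long count) {
    this.type = "sum_int_64"; // the type string is set together with the value
    this.value = count;
    return this;
  }

  MonitoringInfoBuilder setPTransformLabel(String pTransform) {
    this.labels.put("PTRANSFORM", pTransform); // caller never types the label key
    return this;
  }

  MonitoringInfoBuilder setPCollectionLabel(String pCollection) {
    this.labels.put("PCOLLECTION", pCollection);
    return this;
  }

  Map<String, Object> build() {
    // Runtime validation would go here: look up the URN's spec and check that
    // the type matches and all required labels are present.
    if (urn == null || type == null || value == null) {
      throw new IllegalStateException("urn, type and value must all be set");
    }
    Map<String, Object> info = new HashMap<>();
    info.put("urn", urn);
    info.put("type", type);
    info.put("value", value);
    info.put("labels", labels);
    return info;
  }
}

// Usage:
// Map<String, Object> info = new MonitoringInfoBuilder()
//     .setUrn("beam:metric:element_count:v1")
//     .setInt64CounterValue(42)
//     .setPCollectionLabel("pc_1")
//     .build();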

On Wed, Oct 24, 2018 at 7:02 AM Robert Bradshaw  wrote:

> Thanks for bringing this to the list; it's a good question.
>
> I think the difficulty comes from trying to statically define a lists
> of possibilities that should instead be runtime values. E.g. we
> currently we're up to about a dozen distinct types, and having a
> setter for each is both verbose and not very extensible (especially
> replicated cross language). I'm not sure the set of possible labels is
> fixed either. Generally in the FnAPI we've been using shared constants
> for this kind of thing instead. (I was wary about the protos for the
> same reasons; it would be good to avoid leaking this through to each
> of the various SDKs.) The amount of static typing/validation one gets
> depends on how much logic you build into each of these methods (e.g.
> does it (almost?) always "metric" which is assumed to already be
> encoded correctly, or a specific type that has tradeoffs with the
> amount you can do generically (e.g. we have code that first loops over
> counters, then over distributions, then over gauges, and I don't think
> we want continue that pattern all M places for all N types)).
>
> I would probably lean towards mostly doing runtime validation here.
> Specifically, one would have a data file that defines, for each known
> URN, its type along with the set of permitted/expected/required
> labels. On construction a MonitoringInfo data point, one would provide
> a URN + value + labelMap, and optionally a type. On construction, the
> constructor (method, factory, whatever) would look up the URN to
> determine the type (or throw an error if it's both not known and not
> provided), which could be then used to fetch an encoder of sorts (that
> can go from value <-> proto encoding, possibly with some validation).
> If labels and/or annotations are provided and the URN is known, we can
> validate these as well.
>
> As for proto/enums vs. yaml, the former is nice because code
> generation comes for free, but has turned out to be much more verbose
> (both the definition and the use) and I'm still on the fence whether
> it's a net win.
>
> (Unfortunately AutoValue won't help much here, as the ultimate goal is
> to wrap a very nested proto OneOf, ideally with some validation.
> (Also, in Python, generally, having optional, named arguments makes
> this kind of builder pattern less needed.))
>
> On Wed, Oct 24, 2018 at 4:12 AM Kenneth Knowles  wrote:
> >
> > FWIW AutoValue will build most of that class for you, if it is as you
> say.
> >
> > Kenn
> >
> > On Tue, Oct 23, 2018 at 6:04 PM Alex Amato  wrote:
> >>
> >> Hi Robert + beam dev list,
> >>
> >> I was thinking about your feedback in PR#6205, and agree that this
> monitoring_infos.py became a bit big.
> >>
> >> I'm working on the Java Implementation of this now, and want to
> incorporate some of these ideas and improve on this.
> >>
> >> I think that I should make something like a MonitoringInfoBuilder class.
> With a few methods
> >>
> >> setUrn
> >> setTimestamp
> >> setting the value (One method for each Type we support. Setting this
> will also set the type string)
> >>
> >> setInt64CounterValue
> >> setDoubleCounterValue
> >> setLatestInt64
> >> setTopNInt64
> >> setMonitoringDataTable
> >> setDistributionInt64
> >> ...
> >>
> >> setting labels (will set the key and value properly for the label)
> >>
> >> setPTransform(value)
> >> setPcollection(value)
> >> ...
> >>
> >>
> >> I think this will make building a metric much easier, you would just
> call the 4 methods and the .build(). These builders are common in Java. (I
> guess there is a similar thing way we could do in python? I'd like to go
> back and refactor that as well when I am done)

Re: KafkaIO - Deadletter output

2018-10-24 Thread Raghu Angadi
On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath 
wrote:

> Ah nice. Yeah, if user can return full bytes instead of applying a
> function that would result in an exception,  this can be extracted by a
> ParDo down the line.
>

KafkaIO does return bytes, and I think most sources should, unless there is
a good reason not to.
Given that, do we think Beam should provide a transform that makes it
simpler to handle deadletter output? I think there was a thread about it in
the past.
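
For example, a sketch of the bytes-first approach (broker address, topic and
types are placeholders):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

class RawKafkaRead {
  // Read raw bytes from Kafka and defer all parsing (and therefore all
  // parse-error handling) to a later ParDo, where producing a dead-letter
  // output is straightforward.
  static PCollection<KV<byte[], byte[]>> rawRecords(Pipeline pipeline) {
    return pipeline.apply(
        KafkaIO.<byte[], byte[]>read()
            .withBootstrapServers("kafka:9092") // placeholder broker address
            .withTopic("topic")                 // placeholder topic
            .withKeyDeserializer(ByteArrayDeserializer.class)
            .withValueDeserializer(ByteArrayDeserializer.class)
            .withoutMetadata());
  }
}

The raw records can then go through a multi-output parsing ParDo like the one
sketched earlier in the thread.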


>
> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia 
> wrote:
>
>> As Raghu said,
>>
>> Just apply a regular ParDo and return a PCollectionTuple afert that you
>> can extract your Success Records (TupleTag) and your DeadLetter
>> records(TupleTag) and do whatever you want with them.
>>
>>
>> Raghu Angadi  schrieb am Mi., 24. Okt. 2018, 05:18:
>>
>>> User can read serialized bytes from KafkaIO and deserialize explicitly
>>> in a ParDo, which gives complete control on how to handle record errors.
>>> This is I would do if I need to in my pipeline.
>>>
>>> If there is a transform in Beam that does this, it could be convenient
>>> for users in many such scenarios. This is simpler than each source
>>> supporting it explicitly.
>>>
>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath 
>>> wrote:
>>>
 Given that KafkaIO uses UnboundeSource framework, this is probably not
 something that can easily be supported. We might be able to support similar
 features when we have Kafka on top of Splittable DoFn though.

>>> So feel free to create a feature request JIRA for this.

 Thanks,
 Cham

 On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles  wrote:

> This is a great question. I've added the dev list to be sure it gets
> noticed by whoever may know best.
>
> Kenn
>
> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
> tobias.kay...@ricardo.ch> wrote:
>
>>
>> Hi,
>>
>> Is there a way to get a Deadletter Output from a pipeline that uses a
>> KafkaIO
>> connector for it's input? As
>> `TimestampPolicyFactory.withTimestampFn()` takes
>> only a SerializableFunction and not a ParDo, how would I be able to
>> produce a
>> Deadletter output from it?
>>
>> I have the following pipeline defined that reads from a KafkaIO input:
>>
>> pipeline.apply(
>>   KafkaIO.read()
>> .withBootstrapServers(bootstrap)
>> .withTopics(topics)
>> .withKeyDeserializer(StringDeserializer.class)
>> .withValueDeserializer(ConfigurableDeserializer.class)
>> .updateConsumerProperties(
>> ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>> inputMessagesConfig))
>> .updateConsumerProperties(ImmutableMap.of("auto.offset.reset",
>> "earliest"))
>> .updateConsumerProperties(ImmutableMap.of("group.id",
>> "beam-consumers"))
>> .updateConsumerProperties(ImmutableMap.of("enable.auto.commit",
>> "true"))
>> .withTimestampPolicyFactory(
>> TimestampPolicyFactory.withTimestampFn(
>> new MessageTimestampExtractor(inputMessagesConfig)))
>> .withReadCommitted()
>> .commitOffsetsInFinalize())
>>
>>
>> and I like to get deadletter outputs when my timestamp extraction
>> fails.
>>
>> Best,
>> Tobi
>>
>>


Re: [VOTE] Release 2.8.0, release candidate #1

2018-10-24 Thread Maximilian Michels
I've run WordCount using the Quickstart with the FlinkRunner (locally and
against a Flink cluster).


Would give a +1 but am waiting to see what Kenn finds.

-Max

On 23.10.18 07:11, Ahmet Altay wrote:



On Mon, Oct 22, 2018 at 10:06 PM, Kenneth Knowles > wrote:


You two did so much verification I had a hard time finding something
where my help was meaningful! :-)

I did run the Nexmark suite on the DirectRunner against 2.7.0 and
2.8.0 following

https://beam.apache.org/documentation/sdks/java/nexmark/#running-smoke-suite-on-the-directrunner-local

.

It is admittedly a very silly test - the instructions leave
immutability enforcement on, etc. But it does appear that there is a
30% degradation in query 8 and 15% in query 9. These are the pure
Java tests, not the SQL variants. The rest of the queries are close
enough that differences are not meaningful.


(It would be a good improvement for us to have alerts on daily 
benchmarks if we do not have such a concept already.)



I would ask a little more time to see what is going on here - is it
a real performance issue or an artifact of how the tests are
invoked, or ...?


Thank you! Much appreciated. Please let us know when you are done with 
your investigation.



Kenn

On Mon, Oct 22, 2018 at 6:20 PM Ahmet Altay mailto:al...@google.com>> wrote:

Hi all,

Did you have a chance to review this RC? Between me and Robert
we ran a significant chunk of the validations. Let me know if
you have any questions.

Ahmet

On Thu, Oct 18, 2018 at 5:26 PM, Ahmet Altay mailto:al...@google.com>> wrote:

Hi everyone,

Please review and vote on the release candidate #1 for the
version 2.8.0, as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific
comments)

The complete staging area is available for your review,
which includes:
* JIRA release notes [1],
* the official Apache source release to be deployed to
dist.apache.org  [2], which is
signed with the key with fingerprint 6096FA00 [3],
* all artifacts to be deployed to the Maven Central
Repository [4],
* source code tag "v2.8.0-RC1" [5],
* website pull request listing the release and publishing
the API reference manual [6].
* Python artifacts are deployed along with the source
release to the dist.apache.org  [2].
* Validation sheet with a tab for 2.8.0 release to help with
validation [7].

The vote will be open for at least 72 hours. It is adopted
by majority approval, with at least 3 PMC affirmative votes.

Thanks,
Ahmet

[1]

https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12319527=12343985


[2] https://dist.apache.org/repos/dist/dev/beam/2.8.0

[3] https://dist.apache.org/repos/dist/dev/beam/KEYS

[4]

https://repository.apache.org/content/repositories/orgapachebeam-1049/


[5] https://github.com/apache/beam/tree/v2.8.0-RC1

[6] https://github.com/apache/beam-site/pull/583
 and
https://github.com/apache/beam/pull/6745

[7]

https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=1854712816







Jenkins build is back to normal : beam_SeedJob #2854

2018-10-24 Thread Apache Jenkins Server
See 



Build failed in Jenkins: beam_SeedJob_Standalone #1806

2018-10-24 Thread Apache Jenkins Server
See 


Changes:

[thw] [BEAM-5848] Fix coder for streaming impulse source.

--
Started by timer
[EnvInject] - Loading node environment variables.
Building remotely on beam2 (beam) in workspace 

 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url https://github.com/apache/beam.git # timeout=10
Fetching upstream changes from https://github.com/apache/beam.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/beam.git 
 > +refs/heads/*:refs/remotes/origin/* 
 > +refs/pull/${ghprbPullId}/*:refs/remotes/origin/pr/${ghprbPullId}/*
 > git rev-parse origin/master^{commit} # timeout=10
Checking out Revision a87dfe4a7bc669769c04221d94319aa72134ed35 (origin/master)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f a87dfe4a7bc669769c04221d94319aa72134ed35
Commit message: "Merge pull request #6810: [BEAM-5848] Fix coder for streaming 
impulse source"
 > git rev-list --no-walk 7800c3078d8ecaee7d2e789f02b759e579263249 # timeout=10
Cleaning workspace
 > git rev-parse --verify HEAD # timeout=10
Resetting working tree
 > git reset --hard # timeout=10
 > git clean -fdx # timeout=10
Processing DSL script job_00_seed.groovy
Processing DSL script job_Dependency_Check.groovy
Processing DSL script job_Inventory.groovy
Processing DSL script job_PerformanceTests_Dataflow.groovy
Processing DSL script job_PerformanceTests_FileBasedIO_IT.groovy
Processing DSL script job_PerformanceTests_FileBasedIO_IT_HDFS.groovy
Processing DSL script job_PerformanceTests_HadoopInputFormat.groovy
Processing DSL script job_PerformanceTests_JDBC.groovy
Processing DSL script job_PerformanceTests_MongoDBIO_IT.groovy
Processing DSL script job_PerformanceTests_Python.groovy
Processing DSL script job_PerformanceTests_Spark.groovy
Processing DSL script job_PostCommit_Go_GradleBuild.groovy
Processing DSL script job_PostCommit_Java_GradleBuild.groovy
Processing DSL script job_PostCommit_Java_Nexmark_Dataflow.groovy
Processing DSL script job_PostCommit_Java_Nexmark_Direct.groovy
Processing DSL script job_PostCommit_Java_Nexmark_Flink.groovy
Processing DSL script job_PostCommit_Java_Nexmark_Spark.groovy
Processing DSL script job_PostCommit_Java_PortableValidatesRunner_Flink.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Apex.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Dataflow.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Flink.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Gearpump.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Samza.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Spark.groovy
Processing DSL script job_PostCommit_Python_ValidatesContainer_Dataflow.groovy
Processing DSL script job_PostCommit_Python_ValidatesRunner_Dataflow.groovy
Processing DSL script job_PostCommit_Python_ValidatesRunner_Flink.groovy
Processing DSL script job_PostCommit_Python_Verify.groovy
Processing DSL script job_PostCommit_Website_Publish.groovy
Processing DSL script job_PostRelease_NightlySnapshot.groovy
Processing DSL script job_PreCommit_CommunityMetrics.groovy
Processing DSL script job_PreCommit_Go.groovy
Processing DSL script job_PreCommit_Java.groovy
Processing DSL script job_PreCommit_Python.groovy
Processing DSL script job_PreCommit_RAT.groovy
ERROR: startup failed:
job_PreCommit_RAT.groovy: 25: unexpected token: } @ line 25, column 1.
   }
   ^

1 error

Not sending mail to unregistered user ke...@google.com


Re: KafkaIO - Deadletter output

2018-10-24 Thread Chamikara Jayalath
Ah nice. Yeah, if the user can return full bytes instead of applying a function
that would result in an exception, this can be extracted by a ParDo down
the line.

On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia 
wrote:

> As Raghu said,
>
> Just apply a regular ParDo and return a PCollectionTuple afert that you
> can extract your Success Records (TupleTag) and your DeadLetter
> records(TupleTag) and do whatever you want with them.
>
>
> Raghu Angadi  schrieb am Mi., 24. Okt. 2018, 05:18:
>
>> User can read serialized bytes from KafkaIO and deserialize explicitly in
>> a ParDo, which gives complete control on how to handle record errors. This
>> is I would do if I need to in my pipeline.
>>
>> If there is a transform in Beam that does this, it could be convenient
>> for users in many such scenarios. This is simpler than each source
>> supporting it explicitly.
>>
>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath 
>> wrote:
>>
>>> Given that KafkaIO uses UnboundeSource framework, this is probably not
>>> something that can easily be supported. We might be able to support similar
>>> features when we have Kafka on top of Splittable DoFn though.
>>>
>> So feel free to create a feature request JIRA for this.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles  wrote:
>>>
 This is a great question. I've added the dev list to be sure it gets
 noticed by whoever may know best.

 Kenn

 On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
 tobias.kay...@ricardo.ch> wrote:

>
> Hi,
>
> Is there a way to get a Deadletter Output from a pipeline that uses a
> KafkaIO
> connector for it's input? As
> `TimestampPolicyFactory.withTimestampFn()` takes
> only a SerializableFunction and not a ParDo, how would I be able to
> produce a
> Deadletter output from it?
>
> I have the following pipeline defined that reads from a KafkaIO input:
>
> pipeline.apply(
>   KafkaIO.read()
> .withBootstrapServers(bootstrap)
> .withTopics(topics)
> .withKeyDeserializer(StringDeserializer.class)
> .withValueDeserializer(ConfigurableDeserializer.class)
> .updateConsumerProperties(
> ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
> inputMessagesConfig))
> .updateConsumerProperties(ImmutableMap.of("auto.offset.reset",
> "earliest"))
> .updateConsumerProperties(ImmutableMap.of("group.id",
> "beam-consumers"))
> .updateConsumerProperties(ImmutableMap.of("enable.auto.commit",
> "true"))
> .withTimestampPolicyFactory(
> TimestampPolicyFactory.withTimestampFn(
> new MessageTimestampExtractor(inputMessagesConfig)))
> .withReadCommitted()
> .commitOffsetsInFinalize())
>
>
> and I like to get deadletter outputs when my timestamp extraction
> fails.
>
> Best,
> Tobi
>
>


Re: Unbalanced FileIO writes on Flink

2018-10-24 Thread Maximilian Michels
The FlinkRunner uses a hash function (MurmurHash) on each key, which 
places keys somewhere in the hash space. The hash space (2^32) is split 
among the partitions (5 in your case). Given enough keys, the chance 
increases that they are equally spread.


This should be similar to what the other Runners do.
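
A toy illustration of that assignment (not the FlinkRunner's actual code, just
the idea of mapping a 32-bit hash space evenly onto the partitions):

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class HashSpaceSplit {
  // Map an (unsigned) 32-bit hash uniformly onto [0, numPartitions).
  static int partitionFor(int hash, int numPartitions) {
    long unsigned = hash & 0xFFFFFFFFL;
    return (int) ((unsigned * numPartitions) >>> 32);
  }

  public static void main(String[] args) {
    int numPartitions = 5;
    int[] counts = new int[numPartitions];
    for (int shard = 0; shard < 50; shard++) {
      // Stand-in hash function; the FlinkRunner hashes the key bytes with MurmurHash.
      int hash = Arrays.hashCode(("shard-" + shard).getBytes(StandardCharsets.UTF_8));
      counts[partitionFor(hash, numPartitions)]++;
    }
    // Only roughly even when there are enough distinct keys per partition.
    System.out.println(Arrays.toString(counts));
  }
}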

On 24.10.18 10:58, Jozef Vilcek wrote:


So if I run 5 workers with 50 shards, I end up with:

DurationBytes receivedRecords received
  2m 39s        900 MB            465,525
  2m 39s       1.76 GB            930,720
  2m 39s        789 MB            407,315
  2m 39s       1.32 GB            698,262
  2m 39s        788 MB            407,310

Still not good but better than with 5 shards where some workers did not 
participate at all.

So, problem is in some layer which distributes keys / shards among workers?

On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax > wrote:


withNumShards(5) generates 5 random shards. It turns out that
statistically when you generate 5 random shards and you have 5
works, the probability is reasonably high that some workers will get
more than one shard (and as a result not all workers will
participate). Are you able to set the number of shards larger than 5?

On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek mailto:jozo.vil...@gmail.com>> wrote:

cc (dev)

I tried to run the example with FlinkRunner in batch mode and
received again bad data spread among the workers.

When I tried to remove number of shards for batch mode in above
example, pipeline crashed before launch

Caused by: java.lang.IllegalStateException: Inputs to Flatten
had incompatible triggers:

AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(4)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elem
entCountAtLeast(1)),

Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1
hour,

AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.fo
rever(AfterPane.elementCountAtLeast(1)),

Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane(





On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek
mailto:jozo.vil...@gmail.com>> wrote:

Hi Max,

I forgot to mention that example is run in streaming mode,
therefore I can not do writes without specifying shards.
FileIO explicitly asks for them.

I am not sure where the problem is. FlinkRunner is only one
I used.

On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels
mailto:m...@apache.org>> wrote:

Hi Jozef,

This does not look like a FlinkRunner related problem,
but is caused by
the `WriteFiles` sharding logic. It assigns keys and
does a Reshuffle
which apparently does not lead to good data spread in
your case.

Do you see the same behavior without `withNumShards(5)`?

Thanks,
Max

On 22.10.18 11:57, Jozef Vilcek wrote:
 > Hello,
 >
 > I am having some trouble to get a balanced write via
FileIO. Workers at
 > the shuffle side where data per window fire are
written to the
 > filesystem receive unbalanced number of events.
 >
 > Here is a naive code example:
 >
 >      val read = KafkaIO.read()
 >          .withTopic("topic")
 >          .withBootstrapServers("kafka1:9092")
 > 
.withKeyDeserializer(classOf[ByteArrayDeserializer])
 > 
.withValueDeserializer(classOf[ByteArrayDeserializer])

 >          .withProcessingTime()
 >
 >      pipeline
 >          .apply(read)
 >          .apply(MapElements.via(new
 > SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]],
String]() {
 >            override def apply(input:
KafkaRecord[Array[Byte],
 > Array[Byte]]): String = {
 >              new String(input.getKV.getValue, "UTF-8")
 >            }
 >          }))
 >
 >
 >

.apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
 >              .triggering(AfterWatermark.pastEndOfWindow()
 > 
.withEarlyFirings(AfterPane.elementCountAtLeast(4))
 >

Re: Follow up ideas, to simplify creating MonitoringInfos.

2018-10-24 Thread Robert Bradshaw
Thanks for bringing this to the list; it's a good question.

I think the difficulty comes from trying to statically define a list
of possibilities that should instead be runtime values. E.g. currently
we're up to about a dozen distinct types, and having a
setter for each is both verbose and not very extensible (especially
replicated cross language). I'm not sure the set of possible labels is
fixed either. Generally in the FnAPI we've been using shared constants
for this kind of thing instead. (I was wary about the protos for the
same reasons; it would be good to avoid leaking this through to each
of the various SDKs.) The amount of static typing/validation one gets
depends on how much logic you build into each of these methods (e.g.
does it (almost?) always take a "metric" which is assumed to already be
encoded correctly, or a specific type that has tradeoffs with the
amount you can do generically (e.g. we have code that first loops over
counters, then over distributions, then over gauges, and I don't think
we want to continue that pattern in all M places for all N types)).

I would probably lean towards mostly doing runtime validation here.
Specifically, one would have a data file that defines, for each known
URN, its type along with the set of permitted/expected/required
labels. On construction of a MonitoringInfo data point, one would provide
a URN + value + labelMap, and optionally a type. On construction, the
constructor (method, factory, whatever) would look up the URN to
determine the type (or throw an error if it's both not known and not
provided), which could be then used to fetch an encoder of sorts (that
can go from value <-> proto encoding, possibly with some validation).
If labels and/or annotations are provided and the URN is known, we can
validate these as well.

As for proto/enums vs. yaml, the former is nice because code
generation comes for free, but has turned out to be much more verbose
(both the definition and the use) and I'm still on the fence whether
it's a net win.

(Unfortunately AutoValue won't help much here, as the ultimate goal is
to wrap a very nested proto OneOf, ideally with some validation.
(Also, in Python, generally, having optional, named arguments makes
this kind of builder pattern less needed.))
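
A small sketch of that data-driven validation (the URN, type and label names
here are illustrative placeholders):

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Registry entry: the type of a known URN plus the labels it requires.
class MetricSpec {
  final String type;
  final Set<String> requiredLabels;

  MetricSpec(String type, String... requiredLabels) {
    this.type = type;
    this.requiredLabels = new HashSet<>(Arrays.asList(requiredLabels));
  }
}

class MonitoringInfoValidator {
  // In practice this table would be loaded from a shared data file so that
  // every SDK validates against the same definitions.
  static final Map<String, MetricSpec> KNOWN_URNS = new HashMap<>();
  static {
    KNOWN_URNS.put("beam:metric:element_count:v1",
        new MetricSpec("sum_int_64", "PCOLLECTION"));
  }

  // Returns the resolved type, throwing if the URN is unknown (and no explicit
  // type was given) or if required labels are missing.
  static String validate(String urn, String explicitType, Map<String, String> labels) {
    MetricSpec spec = KNOWN_URNS.get(urn);
    if (spec == null) {
      if (explicitType == null) {
        throw new IllegalArgumentException("Unknown URN and no type provided: " + urn);
      }
      return explicitType; // unknown URN: accept as-is, nothing to validate against
    }
    for (String label : spec.requiredLabels) {
      if (!labels.containsKey(label)) {
        throw new IllegalArgumentException("URN " + urn + " requires label " + label);
      }
    }
    return spec.type;
  }
}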

On Wed, Oct 24, 2018 at 4:12 AM Kenneth Knowles  wrote:
>
> FWIW AutoValue will build most of that class for you, if it is as you say.
>
> Kenn
>
> On Tue, Oct 23, 2018 at 6:04 PM Alex Amato  wrote:
>>
>> Hi Robert + beam dev list,
>>
>> I was thinking about your feedback in PR#6205, and agree that this 
>> monitoring_infos.py became a bit big.
>>
>> I'm working on the Java Implementation of this now, and want to incorporate 
>> some of these ideas and improve on this.
>>
>> I think that I should make something like a MonitoringInfoBuilder class. With 
>> a few methods
>>
>> setUrn
>> setTimestamp
>> setting the value (One method for each Type we support. Setting this will 
>> also set the type string)
>>
>> setInt64CounterValue
>> setDoubleCounterValue
>> setLatestInt64
>> setTopNInt64
>> setMonitoringDataTable
>> setDistributionInt64
>> ...
>>
>> setting labels (will set the key and value properly for the label)
>>
>> setPTransform(value)
>> setPcollection(value)
>> ...
>>
>>
>> I think this will make building a metric much easier, you would just call 
>> the 4 methods and the .build(). These builders are common in Java. (I guess 
>> there is a similar thing way we could do in python? I'd like to go back and 
>> refactor that as well when I am done)
>>
>> -
>>
>> As for your other suggestion to define metrics in the proto/enum file 
>> instead of the yaml file. I am not too sure about the best strategy for 
>> this. My initial thoughts are:
>>
>> Make a proto extension allowing you to describe/define a MonitoringInfo's 
>> (the same info as the metric_definitions.yaml file):
>>
>> URN
>> Type
>> Labels required
>> Annotations: Description, Units, etc.
>>
>> Make the builder read in that MonitoringInfo definision/description assert 
>> everything is set properly? I think this would be a decent data driven 
>> approach.
>>
>> I was wondering if you had something else in mind?
>>
>> Thanks
>> Alex
>>
>>


Re: [DISCUSS] Publish vendored dependencies independently

2018-10-24 Thread Maximilian Michels

Would also keep it simple and optimize for the JAR size only if necessary.

On 24.10.18 00:06, Kenneth Knowles wrote:
I think it makes sense for each vendored dependency to be self-contained 
as much as possible. It should keep it fairly simple. Things that cross 
their API surface cannot be hidden, of course. Jar size is not a concern 
IMO.


Kenn

On Tue, Oct 23, 2018 at 9:05 AM Lukasz Cwik > wrote:


How should we handle the transitive dependencies of the things we
want to vendor?

For example we use gRPC which depends on Guava 20 and we also use
Calcite which depends on Guava 19.

Should the vendored gRPC/Calcite/... be self-contained so it
contains all its dependencies, hence vendored gRPC would contain
Guava 20 and vendored Calcite would contain Guava 19 (both under
different namespaces)?
This leads to larger jars but less vendored dependencies to maintain.

Or should we produce a vendored library for those that we want to
share, e.g. Guava 20 that could be reused across multiple vendored
libraries?
Makes the vendoring process slightly more complicated, more
dependencies to maintain, smaller jars.

Or should we produce a vendored library for each dependency?
Lots of vendoring needed, likely tooling required to be built to
maintain this.




On Tue, Oct 23, 2018 at 8:46 AM Kenneth Knowles mailto:k...@google.com>> wrote:

I actually created the subtasks by finding things shaded by at
least one module. I think each one should definitely have an on
list discussion that clarifies the target artifact, namespace,
version, possible complications, etc.

My impression is that many many modules shade only Guava. So for
build time and simplification that is a big win.

Kenn

On Tue, Oct 23, 2018, 08:16 Thomas Weise mailto:t...@apache.org>> wrote:

+1 for separate artifacts

I would request that we explicitly discuss and agree which
dependencies we vendor though.

Not everything listed in the JIRA subtasks is currently
relocated.

Thomas


On Tue, Oct 23, 2018 at 8:04 AM David Morávek
mailto:david.mora...@gmail.com>>
wrote:

+1 This should improve build times a lot. It would be
great if vendored deps could stay in the main repository.

D.

On Tue, Oct 23, 2018 at 12:21 PM Maximilian Michels
mailto:m...@apache.org>> wrote:

Looks great, Kenn!

 > Max: what is the story behind having a separate
flink-shaded repo? Did that make it easier to manage
in some way?

Better separation of concerns, but I don't think
releasing the shaded
artifacts from the main repo is a problem. I'd even
prefer not to split
up the repo because it makes updates to the vendored
dependencies
slightly easier.

On 23.10.18 03:25, Kenneth Knowles wrote:
 > OK, I've filed
https://issues.apache.org/jira/browse/BEAM-5819 to
 > collect sub-tasks. This has enough upsides
throughout lots of areas of
 > the project that even though it is not glamorous
it seems pretty
 > valuable to start on immediately. And I want to
find out if there's a
 > pitfall lurking.
 >
 > Max: what is the story behind having a separate
flink-shaded repo? Did
 > that make it easier to manage in some way?
 >
 > Kenn
 >
 > On Mon, Oct 22, 2018 at 2:55 AM Maximilian
Michels mailto:m...@apache.org>
 > >>
wrote:
 >
 >     +1 for publishing vendored Jars
independently. It will improve build
 >     time and ease IntelliJ integration.
 >
 >     Flink also publishes shaded dependencies
separately:
 >
 >     - https://github.com/apache/flink-shaded
 >     -
https://issues.apache.org/jira/browse/FLINK-6529
 >
 >     AFAIK their main motivation was to get rid of
duplicate shaded classes
 >     on the classpath. We don't appear to have
that problem because 

Re: Data Preprocessing in Beam

2018-10-24 Thread Maximilian Michels
Welcome Alejandro! Interesting work. The sketching extension looks like 
a good place for your algorithms.


-Max

On 23.10.18 19:05, Lukasz Cwik wrote:
Arnoud Fournier (afourn...@talend.com ) 
started by adding a library to support sketching 
(https://github.com/apache/beam/tree/master/sdks/java/extensions/sketching), 
I feel as though some of these could be added there or possibly within 
another extension.


On Tue, Oct 23, 2018 at 9:54 AM Austin Bennett 
mailto:whatwouldausti...@gmail.com>> wrote:


Hi Beam Devs,

Alejandro, copied, is an enthusiastic developer, who recently coded up:
https://github.com/elbaulp/DPASF (associated paper found:
https://arxiv.org/abs/1810.06021).

He had been looking to contribute that code to FlinkML, at which
point I found him and alerted him to Beam.  He has been learning a
bit on Beam recently.  Would this data-preprocessing be a welcome
contribution to the project.  If yes, perhaps others better versed
in internals (I'm not there yet -- though could follow along!) would
be willing to provide feedback to shape this to be a suitable Beam
contribution.

Cheers,
Austin




Build failed in Jenkins: beam_SeedJob #2853

2018-10-24 Thread Apache Jenkins Server
See 

--
Started by timer
[EnvInject] - Loading node environment variables.
Building remotely on beam13 (beam) in workspace 

 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url https://github.com/apache/beam.git # timeout=10
Fetching upstream changes from https://github.com/apache/beam.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/beam.git 
 > +refs/heads/*:refs/remotes/origin/* 
 > +refs/pull/${ghprbPullId}/*:refs/remotes/origin/pr/${ghprbPullId}/*
 > git rev-parse origin/master^{commit} # timeout=10
Checking out Revision 7800c3078d8ecaee7d2e789f02b759e579263249 (origin/master)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 7800c3078d8ecaee7d2e789f02b759e579263249
Commit message: "Merge pull request #6807: [BEAM-5833] Fix java-harness build 
by adding flush() to BeamFnDataWriteRunnerTest"
 > git rev-list --no-walk 7800c3078d8ecaee7d2e789f02b759e579263249 # timeout=10
Cleaning workspace
 > git rev-parse --verify HEAD # timeout=10
Resetting working tree
 > git reset --hard # timeout=10
 > git clean -fdx # timeout=10
Processing DSL script job_00_seed.groovy
Processing DSL script job_Dependency_Check.groovy
Processing DSL script job_Inventory.groovy
Processing DSL script job_PerformanceTests_Dataflow.groovy
Processing DSL script job_PerformanceTests_FileBasedIO_IT.groovy
Processing DSL script job_PerformanceTests_FileBasedIO_IT_HDFS.groovy
Processing DSL script job_PerformanceTests_HadoopInputFormat.groovy
Processing DSL script job_PerformanceTests_JDBC.groovy
Processing DSL script job_PerformanceTests_MongoDBIO_IT.groovy
Processing DSL script job_PerformanceTests_Python.groovy
Processing DSL script job_PerformanceTests_Spark.groovy
Processing DSL script job_PostCommit_Go_GradleBuild.groovy
Processing DSL script job_PostCommit_Java_GradleBuild.groovy
Processing DSL script job_PostCommit_Java_Nexmark_Dataflow.groovy
Processing DSL script job_PostCommit_Java_Nexmark_Direct.groovy
Processing DSL script job_PostCommit_Java_Nexmark_Flink.groovy
Processing DSL script job_PostCommit_Java_Nexmark_Spark.groovy
Processing DSL script job_PostCommit_Java_PortableValidatesRunner_Flink.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Apex.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Dataflow.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Flink.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Gearpump.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Samza.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Spark.groovy
Processing DSL script job_PostCommit_Python_ValidatesContainer_Dataflow.groovy
Processing DSL script job_PostCommit_Python_ValidatesRunner_Dataflow.groovy
Processing DSL script job_PostCommit_Python_ValidatesRunner_Flink.groovy
Processing DSL script job_PostCommit_Python_Verify.groovy
Processing DSL script job_PostCommit_Website_Publish.groovy
Processing DSL script job_PostRelease_NightlySnapshot.groovy
Processing DSL script job_PreCommit_CommunityMetrics.groovy
Processing DSL script job_PreCommit_Go.groovy
Processing DSL script job_PreCommit_Java.groovy
Processing DSL script job_PreCommit_Python.groovy
Processing DSL script job_PreCommit_RAT.groovy
ERROR: startup failed:
job_PreCommit_RAT.groovy: 25: unexpected token: } @ line 25, column 1.
   }
   ^

1 error

Not sending mail to unregistered user ke...@google.com


Build failed in Jenkins: beam_SeedJob_Standalone #1805

2018-10-24 Thread Apache Jenkins Server
See 


--
Started by timer
[EnvInject] - Loading node environment variables.
Building remotely on beam2 (beam) in workspace 

 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url https://github.com/apache/beam.git # timeout=10
Fetching upstream changes from https://github.com/apache/beam.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/beam.git 
 > +refs/heads/*:refs/remotes/origin/* 
 > +refs/pull/${ghprbPullId}/*:refs/remotes/origin/pr/${ghprbPullId}/*
 > git rev-parse origin/master^{commit} # timeout=10
Checking out Revision 7800c3078d8ecaee7d2e789f02b759e579263249 (origin/master)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 7800c3078d8ecaee7d2e789f02b759e579263249
Commit message: "Merge pull request #6807: [BEAM-5833] Fix java-harness build 
by adding flush() to BeamFnDataWriteRunnerTest"
 > git rev-list --no-walk 7800c3078d8ecaee7d2e789f02b759e579263249 # timeout=10
Cleaning workspace
 > git rev-parse --verify HEAD # timeout=10
Resetting working tree
 > git reset --hard # timeout=10
 > git clean -fdx # timeout=10
Processing DSL script job_00_seed.groovy
Processing DSL script job_Dependency_Check.groovy
Processing DSL script job_Inventory.groovy
Processing DSL script job_PerformanceTests_Dataflow.groovy
Processing DSL script job_PerformanceTests_FileBasedIO_IT.groovy
Processing DSL script job_PerformanceTests_FileBasedIO_IT_HDFS.groovy
Processing DSL script job_PerformanceTests_HadoopInputFormat.groovy
Processing DSL script job_PerformanceTests_JDBC.groovy
Processing DSL script job_PerformanceTests_MongoDBIO_IT.groovy
Processing DSL script job_PerformanceTests_Python.groovy
Processing DSL script job_PerformanceTests_Spark.groovy
Processing DSL script job_PostCommit_Go_GradleBuild.groovy
Processing DSL script job_PostCommit_Java_GradleBuild.groovy
Processing DSL script job_PostCommit_Java_Nexmark_Dataflow.groovy
Processing DSL script job_PostCommit_Java_Nexmark_Direct.groovy
Processing DSL script job_PostCommit_Java_Nexmark_Flink.groovy
Processing DSL script job_PostCommit_Java_Nexmark_Spark.groovy
Processing DSL script job_PostCommit_Java_PortableValidatesRunner_Flink.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Apex.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Dataflow.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Flink.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Gearpump.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Samza.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Spark.groovy
Processing DSL script job_PostCommit_Python_ValidatesContainer_Dataflow.groovy
Processing DSL script job_PostCommit_Python_ValidatesRunner_Dataflow.groovy
Processing DSL script job_PostCommit_Python_ValidatesRunner_Flink.groovy
Processing DSL script job_PostCommit_Python_Verify.groovy
Processing DSL script job_PostCommit_Website_Publish.groovy
Processing DSL script job_PostRelease_NightlySnapshot.groovy
Processing DSL script job_PreCommit_CommunityMetrics.groovy
Processing DSL script job_PreCommit_Go.groovy
Processing DSL script job_PreCommit_Java.groovy
Processing DSL script job_PreCommit_Python.groovy
Processing DSL script job_PreCommit_RAT.groovy
ERROR: startup failed:
job_PreCommit_RAT.groovy: 25: unexpected token: } @ line 25, column 1.
   }
   ^

1 error

Not sending mail to unregistered user ke...@google.com


Re: Unbalanced FileIO writes on Flink

2018-10-24 Thread Jozef Vilcek
So if I run 5 workers with 50 shards, I end up with:

Duration   Bytes received   Records received
2m 39s     900 MB           465,525
2m 39s     1.76 GB          930,720
2m 39s     789 MB           407,315
2m 39s     1.32 GB          698,262
2m 39s     788 MB           407,310

Still not good, but better than with 5 shards, where some workers did not
participate at all.
So the problem is in some layer which distributes keys / shards among workers?
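
As a rough illustration of that layer (a simplified sketch only, not the actual WriteFiles implementation), the sharding step amounts to tagging each element with a random shard key and letting a reshuffle route the keys; balance then depends entirely on how the runner maps those few keys onto workers. The AssignRandomShard name below is made up for the example:

import java.util.concurrent.ThreadLocalRandom;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Simplified stand-in for the sharding step, NOT the real WriteFiles code:
// each element independently picks one of numShards keys; a GroupByKey /
// Reshuffle downstream then sends every key to whichever worker the runner
// assigns it to, so a small shard count can easily land on few workers.
class AssignRandomShard extends DoFn<String, KV<Integer, String>> {
  private final int numShards;

  AssignRandomShard(int numShards) {
    this.numShards = numShards;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    int shard = ThreadLocalRandom.current().nextInt(numShards);
    c.output(KV.of(shard, c.element()));
  }
}

With 50 shards over 5 workers the keys spread better, but the per-key routing is still hash-based, which would explain the residual skew in the table above.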

On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax  wrote:

> withNumShards(5) generates 5 random shards. It turns out that
> statistically when you generate 5 random shards and you have 5 workers, the
> probability is reasonably high that some workers will get more than one
> shard (and as a result not all workers will participate). Are you able to
> set the number of shards larger than 5?
>
> On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek 
> wrote:
>
>> cc (dev)
>>
>> I tried to run the example with FlinkRunner in batch mode and received
>> again bad data spread among the workers.
>>
>> When I tried to remove number of shards for batch mode in above example,
>> pipeline crashed before launch
>>
>> Caused by: java.lang.IllegalStateException: Inputs to Flatten had
>> incompatible triggers:
>> AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(4)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
>> Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour,
>> AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
>> Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane(
>>
>>
>>
>>
>>
>> On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek 
>> wrote:
>>
>>> Hi Max,
>>>
>>> I forgot to mention that example is run in streaming mode, therefore I
>>> can not do writes without specifying shards. FileIO explicitly asks for
>>> them.
>>>
>>> I am not sure where the problem is. FlinkRunner is only one I used.
>>>
>>> On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels 
>>> wrote:
>>>
 Hi Jozef,

 This does not look like a FlinkRunner related problem, but is caused by
 the `WriteFiles` sharding logic. It assigns keys and does a Reshuffle
 which apparently does not lead to good data spread in your case.

 Do you see the same behavior without `withNumShards(5)`?

 Thanks,
 Max

 On 22.10.18 11:57, Jozef Vilcek wrote:
 > Hello,
 >
 > I am having some trouble to get a balanced write via FileIO. Workers
 at
 > the shuffle side where data per window fire are written to the
 > filesystem receive unbalanced number of events.
 >
 > Here is a naive code example:
 >
 >  val read = KafkaIO.read()
 >  .withTopic("topic")
 >  .withBootstrapServers("kafka1:9092")
 >  .withKeyDeserializer(classOf[ByteArrayDeserializer])
 >  .withValueDeserializer(classOf[ByteArrayDeserializer])
 >  .withProcessingTime()
 >
 >  pipeline
 >  .apply(read)
 >  .apply(MapElements.via(new
 > SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
 >override def apply(input: KafkaRecord[Array[Byte],
 > Array[Byte]]): String = {
 >  new String(input.getKV.getValue, "UTF-8")
 >}
 >  }))
 >
 >
 > .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
 >  .triggering(AfterWatermark.pastEndOfWindow()
 >
 .withEarlyFirings(AfterPane.elementCountAtLeast(4))
 >
 .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
 >
 Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
 >
 >
 Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))
 >  .discardingFiredPanes()
 >  .withAllowedLateness(Duration.standardDays(7)))
 >
 >  .apply(FileIO.write()
 >  .via(TextIO.sink())
 >  .withNaming(new SafeFileNaming(outputPath, ".txt"))
 >  .withTempDirectory(tempLocation)
 >  .withNumShards(5))
 >
 >
 > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal to
 > number of shards), I would expect that each worker will participate
 on
 > persisting shards and equally, since code uses fixed number of shards
 > (and random shard assign?). But reality is different (see 2
 attachements
 > - statistiscs from flink task reading from kafka and task writing to
 files)
 >
 > What am I missing? How to achieve balanced writes?
 >
 > Thanks,
 > Jozef
 >
 >

>>>


Jenkins build is back to normal : beam_Release_Gradle_NightlySnapshot #217

2018-10-24 Thread Apache Jenkins Server
See 




Re: [PROPOSAL] allow the users to anticipate the support of features in the targeted runner.

2018-10-24 Thread Etienne Chauchot
Hi guys,
To sum up what we said, I just opened this ticket:
https://issues.apache.org/jira/browse/BEAM-5849
Etienne
On Thursday, October 18, 2018 at 12:44 +0200, Maximilian Michels wrote:
> Plugins for IDEs would be amazing because they could provide feedback already
> during pipeline construction, but I'm not sure about the effort required to
> develop/maintain such plugins.
> Ultimately, Runners have to decide whether they can translate the given
> pipeline or not. So I'm leaning more towards an approach to make intermediate
> checking of the pipeline translation easier, e.g. by
> - providing the target Runner already during development
> - running a check of the Runner alongside with the DirectRunner
>   (which is typically used when developing pipelines)
> On 17.10.18 15:57, Etienne Chauchot wrote:
> Hey Max, Kenn,
> Thanks for your feedback!
> Yes, the idea was to inform the user as soon as possible, ideally while coding
> the pipeline. It could be done with an IDE plugin (like checkstyle) that is
> configured with the targeted runner; that way the targeted runner conf is not
> part of the pipeline code in an annotation, which would be against Beam's
> portability philosophy. Such a plugin could color unsupported features while
> coding.
> Another way could be to implement it as a javadoc, but it seems weak because
> it is not automatic enough. Yet another way could be to implement it as a
> validation plugin in the build system, but IMHO that is already too late for
> the user.
> So, long story short, I'm more in favor of an IDE plugin or similar
> coding-time solution.
> Best,
> Etienne
> On Wednesday, October 17, 2018 at 12:11 +0200, Maximilian Michels wrote:
> This is a good idea. It needs to be fleshed out how the capability of a Runner
> would be visible to the user (apart from the compatibility matrix).
> A dry-run feature would be useful, i.e. the user can run an inspection on the
> pipeline to see if it contains any features which are not supported by the
> Runner.
> On 17.10.18 00:03, Rui Wang wrote:
> Sounds like a good idea.
> Sounds like while coding, the user gets a list that shows whether a feature is
> supported on different runners, and the user can check the list for the
> answer. Is my understanding correct? Will this approach become slow as the
> number of runners grows? (It's just a question, as I am not familiar with the
> performance of combining a long list, annotations and an IDE.)
> 
> -Rui
> On Sat, Oct 13, 2018 at 11:56 PM Reuven Lax  wrote:
> Sounds like a good idea. I don't think it will work for all capabilities
> (e.g. some of them, such as "exactly once", apply to all of the API surface),
> but useful for the ones that we can capture.
> On Thu, Oct 4, 2018 at 2:43 AM Etienne Chauchot  wrote:
> Hi guys,
> As part of our user experience improvement to attract new Beam users, I would
> like to suggest something:
> Today we only have the capability matrix to inform users about feature support
> among runners. But they might discover only when the pipeline runs, when they
> receive an exception, that a given feature is not supported by the targeted
> runner. I would like to suggest translating the capability matrix into the API
> with annotations, for example, so that, while coding, the user could know
> that, for now, a given feature is not supported on the runner he targets.
> I know that the runner is only specified at pipeline runtime, and that adding
> code would be a leak of runner implementation and against portability. So it
> could be just informative annotations, like @Experimental for example, with no
> annotation processor.
> WDYT?
> Etienne
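
For illustration, a purely hypothetical sketch of what such an informative annotation could look like; neither UnsupportedOn nor anything like it exists in Beam, and the runner name in the comment is only an example. Source retention keeps it, like @Experimental, free of any runtime or portability cost while still giving coding-time tooling something to surface:

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical, illustration only: marks an API as known-unsupported on the
// named runners so an IDE plugin or checkstyle-like check can warn the user.
// SOURCE retention means the annotation never reaches the runner or the
// submitted pipeline.
@Documented
@Retention(RetentionPolicy.SOURCE)
@Target({ElementType.METHOD, ElementType.TYPE})
@interface UnsupportedOn {
  /** Runner names taken from the capability matrix, e.g. "SomeRunner". */
  String[] runners();
}

And for the coding-time check against the target runner mentioned further up in the thread, a minimal sketch, assuming the Flink runner dependency is on the development classpath (FlinkRunner is just an example target, and DevRunnerCheck is a made-up helper name):

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class DevRunnerCheck {
  // Build the same pipeline against either the DirectRunner (day-to-day
  // development) or the intended production runner, so translation problems
  // surface when run() is called during development rather than at deploy time.
  public static Pipeline createPipeline(boolean checkTargetRunner) {
    PipelineOptions options = PipelineOptionsFactory.create();
    if (checkTargetRunner) {
      options.setRunner(FlinkRunner.class);
    } else {
      options.setRunner(DirectRunner.class);
    }
    return Pipeline.create(options);
  }
}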

Re: Unbalanced FileIO writes on Flink

2018-10-24 Thread Reuven Lax
withNumShards(5) generates 5 random shards. It turns out that, statistically,
when you generate 5 random shards and you have 5 workers, the probability is
reasonably high that some workers will get more than one shard (and, as a
result, not all workers will participate). Are you able to set the number of
shards larger than 5?
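
For what it's worth, a back-of-the-envelope check of that claim, assuming each of the 5 shard keys lands on one of the 5 workers uniformly and independently (real hashing and scheduling only approximate this):

public class ShardBalanceEstimate {
  public static void main(String[] args) {
    int n = 5; // shards == workers, as in the pipeline under discussion
    double allDistinct = 1.0;
    for (int i = 0; i < n; i++) {
      // probability that the (i + 1)-th shard hits a worker not used yet
      allDistinct *= (double) (n - i) / n;
    }
    // 5!/5^5 = 120/3125 ~= 0.038, so roughly 96% of runs leave at least one
    // worker without a shard (and some worker with more than one).
    System.out.printf("P(perfect balance)  = %.3f%n", allDistinct);
    System.out.printf("P(some worker idle) = %.3f%n", 1 - allDistinct);
  }
}

Raising the shard count well above the worker count makes an even spread much more likely, which is consistent with the improved (though still uneven) numbers reported elsewhere in the thread for 50 shards.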

On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek  wrote:

> cc (dev)
>
> I tried to run the example with FlinkRunner in batch mode and received
> again bad data spread among the workers.
>
> When I tried to remove number of shards for batch mode in above example,
> pipeline crashed before launch
>
> Caused by: java.lang.IllegalStateException: Inputs to Flatten had
> incompatible triggers:
> AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(4)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
> Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour,
> AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
> Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane(
>
>
>
>
>
> On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek 
> wrote:
>
>> Hi Max,
>>
>> I forgot to mention that example is run in streaming mode, therefore I
>> can not do writes without specifying shards. FileIO explicitly asks for
>> them.
>>
>> I am not sure where the problem is. FlinkRunner is only one I used.
>>
>> On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels 
>> wrote:
>>
>>> Hi Jozef,
>>>
>>> This does not look like a FlinkRunner related problem, but is caused by
>>> the `WriteFiles` sharding logic. It assigns keys and does a Reshuffle
>>> which apparently does not lead to good data spread in your case.
>>>
>>> Do you see the same behavior without `withNumShards(5)`?
>>>
>>> Thanks,
>>> Max
>>>
>>> On 22.10.18 11:57, Jozef Vilcek wrote:
>>> > Hello,
>>> >
>>> > I am having some trouble to get a balanced write via FileIO. Workers
>>> at
>>> > the shuffle side where data per window fire are written to the
>>> > filesystem receive unbalanced number of events.
>>> >
>>> > Here is a naive code example:
>>> >
>>> >  val read = KafkaIO.read()
>>> >  .withTopic("topic")
>>> >  .withBootstrapServers("kafka1:9092")
>>> >  .withKeyDeserializer(classOf[ByteArrayDeserializer])
>>> >  .withValueDeserializer(classOf[ByteArrayDeserializer])
>>> >  .withProcessingTime()
>>> >
>>> >  pipeline
>>> >  .apply(read)
>>> >  .apply(MapElements.via(new
>>> > SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
>>> >override def apply(input: KafkaRecord[Array[Byte],
>>> > Array[Byte]]): String = {
>>> >  new String(input.getKV.getValue, "UTF-8")
>>> >}
>>> >  }))
>>> >
>>> >
>>> > .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
>>> >  .triggering(AfterWatermark.pastEndOfWindow()
>>> >
>>> .withEarlyFirings(AfterPane.elementCountAtLeast(4))
>>> >
>>> .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
>>> >
>>> Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
>>> >
>>> >
>>> Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))
>>> >  .discardingFiredPanes()
>>> >  .withAllowedLateness(Duration.standardDays(7)))
>>> >
>>> >  .apply(FileIO.write()
>>> >  .via(TextIO.sink())
>>> >  .withNaming(new SafeFileNaming(outputPath, ".txt"))
>>> >  .withTempDirectory(tempLocation)
>>> >  .withNumShards(5))
>>> >
>>> >
>>> > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal to
>>> > number of shards), I would expect that each worker will participate on
>>> > persisting shards and equally, since code uses fixed number of shards
>>> > (and random shard assign?). But reality is different (see 2
>>> attachements
>>> > - statistiscs from flink task reading from kafka and task writing to
>>> files)
>>> >
>>> > What am I missing? How to achieve balanced writes?
>>> >
>>> > Thanks,
>>> > Jozef
>>> >
>>> >
>>>
>>


Re: [ANNOUNCE] New committers, October 2018

2018-10-24 Thread Etienne Chauchot
Congrats and welcome!
Etienne

On Friday, October 19, 2018 at 07:09 -0700, Kenneth Knowles wrote:
> Hi all,
> Hot on the tail of the summer announcement comes our pre-Hallowe'en 
> celebration.
> 
> Please join me and the rest of the Beam PMC in welcoming the following new 
> committers:
> 
>  - Xinyu Liu, author/maintainer of the Samza runner
>  - Ankur Goenka, major contributor to portability efforts
> 
> And, as before, while I've noted some areas of contribution for each, most 
> important is that they are a valued part of
> our Beam community that the PMC trusts with the responsibilities of a Beam 
> committer [1].
> 
> A big thanks to both for their contributions.
> 
> Kenn
> 
> [1] 
> https://beam.apache.org/contribute/become-a-committer/#an-apache-beam-committer

Re: Unbalanced FileIO writes on Flink

2018-10-24 Thread Jozef Vilcek
cc (dev)

I tried to run the example with FlinkRunner in batch mode and received
again bad data spread among the workers.

When I tried to remove number of shards for batch mode in above example,
pipeline crashed before launch

Caused by: java.lang.IllegalStateException: Inputs to Flatten had
incompatible triggers:
AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(4)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour,
AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane(





On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek  wrote:

> Hi Max,
>
> I forgot to mention that example is run in streaming mode, therefore I can
> not do writes without specifying shards. FileIO explicitly asks for them.
>
> I am not sure where the problem is. FlinkRunner is only one I used.
>
> On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels 
> wrote:
>
>> Hi Jozef,
>>
>> This does not look like a FlinkRunner related problem, but is caused by
>> the `WriteFiles` sharding logic. It assigns keys and does a Reshuffle
>> which apparently does not lead to good data spread in your case.
>>
>> Do you see the same behavior without `withNumShards(5)`?
>>
>> Thanks,
>> Max
>>
>> On 22.10.18 11:57, Jozef Vilcek wrote:
>> > Hello,
>> >
>> > I am having some trouble to get a balanced write via FileIO. Workers at
>> > the shuffle side where data per window fire are written to the
>> > filesystem receive unbalanced number of events.
>> >
>> > Here is a naive code example:
>> >
>> >  val read = KafkaIO.read()
>> >  .withTopic("topic")
>> >  .withBootstrapServers("kafka1:9092")
>> >  .withKeyDeserializer(classOf[ByteArrayDeserializer])
>> >  .withValueDeserializer(classOf[ByteArrayDeserializer])
>> >  .withProcessingTime()
>> >
>> >  pipeline
>> >  .apply(read)
>> >  .apply(MapElements.via(new
>> > SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
>> >override def apply(input: KafkaRecord[Array[Byte],
>> > Array[Byte]]): String = {
>> >  new String(input.getKV.getValue, "UTF-8")
>> >}
>> >  }))
>> >
>> >
>> > .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
>> >  .triggering(AfterWatermark.pastEndOfWindow()
>> >  .withEarlyFirings(AfterPane.elementCountAtLeast(4))
>> >
>> .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
>> >
>> Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
>> >
>> >
>> Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))
>> >  .discardingFiredPanes()
>> >  .withAllowedLateness(Duration.standardDays(7)))
>> >
>> >  .apply(FileIO.write()
>> >  .via(TextIO.sink())
>> >  .withNaming(new SafeFileNaming(outputPath, ".txt"))
>> >  .withTempDirectory(tempLocation)
>> >  .withNumShards(5))
>> >
>> >
>> > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal to
>> > number of shards), I would expect that each worker will participate on
>> > persisting shards and equally, since code uses fixed number of shards
>> > (and random shard assign?). But reality is different (see 2
>> attachements
>> > - statistiscs from flink task reading from kafka and task writing to
>> files)
>> >
>> > What am I missing? How to achieve balanced writes?
>> >
>> > Thanks,
>> > Jozef
>> >
>> >
>>
>


Build failed in Jenkins: beam_SeedJob #2852

2018-10-24 Thread Apache Jenkins Server
See 


Changes:

[kedin] Fix java-harness build by adding flush() to

--
Started by timer
[EnvInject] - Loading node environment variables.
Building remotely on beam14 (beam) in workspace 

 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url https://github.com/apache/beam.git # timeout=10
Fetching upstream changes from https://github.com/apache/beam.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/beam.git 
 > +refs/heads/*:refs/remotes/origin/* 
 > +refs/pull/${ghprbPullId}/*:refs/remotes/origin/pr/${ghprbPullId}/*
 > git rev-parse origin/master^{commit} # timeout=10
Checking out Revision 7800c3078d8ecaee7d2e789f02b759e579263249 (origin/master)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 7800c3078d8ecaee7d2e789f02b759e579263249
Commit message: "Merge pull request #6807: [BEAM-5833] Fix java-harness build 
by adding flush() to BeamFnDataWriteRunnerTest"
 > git rev-list --no-walk 5e603ad4c642cfba0a6db70abd05ed8e9d89c7d6 # timeout=10
Cleaning workspace
 > git rev-parse --verify HEAD # timeout=10
Resetting working tree
 > git reset --hard # timeout=10
 > git clean -fdx # timeout=10
Processing DSL script job_00_seed.groovy
Processing DSL script job_Dependency_Check.groovy
Processing DSL script job_Inventory.groovy
Processing DSL script job_PerformanceTests_Dataflow.groovy
Processing DSL script job_PerformanceTests_FileBasedIO_IT.groovy
Processing DSL script job_PerformanceTests_FileBasedIO_IT_HDFS.groovy
Processing DSL script job_PerformanceTests_HadoopInputFormat.groovy
Processing DSL script job_PerformanceTests_JDBC.groovy
Processing DSL script job_PerformanceTests_MongoDBIO_IT.groovy
Processing DSL script job_PerformanceTests_Python.groovy
Processing DSL script job_PerformanceTests_Spark.groovy
Processing DSL script job_PostCommit_Go_GradleBuild.groovy
Processing DSL script job_PostCommit_Java_GradleBuild.groovy
Processing DSL script job_PostCommit_Java_Nexmark_Dataflow.groovy
Processing DSL script job_PostCommit_Java_Nexmark_Direct.groovy
Processing DSL script job_PostCommit_Java_Nexmark_Flink.groovy
Processing DSL script job_PostCommit_Java_Nexmark_Spark.groovy
Processing DSL script job_PostCommit_Java_PortableValidatesRunner_Flink.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Apex.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Dataflow.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Flink.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Gearpump.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Samza.groovy
Processing DSL script job_PostCommit_Java_ValidatesRunner_Spark.groovy
Processing DSL script job_PostCommit_Python_ValidatesContainer_Dataflow.groovy
Processing DSL script job_PostCommit_Python_ValidatesRunner_Dataflow.groovy
Processing DSL script job_PostCommit_Python_ValidatesRunner_Flink.groovy
Processing DSL script job_PostCommit_Python_Verify.groovy
Processing DSL script job_PostCommit_Website_Publish.groovy
Processing DSL script job_PostRelease_NightlySnapshot.groovy
Processing DSL script job_PreCommit_CommunityMetrics.groovy
Processing DSL script job_PreCommit_Go.groovy
Processing DSL script job_PreCommit_Java.groovy
Processing DSL script job_PreCommit_Python.groovy
Processing DSL script job_PreCommit_RAT.groovy
ERROR: startup failed:
job_PreCommit_RAT.groovy: 25: unexpected token: } @ line 25, column 1.
   }
   ^

1 error

Not sending mail to unregistered user ke...@google.com