[jira] [Commented] (BEAM-644) Primitive to shift the watermark while assigning timestamps

2020-06-10 Thread Beam JIRA Bot (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17132150#comment-17132150
 ] 

Beam JIRA Bot commented on BEAM-644:


This issue is P2 but has been unassigned without any comment for 60 days so it 
has been labeled "stale-P2". If this issue is still affecting you, we care! 
Please comment and remove the label. Otherwise, in 14 days the issue will be 
moved to P3.

Please see https://beam.apache.org/contribute/jira-priorities/ for a detailed 
explanation of what these priorities mean.


> Primitive to shift the watermark while assigning timestamps
> ---
>
> Key: BEAM-644
> URL: https://issues.apache.org/jira/browse/BEAM-644
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model
>Reporter: Kenneth Knowles
>Priority: P2
>  Labels: stale-P2
>
> There is a general need, especially important in the presence of 
> SplittableDoFn, to be able to assign new timestamps to elements without 
> making them late or droppable.
>  - DoFn.withAllowedTimestampSkew is inadequate, because it simply allows one 
> to produce late data, but does not allow one to shift the watermark so the 
> new data is on-time.
>  - For a SplittableDoFn, one may receive an element such as the name of a log 
> file that contains elements for the day preceding the log file. The timestamp 
> on the filename must currently be the beginning of the log. If such elements 
> are constantly flowing, it may be OK, but since we don't know that element is 
> coming, in that absence of data, the watermark may advance. We need a way to 
> keep it far enough back even in the absence of data holding it back.
> One idea is a new primitive ShiftWatermark / AdjustTimestamps with the 
> following pieces:
>  - A constant duration (positive or negative) D by which to shift the 
> watermark.
>  - A function from TimestampedElement to new timestamp that is >= t + D
> So, for example, AdjustTimestamps(<-60 minutes>, f) would allow f to make 
> timestamps up to 60 minutes earlier.
> With this primitive added, outputWithTimestamp and withAllowedTimestampSkew 
> could be removed, simplifying DoFn.
> Alternatively, all of this functionality could be bolted on to DoFn.
> This ticket is not a proposal, but a record of the issue and ideas that were 
> mentioned.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-644) Primitive to shift the watermark while assigning timestamps

2019-09-24 Thread Luke Cwik (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16937240#comment-16937240
 ] 

Luke Cwik commented on BEAM-644:


SplittableDoFn will allow one to control the watermark but you will always be 
limited by how far the watermark has been advanced of upstream transforms. This 
will allow you to have scenarios like Read(PubsubA) --> Read(PubsubB) where 
PubsubA outputs topic names and PubsubB reads messages from the topics produced 
by PubsubA. The watermark of PubsubA will be the upperbound for how far 
PubsubB's watermark can advance. This is still somewhat up for discussion as 
there could be scenarios where you want the downstream transform to be able to 
advance the watermark further then the upstream transforms watermark but this 
leads to usability questions around what should be dropped and where.

> Primitive to shift the watermark while assigning timestamps
> ---
>
> Key: BEAM-644
> URL: https://issues.apache.org/jira/browse/BEAM-644
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model
>Reporter: Kenneth Knowles
>Priority: Major
>
> There is a general need, especially important in the presence of 
> SplittableDoFn, to be able to assign new timestamps to elements without 
> making them late or droppable.
>  - DoFn.withAllowedTimestampSkew is inadequate, because it simply allows one 
> to produce late data, but does not allow one to shift the watermark so the 
> new data is on-time.
>  - For a SplittableDoFn, one may receive an element such as the name of a log 
> file that contains elements for the day preceding the log file. The timestamp 
> on the filename must currently be the beginning of the log. If such elements 
> are constantly flowing, it may be OK, but since we don't know that element is 
> coming, in that absence of data, the watermark may advance. We need a way to 
> keep it far enough back even in the absence of data holding it back.
> One idea is a new primitive ShiftWatermark / AdjustTimestamps with the 
> following pieces:
>  - A constant duration (positive or negative) D by which to shift the 
> watermark.
>  - A function from TimestampedElement to new timestamp that is >= t + D
> So, for example, AdjustTimestamps(<-60 minutes>, f) would allow f to make 
> timestamps up to 60 minutes earlier.
> With this primitive added, outputWithTimestamp and withAllowedTimestampSkew 
> could be removed, simplifying DoFn.
> Alternatively, all of this functionality could be bolted on to DoFn.
> This ticket is not a proposal, but a record of the issue and ideas that were 
> mentioned.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-644) Primitive to shift the watermark while assigning timestamps

2019-09-18 Thread Kenneth Knowles (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16933024#comment-16933024
 ] 

Kenneth Knowles commented on BEAM-644:
--

I want to clarify that the watermark is _not_ global. You might be thinking of 
Spark streaming :-)

But I understand your point. The watermark is mostly not in your control. It is 
always simply bounded by all the input watermarks. I agree with your desire and 
I agree it is doable. The new feature of "splittable DoFn" which replaces 
UnboundedSource will let a DoFn control the watermark somewhat directly. I'm 
not sure if it will be as flexible as you desire. [~lcwik] and [~chamikara] and 
[~laraschmidt] and [~boyuanz] are all more knowledgeable than I am.

As an example to demonstrate it is implementable and meaningful: you could 
certainly have two pipelines where one is writing to Pubsub instead of 
outputWithTimestamp. The other pipeline could read from Pubsub and use a 
timestamp attribute. In this system you get an independent watermark calculated 
heuristically at the junction between the two pipelines. (this is not a great 
idea, just an example of an architecture that decouples the watermark)

But by the way, taking one branch and mapping all event timestamps to the 
current processing time is safe. I illustrate it here: 
https://docs.google.com/presentation/d/1smGXb-0GGX_Fid1z3WWzZJWtyBjBA3Mo3t4oeRjJoZI/present?slide=id.g142c2fd96f_0_134
 (click once to animate). The part of your use case that requires something 
special is pulling event timestamps from JSON instead of from Pubsub attributes 
(the PubsubIO has no idea how to parse the payloads, as you already know). It 
is a very common and valid use case.



> Primitive to shift the watermark while assigning timestamps
> ---
>
> Key: BEAM-644
> URL: https://issues.apache.org/jira/browse/BEAM-644
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model
>Reporter: Kenneth Knowles
>Priority: Major
>
> There is a general need, especially important in the presence of 
> SplittableDoFn, to be able to assign new timestamps to elements without 
> making them late or droppable.
>  - DoFn.withAllowedTimestampSkew is inadequate, because it simply allows one 
> to produce late data, but does not allow one to shift the watermark so the 
> new data is on-time.
>  - For a SplittableDoFn, one may receive an element such as the name of a log 
> file that contains elements for the day preceding the log file. The timestamp 
> on the filename must currently be the beginning of the log. If such elements 
> are constantly flowing, it may be OK, but since we don't know that element is 
> coming, in that absence of data, the watermark may advance. We need a way to 
> keep it far enough back even in the absence of data holding it back.
> One idea is a new primitive ShiftWatermark / AdjustTimestamps with the 
> following pieces:
>  - A constant duration (positive or negative) D by which to shift the 
> watermark.
>  - A function from TimestampedElement to new timestamp that is >= t + D
> So, for example, AdjustTimestamps(<-60 minutes>, f) would allow f to make 
> timestamps up to 60 minutes earlier.
> With this primitive added, outputWithTimestamp and withAllowedTimestampSkew 
> could be removed, simplifying DoFn.
> Alternatively, all of this functionality could be bolted on to DoFn.
> This ticket is not a proposal, but a record of the issue and ideas that were 
> mentioned.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-644) Primitive to shift the watermark while assigning timestamps

2019-09-17 Thread Chris Heath (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931986#comment-16931986
 ] 

Chris Heath commented on BEAM-644:
--

I recently asked a somewhat related question on StackOverflow 
[https://stackoverflow.com/questions/57960673/how-to-switch-time-domains-in-a-beam-pipeline],
 and after further thought, I believe what is really needed is for the 
watermark not to be global.  At well-defined points in the pipeline (namely 
DoFns that call outputWithTimestamp), we need to be able to switch to a 
completely separate watermark that applies from that point on.  Each watermark 
in the pipeline progresses independently, at different speeds with separate 
heuristics.

Our particular application ideally requires 3 time domains with 3 watermarks:  
First read data from Google pubsub (timestamps = published date), then parse 
the JSON payload. From there the pipeline branches: one branch switches to 
event time (timestamps obtained from JSON payload) and applies event 
correlations, with late data dropped; the second branch processes parsed events 
in batches (timestamps = processing time) with no late data ever.

> Primitive to shift the watermark while assigning timestamps
> ---
>
> Key: BEAM-644
> URL: https://issues.apache.org/jira/browse/BEAM-644
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model
>Reporter: Kenneth Knowles
>Priority: Major
>
> There is a general need, especially important in the presence of 
> SplittableDoFn, to be able to assign new timestamps to elements without 
> making them late or droppable.
>  - DoFn.withAllowedTimestampSkew is inadequate, because it simply allows one 
> to produce late data, but does not allow one to shift the watermark so the 
> new data is on-time.
>  - For a SplittableDoFn, one may receive an element such as the name of a log 
> file that contains elements for the day preceding the log file. The timestamp 
> on the filename must currently be the beginning of the log. If such elements 
> are constantly flowing, it may be OK, but since we don't know that element is 
> coming, in that absence of data, the watermark may advance. We need a way to 
> keep it far enough back even in the absence of data holding it back.
> One idea is a new primitive ShiftWatermark / AdjustTimestamps with the 
> following pieces:
>  - A constant duration (positive or negative) D by which to shift the 
> watermark.
>  - A function from TimestampedElement to new timestamp that is >= t + D
> So, for example, AdjustTimestamps(<-60 minutes>, f) would allow f to make 
> timestamps up to 60 minutes earlier.
> With this primitive added, outputWithTimestamp and withAllowedTimestampSkew 
> could be removed, simplifying DoFn.
> Alternatively, all of this functionality could be bolted on to DoFn.
> This ticket is not a proposal, but a record of the issue and ideas that were 
> mentioned.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-644) Primitive to shift the watermark while assigning timestamps

2019-04-23 Thread Deniz Saner (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16823895#comment-16823895
 ] 

Deniz Saner commented on BEAM-644:
--

Thanks [~kenn]! As we couldn't afford to loose data, we simply built a small 
service, which reads batched messages from pubsub and republishes them as 
singular items with the timestamp attribute set correctly. Doing so, we could 
eliminate any use of withAllowedTimestampSkew from our code base. I'm just 
writing this here so that other people may take this as a starting point.

> Primitive to shift the watermark while assigning timestamps
> ---
>
> Key: BEAM-644
> URL: https://issues.apache.org/jira/browse/BEAM-644
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model
>Reporter: Kenneth Knowles
>Priority: Major
>
> There is a general need, especially important in the presence of 
> SplittableDoFn, to be able to assign new timestamps to elements without 
> making them late or droppable.
>  - DoFn.withAllowedTimestampSkew is inadequate, because it simply allows one 
> to produce late data, but does not allow one to shift the watermark so the 
> new data is on-time.
>  - For a SplittableDoFn, one may receive an element such as the name of a log 
> file that contains elements for the day preceding the log file. The timestamp 
> on the filename must currently be the beginning of the log. If such elements 
> are constantly flowing, it may be OK, but since we don't know that element is 
> coming, in that absence of data, the watermark may advance. We need a way to 
> keep it far enough back even in the absence of data holding it back.
> One idea is a new primitive ShiftWatermark / AdjustTimestamps with the 
> following pieces:
>  - A constant duration (positive or negative) D by which to shift the 
> watermark.
>  - A function from TimestampedElement to new timestamp that is >= t + D
> So, for example, AdjustTimestamps(<-60 minutes>, f) would allow f to make 
> timestamps up to 60 minutes earlier.
> With this primitive added, outputWithTimestamp and withAllowedTimestampSkew 
> could be removed, simplifying DoFn.
> Alternatively, all of this functionality could be bolted on to DoFn.
> This ticket is not a proposal, but a record of the issue and ideas that were 
> mentioned.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-644) Primitive to shift the watermark while assigning timestamps

2019-04-11 Thread Kenneth Knowles (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815663#comment-16815663
 ] 

Kenneth Knowles commented on BEAM-644:
--

This issue is dormant, but still totally valid. The fact is that 
withAllowedTimestampSkew is dangerous and there is no replacement. If you are 
very careful, you can manage to use withAllowedTimestampSkew and not lose data. 
If it is working for you, feel free to keep using it.

> Primitive to shift the watermark while assigning timestamps
> ---
>
> Key: BEAM-644
> URL: https://issues.apache.org/jira/browse/BEAM-644
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model
>Reporter: Kenneth Knowles
>Priority: Major
>
> There is a general need, especially important in the presence of 
> SplittableDoFn, to be able to assign new timestamps to elements without 
> making them late or droppable.
>  - DoFn.withAllowedTimestampSkew is inadequate, because it simply allows one 
> to produce late data, but does not allow one to shift the watermark so the 
> new data is on-time.
>  - For a SplittableDoFn, one may receive an element such as the name of a log 
> file that contains elements for the day preceding the log file. The timestamp 
> on the filename must currently be the beginning of the log. If such elements 
> are constantly flowing, it may be OK, but since we don't know that element is 
> coming, in that absence of data, the watermark may advance. We need a way to 
> keep it far enough back even in the absence of data holding it back.
> One idea is a new primitive ShiftWatermark / AdjustTimestamps with the 
> following pieces:
>  - A constant duration (positive or negative) D by which to shift the 
> watermark.
>  - A function from TimestampedElement to new timestamp that is >= t + D
> So, for example, AdjustTimestamps(<-60 minutes>, f) would allow f to make 
> timestamps up to 60 minutes earlier.
> With this primitive added, outputWithTimestamp and withAllowedTimestampSkew 
> could be removed, simplifying DoFn.
> Alternatively, all of this functionality could be bolted on to DoFn.
> This ticket is not a proposal, but a record of the issue and ideas that were 
> mentioned.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-644) Primitive to shift the watermark while assigning timestamps

2019-04-08 Thread Deniz Saner (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16812300#comment-16812300
 ] 

Deniz Saner commented on BEAM-644:
--

Can somebody elaborate on the status of this issue? The method 
`withAllowedTimestampSkew` has been marked deprecated for quite some time now, 
but from the issue links, I cannot really infer whether a suitable replacement 
has been implemented yet.

At my company, we are finding ourselves still using `withAllowedTimestampSkew`, 
because we are batching and our data and one message observed at timestamp x 
may contain hundreds of messages at timestamp x - 10.

> Primitive to shift the watermark while assigning timestamps
> ---
>
> Key: BEAM-644
> URL: https://issues.apache.org/jira/browse/BEAM-644
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model
>Reporter: Kenneth Knowles
>Priority: Major
>
> There is a general need, especially important in the presence of 
> SplittableDoFn, to be able to assign new timestamps to elements without 
> making them late or droppable.
>  - DoFn.withAllowedTimestampSkew is inadequate, because it simply allows one 
> to produce late data, but does not allow one to shift the watermark so the 
> new data is on-time.
>  - For a SplittableDoFn, one may receive an element such as the name of a log 
> file that contains elements for the day preceding the log file. The timestamp 
> on the filename must currently be the beginning of the log. If such elements 
> are constantly flowing, it may be OK, but since we don't know that element is 
> coming, in that absence of data, the watermark may advance. We need a way to 
> keep it far enough back even in the absence of data holding it back.
> One idea is a new primitive ShiftWatermark / AdjustTimestamps with the 
> following pieces:
>  - A constant duration (positive or negative) D by which to shift the 
> watermark.
>  - A function from TimestampedElement to new timestamp that is >= t + D
> So, for example, AdjustTimestamps(<-60 minutes>, f) would allow f to make 
> timestamps up to 60 minutes earlier.
> With this primitive added, outputWithTimestamp and withAllowedTimestampSkew 
> could be removed, simplifying DoFn.
> Alternatively, all of this functionality could be bolted on to DoFn.
> This ticket is not a proposal, but a record of the issue and ideas that were 
> mentioned.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)