[jira] [Commented] (BEAM-644) Primitive to shift the watermark while assigning timestamps
[ https://issues.apache.org/jira/browse/BEAM-644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17132150#comment-17132150 ]

Beam JIRA Bot commented on BEAM-644:

This issue is P2 but has been unassigned without any comment for 60 days so it has been labeled "stale-P2". If this issue is still affecting you, we care! Please comment and remove the label. Otherwise, in 14 days the issue will be moved to P3.

Please see https://beam.apache.org/contribute/jira-priorities/ for a detailed explanation of what these priorities mean.

> Primitive to shift the watermark while assigning timestamps
> ---
>
> Key: BEAM-644
> URL: https://issues.apache.org/jira/browse/BEAM-644
> Project: Beam
> Issue Type: New Feature
> Components: beam-model
> Reporter: Kenneth Knowles
> Priority: P2
> Labels: stale-P2
>
> There is a general need, especially important in the presence of
> SplittableDoFn, to be able to assign new timestamps to elements without
> making them late or droppable.
> - DoFn.withAllowedTimestampSkew is inadequate, because it simply allows one
> to produce late data, but does not allow one to shift the watermark so the
> new data is on-time.
> - For a SplittableDoFn, one may receive an element such as the name of a log
> file that contains elements for the day preceding the log file. The timestamp
> on the filename must currently be the beginning of the log. If such elements
> are constantly flowing, it may be OK, but since we don't know that element is
> coming, in the absence of data, the watermark may advance. We need a way to
> keep it far enough back even in the absence of data holding it back.
> One idea is a new primitive ShiftWatermark / AdjustTimestamps with the
> following pieces:
> - A constant duration (positive or negative) D by which to shift the
> watermark.
> - A function from TimestampedElement to new timestamp that is >= t + D
> So, for example, AdjustTimestamps(<-60 minutes>, f) would allow f to make
> timestamps up to 60 minutes earlier.
> With this primitive added, outputWithTimestamp and withAllowedTimestampSkew
> could be removed, simplifying DoFn.
> Alternatively, all of this functionality could be bolted on to DoFn.
> This ticket is not a proposal, but a record of the issue and ideas that were
> mentioned.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
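As a rough illustration of the ShiftWatermark / AdjustTimestamps idea in the issue description, the contract could be modeled in plain Python as below. This is only a sketch: the names AdjustTimestamps, output_watermark and apply are hypothetical and not part of any Beam SDK, and the watermark is simplified to a single datetime value. It shows why the two pieces fit together: if f may not go below t + D and the watermark is shifted by the same D, an element that was on time before the shift is still on time after it.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Callable


@dataclass(frozen=True)
class TimestampedElement:
    value: Any
    timestamp: datetime


class AdjustTimestamps:
    """Hypothetical model of the proposed primitive; not a Beam API."""

    def __init__(self, shift: timedelta,
                 fn: Callable[[TimestampedElement], datetime]):
        self.shift = shift  # D in the ticket; may be negative
        self.fn = fn        # f in the ticket: element -> new timestamp

    def output_watermark(self, input_watermark: datetime) -> datetime:
        # The watermark is moved by the same constant D as the lower bound.
        return input_watermark + self.shift

    def apply(self, element: TimestampedElement) -> TimestampedElement:
        new_timestamp = self.fn(element)
        lower_bound = element.timestamp + self.shift
        # Enforce the contract from the ticket: new timestamp >= t + D.
        if new_timestamp < lower_bound:
            raise ValueError(
                f"new timestamp {new_timestamp} is earlier than t + D = {lower_bound}")
        return TimestampedElement(element.value, new_timestamp)
```

With shift = timedelta(minutes=-60), a function that moves a timestamp 45 minutes earlier is accepted, while one that moves it 90 minutes earlier raises ValueError.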
[jira] [Commented] (BEAM-644) Primitive to shift the watermark while assigning timestamps
[ https://issues.apache.org/jira/browse/BEAM-644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16937240#comment-16937240 ]

Luke Cwik commented on BEAM-644:

SplittableDoFn will allow one to control the watermark, but you will always be limited by how far the watermarks of the upstream transforms have advanced. This allows scenarios like Read(PubsubA) --> Read(PubsubB), where PubsubA outputs topic names and PubsubB reads messages from the topics produced by PubsubA. The watermark of PubsubA will be the upper bound for how far PubsubB's watermark can advance.

This is still somewhat up for discussion, as there could be scenarios where you want the downstream transform to be able to advance the watermark further than the upstream transform's watermark, but this leads to usability questions around what should be dropped and where.
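The bound described in this comment can be written down as a one-line rule. The snippet below is a simplified model, not how any particular runner computes watermarks (real runners also account for buffered elements and holds); the function name output_watermark and its parameters are made up for illustration.

```python
from datetime import datetime
from typing import Iterable


def output_watermark(upstream_watermarks: Iterable[datetime],
                     own_estimate: datetime) -> datetime:
    """A transform may hold its watermark back, but in this model it can
    never report one later than the earliest upstream watermark.
    Expects at least one upstream watermark."""
    return min(min(upstream_watermarks), own_estimate)
```

In the Read(PubsubA) --> Read(PubsubB) scenario, PubsubB would pass PubsubA's watermark as the only upstream value: an estimate ahead of it is clamped, while an estimate behind it is kept.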
[jira] [Commented] (BEAM-644) Primitive to shift the watermark while assigning timestamps
[ https://issues.apache.org/jira/browse/BEAM-644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16933024#comment-16933024 ]

Kenneth Knowles commented on BEAM-644:

I want to clarify that the watermark is _not_ global. You might be thinking of Spark streaming :-)

But I understand your point. The watermark is mostly not in your control. It is always simply bounded by all the input watermarks. I agree with your desire and I agree it is doable. The new feature of "splittable DoFn" which replaces UnboundedSource will let a DoFn control the watermark somewhat directly. I'm not sure if it will be as flexible as you desire. [~lcwik] and [~chamikara] and [~laraschmidt] and [~boyuanz] are all more knowledgeable than I am.

As an example to demonstrate it is implementable and meaningful: you could certainly have two pipelines where one is writing to Pubsub instead of outputWithTimestamp. The other pipeline could read from Pubsub and use a timestamp attribute. In this system you get an independent watermark calculated heuristically at the junction between the two pipelines. (this is not a great idea, just an example of an architecture that decouples the watermark)

But by the way, taking one branch and mapping all event timestamps to the current processing time is safe. I illustrate it here: https://docs.google.com/presentation/d/1smGXb-0GGX_Fid1z3WWzZJWtyBjBA3Mo3t4oeRjJoZI/present?slide=id.g142c2fd96f_0_134 (click once to animate).

The part of your use case that requires something special is pulling event timestamps from JSON instead of from Pubsub attributes (the PubsubIO has no idea how to parse the payloads, as you already know). It is a very common and valid use case.
[jira] [Commented] (BEAM-644) Primitive to shift the watermark while assigning timestamps
[ https://issues.apache.org/jira/browse/BEAM-644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931986#comment-16931986 ]

Chris Heath commented on BEAM-644:

I recently asked a somewhat related question on StackOverflow [https://stackoverflow.com/questions/57960673/how-to-switch-time-domains-in-a-beam-pipeline], and after further thought, I believe what is really needed is for the watermark not to be global. At well-defined points in the pipeline (namely DoFns that call outputWithTimestamp), we need to be able to switch to a completely separate watermark that applies from that point on. Each watermark in the pipeline progresses independently, at different speeds with separate heuristics.

Our particular application ideally requires 3 time domains with 3 watermarks: First read data from Google pubsub (timestamps = published date), then parse the JSON payload. From there the pipeline branches: one branch switches to event time (timestamps obtained from JSON payload) and applies event correlations, with late data dropped; the second branch processes parsed events in batches (timestamps = processing time) with no late data ever.
[jira] [Commented] (BEAM-644) Primitive to shift the watermark while assigning timestamps
[ https://issues.apache.org/jira/browse/BEAM-644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16823895#comment-16823895 ]

Deniz Saner commented on BEAM-644:

Thanks [~kenn]! As we couldn't afford to lose data, we simply built a small service that reads batched messages from Pubsub and republishes them as single items with the timestamp attribute set correctly. Doing so, we could eliminate any use of withAllowedTimestampSkew from our code base. I'm just writing this here so that other people may take this as a starting point.
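The core of such a republishing service might look roughly like the sketch below. Everything specific here is an assumption made for illustration, since the comment does not describe the message format: the batch is taken to be a JSON array, each event is taken to carry its own "event_time" field, and the attribute is named "timestamp". The actual publish call is left out so that the sketch stays independent of any Pub/Sub client library.

```python
import json
from typing import Dict, Iterator, Tuple


def unbatch(batched_payload: bytes) -> Iterator[Tuple[bytes, Dict[str, str]]]:
    """Split one batched message into (payload, attributes) pairs,
    one per contained event, ready to be republished individually."""
    for event in json.loads(batched_payload):
        # Assumed field name; the event's own time becomes the attribute
        # that the downstream pipeline reads as the element timestamp.
        attributes = {"timestamp": event["event_time"]}
        yield json.dumps(event).encode("utf-8"), attributes
```

A pipeline reading the republished topic can then take its element timestamps from that attribute, so no DoFn has to move timestamps backwards.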
[jira] [Commented] (BEAM-644) Primitive to shift the watermark while assigning timestamps
[ https://issues.apache.org/jira/browse/BEAM-644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815663#comment-16815663 ]

Kenneth Knowles commented on BEAM-644:

This issue is dormant, but still totally valid. The fact is that withAllowedTimestampSkew is dangerous and there is no replacement. If you are very careful, you can manage to use withAllowedTimestampSkew and not lose data. If it is working for you, feel free to keep using it.
[jira] [Commented] (BEAM-644) Primitive to shift the watermark while assigning timestamps
[ https://issues.apache.org/jira/browse/BEAM-644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16812300#comment-16812300 ]

Deniz Saner commented on BEAM-644:

Can somebody elaborate on the status of this issue? The method `withAllowedTimestampSkew` has been marked deprecated for quite some time now, but from the issue links, I cannot really infer whether a suitable replacement has been implemented yet.

At my company, we find ourselves still using `withAllowedTimestampSkew`, because we are batching our data, and one message observed at timestamp x may contain hundreds of messages at timestamp x - 10.
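To make the risk in this batching scenario concrete, here is a deliberately simplified model of lateness with fixed windows. It is not Beam's exact semantics: timestamps are plain integers, the function classify and its parameters are invented for this sketch, and window boundaries are treated loosely. It shows that if the watermark has already reached x when the batch arrives, an inner message re-stamped to x - 10 can fall into a window the watermark has already passed.

```python
def classify(element_ts: int, watermark: int,
             window_size: int, allowed_lateness: int) -> str:
    """Classify an element against the watermark in a fixed-window model."""
    # End of the fixed window that contains element_ts.
    window_end = (element_ts // window_size + 1) * window_size
    if watermark < window_end:
        return "on-time"
    if watermark < window_end + allowed_lateness:
        return "late"
    return "droppable"
```

With x = 100, a window size of 10 and the watermark at 100, the batch message itself (timestamp 100) is on-time, but an inner message at 90 is droppable unless the windowing allows lateness, in which case it is merely late.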