On Mon, Oct 29, 2018 at 5:26 AM Jozef Vilcek <[email protected]> wrote:

> Yes, state in the timestamp policy, but simple state, not like ParDo state.
> A TimestampPolicy appears to be a long-lived instance. Take inspiration
> from the already existing policies, e.g. the one here:
>
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L135
>

Correct. Looks like we need to improve the JavaDoc[1] for TimestampPolicy to
explain the contract better. The policy has the same life cycle as the
reader, with one instance for each Kafka partition read by the reader. It is
created when the reader is created. When the reader is resumed from a
checkpoint, the new policy's constructor has access to the previously set
watermark, but not to the previous record timestamp. In that sense, Jozef's
solution has one corner case: the timestamp is unknown for the first record
if it is not parsable (which is probably fine in practice). I had left some
comments about allowing users to remember more state for each
PartitionContext [2] when we added TimestampPolicy to KafkaIO.

The State API is not available in the UnboundedSource API; essentially, the
'CheckpointMark' is the state that can be stored with each reader.

[1]:
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicy.java#L29
[2]
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicy.java#L35
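
For illustration, a minimal sketch of such a policy (the class name and the
tryParseTimestamp() helper are hypothetical, and it assumes the
java.util.Optional-based factory signature; this is not built-in behavior):

import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

// One instance exists per partition, per reader, so plain fields are enough
// to remember the timestamp of the last successfully parsed record.
class LastParsedTimestampPolicy extends TimestampPolicy<byte[], byte[]> {

  private Instant lastTimestamp;

  LastParsedTimestampPolicy(Optional<Instant> previousWatermark) {
    // On resume from a checkpoint, only the previous watermark is available,
    // not the previous record timestamp (the corner case mentioned above).
    this.lastTimestamp =
        previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
  }

  @Override
  public Instant getTimestampForRecord(
      PartitionContext ctx, KafkaRecord<byte[], byte[]> record) {
    Instant parsed = tryParseTimestamp(record.getKV().getValue());
    if (parsed != null) {
      lastTimestamp = parsed;
    }
    // Unparsable records inherit the timestamp of the last parsable one.
    return lastTimestamp;
  }

  @Override
  public Instant getWatermark(PartitionContext ctx) {
    return lastTimestamp;
  }

  // Hypothetical application helper; returns null for unparsable payloads.
  private static Instant tryParseTimestamp(byte[] payload) {
    return null; // placeholder
  }
}

It would be wired in with something like:

.withTimestampPolicyFactory(
    (tp, prevWatermark) -> new LastParsedTimestampPolicy(prevWatermark))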


> On Mon, Oct 29, 2018 at 11:57 AM Kaymak, Tobias <[email protected]>
> wrote:
>
>> Sorry for not replying, I was sick with the flu.
>>
>> On Thu, Oct 25, 2018 at 9:56 AM Jozef Vilcek <[email protected]>
>> wrote:
>>
>>> What I ended up doing, when I could not for any reason rely on Kafka
>>> timestamps but needed to parse them from the message, is:
>>>
>>> * have a custom Kafka deserializer which never throws but returns a
>>> message which is either a success with the parsed data structure plus
>>> timestamp, or a failure with the original Kafka bytes payload
>>> * the timestamp policy can then extract the timestamp in case of a
>>> successful deserialization result; in case of a failure result, I return
>>> the timestamp of the last successful message (in my case messages are not
>>> terribly out of order and failures are rather rare)
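>>>
>>> A rough sketch of such a deserializer (the MyEvent type and its parse()
>>> are application specific, and the names here are made up):
>>>
>>> import java.io.Serializable;
>>> import java.util.Map;
>>> import org.apache.kafka.common.serialization.Deserializer;
>>> import org.joda.time.Instant;
>>>
>>> // Never throws: wraps either the parsed event or the raw payload.
>>> public class SafeDeserializer
>>>     implements Deserializer<SafeDeserializer.Result> {
>>>
>>>   public static class Result implements Serializable {
>>>     public final MyEvent event;      // null on failure
>>>     public final Instant timestamp;  // null on failure
>>>     public final byte[] rawPayload;  // kept for the dead-letter output
>>>
>>>     Result(MyEvent event, Instant timestamp, byte[] rawPayload) {
>>>       this.event = event;
>>>       this.timestamp = timestamp;
>>>       this.rawPayload = rawPayload;
>>>     }
>>>   }
>>>
>>>   @Override
>>>   public void configure(Map<String, ?> configs, boolean isKey) {}
>>>
>>>   @Override
>>>   public Result deserialize(String topic, byte[] data) {
>>>     try {
>>>       MyEvent event = MyEvent.parse(data); // hypothetical, may throw
>>>       return new Result(event, event.getTimestamp(), data);
>>>     } catch (Exception e) {
>>>       return new Result(null, null, data); // failure: keep raw bytes
>>>     }
>>>   }
>>>
>>>   @Override
>>>   public void close() {}
>>> }
>>>
>>> KafkaIO also needs a Coder for the Result type, e.g. via
>>> withValueDeserializerAndCoder(SafeDeserializer.class, resultCoder).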
>>>
>>
>> It's possible that my stream contains data that is very old (when
>> rewinding a topic, let's say it goes back to 2012). If I get the logic here
>> correctly, this means I need to remember the last successfully parsed
>> timestamp. Do you solve this via stateful processing?
>>
>>
>>> * a following ParDo then side-outputs failures to dead letters
>>>
>>>
>>
>>
>>> On Thu, Oct 25, 2018 at 8:54 AM Reuven Lax <[email protected]> wrote:
>>>
>>>>
>>>>
>>>> On Wed, Oct 24, 2018, 10:26 PM Raghu Angadi <[email protected]> wrote:
>>>>
>>>>> Well, if every input record's timestamp is X, the watermark staying at
>>>>> X is the right answer, no? But I am not sure where the disagreement is,
>>>>> actually. I might be mistaken.
>>>>>
>>>>> KafkaIO has a few built-in policies for watermark and timestamp that
>>>>> cover most use cases (including server time, which has the benefit of
>>>>> providing a perfect watermark). It also gives users fairly complete
>>>>> control over these if they choose to take it. I think it is reasonable
>>>>> for a policy to base its watermark only on parsable records, and to
>>>>> ignore unparsable records w.r.t. watermark calculation.
>>>>>
>>>>
>>>> But then doesn't that force the user to set the maximum allowed
>>>> lateness to infinity, since otherwise these records will be dropped?
>>>>
>>>>> It could even assign a timestamp that makes more logical sense in a
>>>>> particular application.
>>>>>
>>>>> On Wed, Oct 24, 2018 at 8:30 PM Kenneth Knowles <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Forgive me if this is naive or missing something, but here are my
>>>>>> thoughts on these alternatives:
>>>>>>
>>>>>> (0) The timestamp has to be pulled out in the source to control the
>>>>>> watermark. Luke's point is important.
>>>>>>
>>>>>> (1) If bad records get min_timestamp, and they occur infrequently
>>>>>> enough, then the watermark will advance and they will all be dropped.
>>>>>> That will not allow output to a dead-letter queue.
>>>>>>
>>>>>> (2) If you always have min_timestamp records, or if bad records are
>>>>>> frequent, the watermark will never advance. So windows/aggregations
>>>>>> would never be considered complete. Triggers could be used to get
>>>>>> output anyhow, but it would never be a final answer. I think it is not
>>>>>> in the spirit of Beam to work this way. Pragmatically, no state could
>>>>>> ever be freed by a runner.
>>>>>>
>>>>>> In SQL there is an actual "dead letter" option when creating a table
>>>>>> that parses from a bytes source. If, for example, a JSON record cannot
>>>>>> be parsed to the expected schema - maybe an Avro record got into the
>>>>>> stream, or the JSON doesn't match the expected schema - it is output
>>>>>> as-is to a user-specified dead-letter queue. I think this same level of
>>>>>> support is also required for records that cannot have timestamps
>>>>>> extracted in an unbounded source.
>>>>>>
>>>>>> In an SDF I think the function has enough control to do it all in
>>>>>> "userland", so Cham is right on here.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Wed, Oct 24, 2018 at 6:54 PM Lukasz Cwik <[email protected]> wrote:
>>>>>>
>>>>>>> That depends on the user's pipeline and on how watermark advancement
>>>>>>> of the source may impact elements becoming droppably late if they are
>>>>>>> emitted with the minimum timestamp.
>>>>>>>
>>>>>>> On Wed, Oct 24, 2018 at 4:42 PM Raghu Angadi <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I see.
>>>>>>>>
>>>>>>>> What I meant was to return min_timestamp for bad records in the
>>>>>>>> timestamp handler passed to KafkaIO itself, and the correct timestamp
>>>>>>>> for parsable records. That should work too, right?
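>>>>>>>>
>>>>>>>> Roughly like this (a sketch mirroring the withTimestampFn() usage in
>>>>>>>> Tobias's pipeline below; tryParseTimestamp is a hypothetical
>>>>>>>> application helper):
>>>>>>>>
>>>>>>>> SerializableFunction<KafkaRecord<String, String>, Instant> timestampFn =
>>>>>>>>     record -> {
>>>>>>>>       Instant ts = tryParseTimestamp(record.getKV().getValue());
>>>>>>>>       // Bad records get the minimum timestamp; a ParDo later in the
>>>>>>>>       // pipeline can then drop or redirect them.
>>>>>>>>       return ts != null ? ts : BoundedWindow.TIMESTAMP_MIN_VALUE;
>>>>>>>>     };
>>>>>>>>
>>>>>>>> // and in the KafkaIO.read() builder:
>>>>>>>> .withTimestampPolicyFactory(TimestampPolicyFactory.withTimestampFn(timestampFn))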
>>>>>>>>
>>>>>>>> On Wed, Oct 24, 2018 at 4:21 PM Lukasz Cwik <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Yes, that would be fine.
>>>>>>>>>
>>>>>>>>> The user could then use a ParDo which outputs to a DLQ the records
>>>>>>>>> it can't parse the timestamp for, and uses outputWithTimestamp[1] for
>>>>>>>>> everything else.
>>>>>>>>>
>>>>>>>>> 1:
>>>>>>>>> https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-
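>>>>>>>>>
>>>>>>>>> A sketch of that ParDo (assuming the source assigned the minimum
>>>>>>>>> timestamp to every record, so outputWithTimestamp only ever moves
>>>>>>>>> timestamps forward; tryParseTimestamp is hypothetical):
>>>>>>>>>
>>>>>>>>> import org.apache.beam.sdk.transforms.DoFn;
>>>>>>>>> import org.apache.beam.sdk.transforms.ParDo;
>>>>>>>>> import org.apache.beam.sdk.values.KV;
>>>>>>>>> import org.apache.beam.sdk.values.PCollectionTuple;
>>>>>>>>> import org.apache.beam.sdk.values.TupleTag;
>>>>>>>>> import org.apache.beam.sdk.values.TupleTagList;
>>>>>>>>> import org.joda.time.Instant;
>>>>>>>>>
>>>>>>>>> final TupleTag<KV<String, String>> okTag =
>>>>>>>>>     new TupleTag<KV<String, String>>() {};
>>>>>>>>> final TupleTag<KV<String, String>> deadLetterTag =
>>>>>>>>>     new TupleTag<KV<String, String>>() {};
>>>>>>>>>
>>>>>>>>> // input: a PCollection<KV<String, String>> read from KafkaIO, with
>>>>>>>>> // every record carrying the minimum timestamp.
>>>>>>>>> PCollectionTuple tagged = input.apply("ExtractTimestamps",
>>>>>>>>>     ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
>>>>>>>>>       @ProcessElement
>>>>>>>>>       public void processElement(ProcessContext c) {
>>>>>>>>>         Instant ts = tryParseTimestamp(c.element().getValue());
>>>>>>>>>         if (ts != null) {
>>>>>>>>>           // moves the timestamp forward, which is always allowed
>>>>>>>>>           c.outputWithTimestamp(c.element(), ts);
>>>>>>>>>         } else {
>>>>>>>>>           c.output(deadLetterTag, c.element());
>>>>>>>>>         }
>>>>>>>>>       }
>>>>>>>>>     }).withOutputTags(okTag, TupleTagList.of(deadLetterTag)));
>>>>>>>>>
>>>>>>>>> // tagged.get(deadLetterTag) can then be written to a dead-letter
>>>>>>>>> // sink.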
>>>>>>>>>
>>>>>>>>> On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks. So returning the min timestamp is OK, right (assuming the
>>>>>>>>>> application is fine with what it means)?
>>>>>>>>>>
>>>>>>>>>> On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> All records in Apache Beam have a timestamp. The default
>>>>>>>>>>> timestamp is the min timestamp defined here:
>>>>>>>>>>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik <[email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> You would have to return the min timestamp for all records,
>>>>>>>>>>>>> otherwise the watermark may have advanced and you would be
>>>>>>>>>>>>> outputting records that are droppably late.
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> That would be fine I guess. What’s the timestamp for a record
>>>>>>>>>>>> that doesn’t have one?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> To be clear, returning min_timestamp for unparsable records
>>>>>>>>>>>>>> should not affect the watermark.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> How about returning min_timestamp? They would be dropped or
>>>>>>>>>>>>>>> redirected by the ParDo after that.
>>>>>>>>>>>>>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a
>>>>>>>>>>>>>>> public API; is this pipeline defined under the kafkaio package?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> In this case, the user is attempting to handle errors when
>>>>>>>>>>>>>>>> parsing the timestamp. The timestamp controls the watermark
>>>>>>>>>>>>>>>> for the UnboundedSource; how would they control the watermark
>>>>>>>>>>>>>>>> in a downstream ParDo?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <
>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Ah nice. Yeah, if the user can return full bytes instead of
>>>>>>>>>>>>>>>>>> applying a function that would result in an exception, this
>>>>>>>>>>>>>>>>>> can be extracted by a ParDo down the line.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> KafkaIO does return bytes, and I think most sources
>>>>>>>>>>>>>>>>> should, unless there is a good reason not to.
>>>>>>>>>>>>>>>>> Given that, do we think Beam should provide a transform
>>>>>>>>>>>>>>>>> that makes it simpler to handle dead-letter output? I think
>>>>>>>>>>>>>>>>> there was a thread about it in the past.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> As Raghu said,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Just apply a regular ParDo and return a PCollectionTuple;
>>>>>>>>>>>>>>>>>>> after that you can extract your success records (TupleTag)
>>>>>>>>>>>>>>>>>>> and your dead-letter records (TupleTag) and do whatever you
>>>>>>>>>>>>>>>>>>> want with them.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Raghu Angadi <[email protected]> schrieb am Mi., 24.
>>>>>>>>>>>>>>>>>>> Okt. 2018, 05:18:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The user can read serialized bytes from KafkaIO and
>>>>>>>>>>>>>>>>>>>> deserialize explicitly in a ParDo, which gives complete
>>>>>>>>>>>>>>>>>>>> control over how to handle record errors. This is what I
>>>>>>>>>>>>>>>>>>>> would do if I needed to in my pipeline.
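>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> A sketch of the bytes-based read (broker and topic are
>>>>>>>>>>>>>>>>>>>> example values; DeserializeFn is a hypothetical DoFn that
>>>>>>>>>>>>>>>>>>>> side-outputs parse failures, as in the TupleTag pattern
>>>>>>>>>>>>>>>>>>>> earlier in this thread):
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> import org.apache.beam.sdk.io.kafka.KafkaIO;
>>>>>>>>>>>>>>>>>>>> import org.apache.beam.sdk.transforms.ParDo;
>>>>>>>>>>>>>>>>>>>> import org.apache.kafka.common.serialization.ByteArrayDeserializer;
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> pipeline
>>>>>>>>>>>>>>>>>>>>     .apply(KafkaIO.<byte[], byte[]>read()
>>>>>>>>>>>>>>>>>>>>         .withBootstrapServers("broker:9092")
>>>>>>>>>>>>>>>>>>>>         .withTopic("events")
>>>>>>>>>>>>>>>>>>>>         .withKeyDeserializer(ByteArrayDeserializer.class)
>>>>>>>>>>>>>>>>>>>>         .withValueDeserializer(ByteArrayDeserializer.class))
>>>>>>>>>>>>>>>>>>>>     // Deserialization happens downstream, so a parse
>>>>>>>>>>>>>>>>>>>>     // failure can be routed to a dead-letter output
>>>>>>>>>>>>>>>>>>>>     // instead of failing inside the source.
>>>>>>>>>>>>>>>>>>>>     .apply(ParDo.of(new DeserializeFn()));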
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> If there is a transform in Beam that does this, it
>>>>>>>>>>>>>>>>>>>> could be convenient for users in many such scenarios. This 
>>>>>>>>>>>>>>>>>>>> is simpler than
>>>>>>>>>>>>>>>>>>>> each source supporting it explicitly.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Given that KafkaIO uses the UnboundedSource framework,
>>>>>>>>>>>>>>>>>>>>> this is probably not something that can easily be
>>>>>>>>>>>>>>>>>>>>> supported. We might be able to support similar features
>>>>>>>>>>>>>>>>>>>>> when we have Kafka on top of Splittable DoFn though.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> So feel free to create a feature request JIRA for this.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>> Cham
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <
>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> This is a great question. I've added the dev list to
>>>>>>>>>>>>>>>>>>>>>> be sure it gets noticed by whoever may know best.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Is there a way to get a dead-letter output from a
>>>>>>>>>>>>>>>>>>>>>>> pipeline that uses a KafkaIO connector for its input?
>>>>>>>>>>>>>>>>>>>>>>> As `TimestampPolicyFactory.withTimestampFn()` takes
>>>>>>>>>>>>>>>>>>>>>>> only a SerializableFunction and not a ParDo, how would
>>>>>>>>>>>>>>>>>>>>>>> I be able to produce a dead-letter output from it?
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I have the following pipeline defined that reads
>>>>>>>>>>>>>>>>>>>>>>> from a KafkaIO input:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>>>>>>>>>>>>>   KafkaIO.<String, String>read()
>>>>>>>>>>>>>>>>>>>>>>>     .withBootstrapServers(bootstrap)
>>>>>>>>>>>>>>>>>>>>>>>     .withTopics(topics)
>>>>>>>>>>>>>>>>>>>>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>>>>>>>>>>>>>>     .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(
>>>>>>>>>>>>>>>>>>>>>>>         ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>>>>>>>>>>>>>>>>>>>>>>             inputMessagesConfig))
>>>>>>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(
>>>>>>>>>>>>>>>>>>>>>>>         ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(
>>>>>>>>>>>>>>>>>>>>>>>         ImmutableMap.of("group.id", "beam-consumers"))
>>>>>>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(
>>>>>>>>>>>>>>>>>>>>>>>         ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>>>>>>>>>>>>>>>>>     .withTimestampPolicyFactory(
>>>>>>>>>>>>>>>>>>>>>>>         TimestampPolicyFactory.withTimestampFn(
>>>>>>>>>>>>>>>>>>>>>>>             new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>>>>>>>>>>>>>>>>>     .withReadCommitted()
>>>>>>>>>>>>>>>>>>>>>>>     .commitOffsetsInFinalize())
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> and I would like to get dead-letter outputs when my
>>>>>>>>>>>>>>>>>>>>>>> timestamp extraction fails.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>> Tobi
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>
>> --
>> Tobias Kaymak
>> Data Engineer
>>
>> [email protected]
>> www.ricardo.ch
>>
>
