Well, if every input record's timestamp is X, watermark staying at X is the
right answer, no? But I am not sure where the disagreement is, actually. I
might be mistaken.

KafkaIO has a few built-in policies for watermark and timestamp that cover
most use cases (including server time, which has the benefit of providing a
perfect watermark). It also gives users fairly complete control over these
if they choose to take it. I think it is reasonable for a policy to base its
watermark only on parsable records and to ignore unparsable records
in the watermark calculation. It could even assign a timestamp that makes
more logical sense in a particular application.
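
A rough sketch of what I mean, in plain Java with no Beam dependency. All of
the names here (ParsableOnlyPolicy, timestampFor, MIN_TIMESTAMP) are made up
for illustration and are not KafkaIO's actual TimestampPolicy API. Giving bad
records the current watermark as their timestamp is just one possible choice,
and the ISO-8601 record format is an assumption:

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.Optional;

/** Beam-free model of a timestamp policy; every name here is hypothetical. */
final class ParsableOnlyPolicy {
    // Stand-in for Beam's minimum timestamp, not the real constant.
    static final Instant MIN_TIMESTAMP = Instant.MIN;

    private Instant watermark = MIN_TIMESTAMP;

    /** Parses an ISO-8601 instant; empty if the record is unparsable. */
    static Optional<Instant> parse(String record) {
        try {
            return Optional.of(Instant.parse(record));
        } catch (DateTimeParseException e) {
            return Optional.empty();
        }
    }

    /** Returns the timestamp to attach to the record, updating the watermark. */
    Instant timestampFor(String record) {
        Optional<Instant> parsed = parse(record);
        if (!parsed.isPresent()) {
            // Unparsable: leave the watermark alone and reuse it as the
            // record's timestamp (an application-specific choice).
            return watermark;
        }
        Instant ts = parsed.get();
        if (ts.isAfter(watermark)) {
            watermark = ts; // only parsable records advance the watermark
        }
        return ts;
    }

    Instant watermark() {
        return watermark;
    }
}
```

A downstream ParDo would still be the place to route the unparsable records
to a dead-letter output; this only models the watermark bookkeeping.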

On Wed, Oct 24, 2018 at 8:30 PM Kenneth Knowles <[email protected]> wrote:

> Forgive me if this is naive or missing something, but here are my thoughts
> on these alternatives:
>
> (0) Timestamp has to be pulled out in the source to control the watermark.
> Luke's point is important.
>
> (1) If bad records get min_timestamp, and they occur infrequently enough,
> then watermark will advance and they will all be dropped. That will not
> allow output to a dead-letter queue.
>
> (2) If you always have min_timestamp records, or if bad records are
> frequent, the watermark will never advance. So windows/aggregations would
> never be considered complete. Triggers could be used to get output anyhow,
> but it would never be a final answer. I think it is not in the spirit of
> Beam to work this way. Pragmatically, no state could ever be freed by a
> runner.
>
> In SQL there is an actual "dead letter" option when creating a table that
> parses from a bytes source. If, for example, a JSON record cannot be parsed
> to the expected schema - like maybe an avro record got in the stream, or
> the JSON doesn't match the expected schema - it is output as-is to a
> user-specified dead letter queue. I think this same level of support is
> also required for records that cannot have timestamps extracted in an
> unbounded source.
>
> In an SDF I think the function has enough control to do it all in
> "userland", so Cham is right on here.
>
> Kenn
>
> On Wed, Oct 24, 2018 at 6:54 PM Lukasz Cwik <[email protected]> wrote:
>
>> That depends on the user's pipeline and how watermark advancement of the
>> source may impact elements becoming droppably late if they are emitted with
>> the minimum timestamp.
>>
>> On Wed, Oct 24, 2018 at 4:42 PM Raghu Angadi <[email protected]> wrote:
>>
>>> I see.
>>>
>>> What I meant was to return min_timestamp for bad records in the
>>> timestamp handler passed to KafkaIO itself, and correct timestamp for
>>> parsable records. That should work too, right?
>>>
>>> On Wed, Oct 24, 2018 at 4:21 PM Lukasz Cwik <[email protected]> wrote:
>>>
>>>> Yes, that would be fine.
>>>>
>>>> The user could then use a ParDo which outputs to a DLQ for things it
>>>> can't parse the timestamp for and use outputWithTimestamp[1] for everything
>>>> else.
>>>>
>>>> 1:
>>>> https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-
>>>>
>>>> On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi <[email protected]>
>>>> wrote:
>>>>
>>>>> Thanks. So returning min timestamp is OK, right (assuming the application
>>>>> is fine with what it means)?
>>>>>
>>>>> On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik <[email protected]> wrote:
>>>>>
>>>>>> All records in Apache Beam have a timestamp. The default timestamp is
>>>>>> the min timestamp defined here:
>>>>>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
>>>>>>
>>>>>>
>>>>>> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> You would have to return min timestamp for all records; otherwise
>>>>>>>> the watermark may have advanced and you would be outputting records
>>>>>>>> that are droppably late.
>>>>>>>>
>>>>>>>
>>>>>>> That would be fine I guess. What’s the timestamp for a record that
>>>>>>> doesn’t have one?
>>>>>>>
>>>>>>>
>>>>>>>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> To be clear, returning min_timestamp for unparsable records should
>>>>>>>>> not affect the watermark.
>>>>>>>>>
>>>>>>>>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> How about returning min_timestamp? They would be dropped or
>>>>>>>>>> redirected by the ParDo after that.
>>>>>>>>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public
>>>>>>>>>> API. Is this pipeline defined under the kafkaio package?
>>>>>>>>>>
>>>>>>>>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> In this case, the user is attempting to handle errors when
>>>>>>>>>>> parsing the timestamp. The timestamp controls the watermark for the
>>>>>>>>>>> UnboundedSource, so how would they control the watermark in a
>>>>>>>>>>> downstream ParDo?
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Ah nice. Yeah, if the user can return full bytes instead of
>>>>>>>>>>>>> applying a function that would result in an exception, this can be
>>>>>>>>>>>>> extracted by a ParDo down the line.
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> KafkaIO does return bytes, and I think most sources should,
>>>>>>>>>>>> unless there is a good reason not to.
>>>>>>>>>>>> Given that, do we think Beam should provide a transform that
>>>>>>>>>>>> makes it simpler to handle dead-letter output? I think there was a
>>>>>>>>>>>> thread about it in the past.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> As Raghu said,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Just apply a regular ParDo and return a PCollectionTuple.
>>>>>>>>>>>>>> After that you can extract your success records (TupleTag) and your
>>>>>>>>>>>>>> dead-letter records (TupleTag) and do whatever you want with them.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Raghu Angadi <[email protected]> schrieb am Mi., 24. Okt.
>>>>>>>>>>>>>> 2018, 05:18:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The user can read serialized bytes from KafkaIO and deserialize
>>>>>>>>>>>>>>> explicitly in a ParDo, which gives complete control over how to
>>>>>>>>>>>>>>> handle record errors. This is what I would do if I needed to in
>>>>>>>>>>>>>>> my pipeline.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> If there is a transform in Beam that does this, it could be
>>>>>>>>>>>>>>> convenient for users in many such scenarios. This is simpler than
>>>>>>>>>>>>>>> each source supporting it explicitly.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Given that KafkaIO uses the UnboundedSource framework, this is
>>>>>>>>>>>>>>>> probably not something that can easily be supported. We might be
>>>>>>>>>>>>>>>> able to support similar features when we have Kafka on top of
>>>>>>>>>>>>>>>> Splittable DoFn, though.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So feel free to create a feature request JIRA for this.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Cham
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <
>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This is a great question. I've added the dev list to be
>>>>>>>>>>>>>>>>> sure it gets noticed by whoever may know best.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Is there a way to get a Deadletter Output from a pipeline
>>>>>>>>>>>>>>>>>> that uses a KafkaIO
>>>>>>>>>>>>>>>>>> connector for its input? As
>>>>>>>>>>>>>>>>>> `TimestampPolicyFactory.withTimestampFn()` takes
>>>>>>>>>>>>>>>>>> only a SerializableFunction and not a ParDo, how would I
>>>>>>>>>>>>>>>>>> be able to produce a
>>>>>>>>>>>>>>>>>> Deadletter output from it?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I have the following pipeline defined that reads from a
>>>>>>>>>>>>>>>>>> KafkaIO input:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>>>>>>>>   KafkaIO.<String, String>read()
>>>>>>>>>>>>>>>>>>     .withBootstrapServers(bootstrap)
>>>>>>>>>>>>>>>>>>     .withTopics(topics)
>>>>>>>>>>>>>>>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>>>>>>>>>     .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(
>>>>>>>>>>>>>>>>>>         ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>>>>>>>>>>>>     .withTimestampPolicyFactory(
>>>>>>>>>>>>>>>>>>         TimestampPolicyFactory.withTimestampFn(
>>>>>>>>>>>>>>>>>>             new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>>>>>>>>>>>>     .withReadCommitted()
>>>>>>>>>>>>>>>>>>     .commitOffsetsInFinalize())
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> and I would like to get deadletter outputs when my timestamp
>>>>>>>>>>>>>>>>>> extraction fails.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>> Tobi
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
