Forgive me if this is naive or missing something, but here are my thoughts
on these alternatives:

(0) Timestamp has to be pulled out in the source to control the watermark.
Luke's point is imortant.

(1) If bad records get min_timestamp, and they occur infrequently enough,
then watermark will advance and they will all be dropped. That will not
allow output to a dead-letter queue.

(2) If you have always min_timestamp records, or if bad records are
frequent, the watermark will never advance. So windows/aggregations would
never be considered complete. Triggers could be used to get output anyhow,
but it would never be a final answer. I think it is not in the spirit of
Beam to work this way. Pragmatically, no state could ever be freed by a
runner.

In SQL there is an actual "dead letter" option when creating a table that
parses from a bytes source. If, for example, a JSON record cannot be parsed
to the expected schema - like maybe an avro record got in the stream, or
the JSON doesn't match the expected schema - it is output as-is to a
user-specified dead letter queue. I think this same level of support is
also required for records that cannot have timestamps extracted in an
unbounded source.

In an SDF I think the function has enough control to do it all in
"userland", so Cham is right on here.

Kenn

On Wed, Oct 24, 2018 at 6:54 PM Lukasz Cwik <[email protected]> wrote:

> That depends on the users pipeline and how watermark advancement of the
> source may impact elements becoming droppably late if they are emitted with
> the minimum timestamp.
>
> On Wed, Oct 24, 2018 at 4:42 PM Raghu Angadi <[email protected]> wrote:
>
>> I see.
>>
>> What I meant was to return min_timestamp for bad records in the timestamp
>> handler passed to KafkaIO itself, and correct timestamp for parsable
>> records. That should work too, right?
>>
>> On Wed, Oct 24, 2018 at 4:21 PM Lukasz Cwik <[email protected]> wrote:
>>
>>> Yes, that would be fine.
>>>
>>> The user could then use a ParDo which outputs to a DLQ for things it
>>> can't parse the timestamp for and use outputWithTimestamp[1] for everything
>>> else.
>>>
>>> 1:
>>> https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-
>>>
>>> On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi <[email protected]> wrote:
>>>
>>>> Thanks. So returning  min timestamp is OK, right (assuming application
>>>> fine is with what it means)?
>>>>
>>>> On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> All records in Apache Beam have a timestamp. The default timestamp is
>>>>> the min timestamp defined here:
>>>>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
>>>>>
>>>>>
>>>>> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi <[email protected]>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> You would have to return min timestamp for all records otherwise the
>>>>>>> watermark may have advanced and you would be outputting records that are
>>>>>>> droppably late.
>>>>>>>
>>>>>>
>>>>>> That would be fine I guess. What’s the timestamp for a record that
>>>>>> doesn’t have one?
>>>>>>
>>>>>>
>>>>>>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> To be clear, returning min_timestamp for unparsable records shound
>>>>>>>> not affect the watermark.
>>>>>>>>
>>>>>>>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> How about returning min_timestamp? The would be dropped or
>>>>>>>>> redirected by the ParDo after that.
>>>>>>>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public
>>>>>>>>> API, is this pipeline defined under kafkaio package?
>>>>>>>>>
>>>>>>>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> In this case, the user is attempting to handle errors when
>>>>>>>>>> parsing the timestamp. The timestamp controls the watermark for the
>>>>>>>>>> UnboundedSource, how would they control the watermark in a 
>>>>>>>>>> downstream ParDo?
>>>>>>>>>>
>>>>>>>>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Ah nice. Yeah, if user can return full bytes instead of
>>>>>>>>>>>> applying a function that would result in an exception,  this can be
>>>>>>>>>>>> extracted by a ParDo down the line.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> KafkaIO does return bytes, and I think most sources should,
>>>>>>>>>>> unless there is a good reason not to.
>>>>>>>>>>> Given that, do we think Beam should provide a tranform that
>>>>>>>>>>> makes to simpler to handle deadletter output? I think there was a 
>>>>>>>>>>> thread
>>>>>>>>>>> about it in the past.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> As Raghu said,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Just apply a regular ParDo and return a PCollectionTuple afert
>>>>>>>>>>>>> that you can extract your Success Records (TupleTag) and your 
>>>>>>>>>>>>> DeadLetter
>>>>>>>>>>>>> records(TupleTag) and do whatever you want with them.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Raghu Angadi <[email protected]> schrieb am Mi., 24. Okt.
>>>>>>>>>>>>> 2018, 05:18:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> User can read serialized bytes from KafkaIO and deserialize
>>>>>>>>>>>>>> explicitly in a ParDo, which gives complete control on how to 
>>>>>>>>>>>>>> handle record
>>>>>>>>>>>>>> errors. This is I would do if I need to in my pipeline.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> If there is a transform in Beam that does this, it could be
>>>>>>>>>>>>>> convenient for users in many such scenarios. This is simpler 
>>>>>>>>>>>>>> than each
>>>>>>>>>>>>>> source supporting it explicitly.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Given that KafkaIO uses UnboundeSource framework, this is
>>>>>>>>>>>>>>> probably not something that can easily be supported. We might 
>>>>>>>>>>>>>>> be able to
>>>>>>>>>>>>>>> support similar features when we have Kafka on top of 
>>>>>>>>>>>>>>> Splittable DoFn
>>>>>>>>>>>>>>> though.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> So feel free to create a feature request JIRA for this.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Cham
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This is a great question. I've added the dev list to be
>>>>>>>>>>>>>>>> sure it gets noticed by whoever may know best.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Is there a way to get a Deadletter Output from a pipeline
>>>>>>>>>>>>>>>>> that uses a KafkaIO
>>>>>>>>>>>>>>>>> connector for it's input? As
>>>>>>>>>>>>>>>>> `TimestampPolicyFactory.withTimestampFn()` takes
>>>>>>>>>>>>>>>>> only a SerializableFunction and not a ParDo, how would I
>>>>>>>>>>>>>>>>> be able to produce a
>>>>>>>>>>>>>>>>> Deadletter output from it?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I have the following pipeline defined that reads from a
>>>>>>>>>>>>>>>>> KafkaIO input:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>>>>>>>   KafkaIO.<String, String>read()
>>>>>>>>>>>>>>>>>     .withBootstrapServers(bootstrap)
>>>>>>>>>>>>>>>>>     .withTopics(topics)
>>>>>>>>>>>>>>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>>>>>>>>     .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>>>>>>>>>     .updateConsumerProperties(
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>>>>>>>>>>>>>>>> inputMessagesConfig))
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> .updateConsumerProperties(ImmutableMap.of("auto.offset.reset",
>>>>>>>>>>>>>>>>>  "earliest"))
>>>>>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("group.id",
>>>>>>>>>>>>>>>>> "beam-consumers"))
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> .updateConsumerProperties(ImmutableMap.of("enable.auto.commit",
>>>>>>>>>>>>>>>>>  "true"))
>>>>>>>>>>>>>>>>>     .withTimestampPolicyFactory(
>>>>>>>>>>>>>>>>>         TimestampPolicyFactory.withTimestampFn(
>>>>>>>>>>>>>>>>>             new
>>>>>>>>>>>>>>>>> MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>>>>>>>>>>>     .withReadCommitted()
>>>>>>>>>>>>>>>>>     .commitOffsetsInFinalize())
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> and I like to get deadletter outputs when my timestamp
>>>>>>>>>>>>>>>>> extraction fails.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>> Tobi
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>

Reply via email to