Yes, that would be fine.

The user could then use a ParDo which outputs records whose timestamp it
can't parse to a DLQ and uses outputWithTimestamp[1] for everything else.

1:
https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-
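
To illustrate the routing described above without pulling in Beam, here is a
minimal plain-Java sketch. It is not Beam code: the two lists stand in for the
two TupleTag outputs of a multi-output ParDo. The class name DeadLetterSketch,
the Timestamped and Routed types, and the assumption that the whole payload is
an ISO-8601 instant are hypothetical, chosen only for illustration.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Beam-free model of "parse the timestamp in a ParDo, dead-letter on failure". */
final class DeadLetterSketch {

    /** A payload paired with the event time parsed from it. */
    static final class Timestamped {
        final String payload;
        final Instant timestamp;

        Timestamped(String payload, Instant timestamp) {
            this.payload = payload;
            this.timestamp = timestamp;
        }
    }

    /** The two outputs, analogous to two TupleTags on a multi-output ParDo. */
    static final class Routed {
        final List<Timestamped> parsed = new ArrayList<>();
        final List<String> deadLetters = new ArrayList<>();
    }

    /** Routes each payload to the parsed output or to the dead-letter output. */
    static Routed route(List<String> payloads) {
        Routed out = new Routed();
        for (String payload : payloads) {
            try {
                // Assumption: the payload itself is an ISO-8601 instant.
                Instant ts = Instant.parse(payload);
                out.parsed.add(new Timestamped(payload, ts));
            } catch (DateTimeParseException e) {
                // Unparsable timestamp: keep the raw payload for the DLQ.
                out.deadLetters.add(payload);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Routed r = route(Arrays.asList("2018-10-24T13:21:00Z", "not-a-timestamp"));
        System.out.println(r.parsed.size() + " parsed, "
            + r.deadLetters.size() + " dead-lettered");
    }
}
```

In an actual pipeline, the parsed branch would correspond to calling
outputWithTimestamp[1] with the extracted timestamp, and the dead-letter branch
to an output on a second TupleTag.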

On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi <[email protected]> wrote:

> Thanks. So returning the min timestamp is OK, right (assuming the
> application is fine with what it means)?
>
> On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik <[email protected]> wrote:
>
>> All records in Apache Beam have a timestamp. The default timestamp is the
>> min timestamp defined here:
>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
>>
>>
>> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi <[email protected]> wrote:
>>
>>>
>>>
>>> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik <[email protected]> wrote:
>>>
>>>> You would have to return the min timestamp for all records; otherwise
>>>> the watermark may have advanced and you would be outputting records
>>>> that are droppably late.
>>>>
>>>
>>> That would be fine I guess. What’s the timestamp for a record that
>>> doesn’t have one?
>>>
>>>
>>>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi <[email protected]>
>>>> wrote:
>>>>
>>>>> To be clear, returning min_timestamp for unparsable records should not
>>>>> affect the watermark.
>>>>>
>>>>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> How about returning min_timestamp? They would be dropped or redirected
>>>>>> by the ParDo after that.
>>>>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API.
>>>>>> Is this pipeline defined under the kafkaio package?
>>>>>>
>>>>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> In this case, the user is attempting to handle errors when parsing
>>>>>>> the timestamp. The timestamp controls the watermark for the
>>>>>>> UnboundedSource, so how would they control the watermark in a
>>>>>>> downstream ParDo?
>>>>>>>
>>>>>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Ah nice. Yeah, if the user can return the full bytes instead of
>>>>>>>>> applying a function that would result in an exception, this can be
>>>>>>>>> extracted by a ParDo down the line.
>>>>>>>>>
>>>>>>>>
>>>>>>>> KafkaIO does return bytes, and I think most sources should, unless
>>>>>>>> there is a good reason not to.
>>>>>>>> Given that, do we think Beam should provide a transform that makes
>>>>>>>> it simpler to handle deadletter output? I think there was a thread
>>>>>>>> about it in the past.
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> As Raghu said,
>>>>>>>>>>
>>>>>>>>>> Just apply a regular ParDo and return a PCollectionTuple. After
>>>>>>>>>> that you can extract your Success Records (TupleTag) and your
>>>>>>>>>> DeadLetter records (TupleTag) and do whatever you want with them.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Raghu Angadi <[email protected]> wrote on Wed, Oct 24, 2018,
>>>>>>>>>> 05:18:
>>>>>>>>>>
>>>>>>>>>>> The user can read serialized bytes from KafkaIO and deserialize
>>>>>>>>>>> explicitly in a ParDo, which gives complete control over how to
>>>>>>>>>>> handle record errors. This is what I would do if I needed to in
>>>>>>>>>>> my pipeline.
>>>>>>>>>>>
>>>>>>>>>>> If there is a transform in Beam that does this, it could be
>>>>>>>>>>> convenient for users in many such scenarios. This is simpler than
>>>>>>>>>>> each source supporting it explicitly.
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Given that KafkaIO uses the UnboundedSource framework, this is
>>>>>>>>>>>> probably not something that can easily be supported. We might be
>>>>>>>>>>>> able to support similar features when we have Kafka on top of
>>>>>>>>>>>> Splittable DoFn though.
>>>>>>>>>>>>
>>>>>>>>>>>> So feel free to create a feature request JIRA for this.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Cham
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <[email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> This is a great question. I've added the dev list to be sure
>>>>>>>>>>>>> it gets noticed by whoever may know best.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Is there a way to get a Deadletter Output from a pipeline
>>>>>>>>>>>>>> that uses a KafkaIO connector for its input? As
>>>>>>>>>>>>>> `TimestampPolicyFactory.withTimestampFn()` takes only a
>>>>>>>>>>>>>> SerializableFunction and not a ParDo, how would I be able to
>>>>>>>>>>>>>> produce a Deadletter output from it?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have the following pipeline defined that reads from a
>>>>>>>>>>>>>> KafkaIO input:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>>>>   KafkaIO.<String, String>read()
>>>>>>>>>>>>>>     .withBootstrapServers(bootstrap)
>>>>>>>>>>>>>>     .withTopics(topics)
>>>>>>>>>>>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>>>>>     .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>>>>>>     .updateConsumerProperties(
>>>>>>>>>>>>>>         ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
>>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>>>>>>>>     .withTimestampPolicyFactory(
>>>>>>>>>>>>>>         TimestampPolicyFactory.withTimestampFn(
>>>>>>>>>>>>>>             new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>>>>>>>>     .withReadCommitted()
>>>>>>>>>>>>>>     .commitOffsetsInFinalize())
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> and I would like to get deadletter outputs when my timestamp
>>>>>>>>>>>>>> extraction fails.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Tobi
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
