All records in Apache Beam have a timestamp. The default timestamp is the
min timestamp defined here:
https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
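
For reference, that constant is BoundedWindow.TIMESTAMP_MIN_VALUE. A minimal
illustration (sketch):

import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

// The default timestamp a source assigns when the record has none.
Instant defaultTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;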


On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi <[email protected]> wrote:

>
>
> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik <[email protected]> wrote:
>
>> You would have to return min timestamp for all records; otherwise the
>> watermark may have advanced and you would be outputting records that are
>> droppably late.
>>
>
> That would be fine, I guess. What’s the timestamp for a record that doesn’t
> have one?
>
>
>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi <[email protected]> wrote:
>>
>>> To be clear, returning min_timestamp for unparsable records should not
>>> affect the watermark.
>>>
>>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <[email protected]>
>>> wrote:
>>>
>>>> How about returning min_timestamp? They would be dropped or redirected
>>>> by the ParDo after that.
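>>>>
>>>> Roughly like this (an untested sketch; extractTimestamp() stands in for
>>>> your own parsing logic):
>>>>
>>>> TimestampPolicyFactory.withTimestampFn(
>>>>     (KafkaRecord<String, String> rec) -> {
>>>>       try {
>>>>         return extractTimestamp(rec.getKV().getValue());
>>>>       } catch (Exception e) {
>>>>         // Unparsable record: min timestamp means it can never be late,
>>>>         // so a ParDo right after the source can drop or redirect it.
>>>>         return BoundedWindow.TIMESTAMP_MIN_VALUE;
>>>>       }
>>>>     })
>>>>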
>>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API; is
>>>> this pipeline defined under the kafkaio package?
>>>>
>>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> In this case, the user is attempting to handle errors when parsing the
>>>>> timestamp. The timestamp controls the watermark for the UnboundedSource,
>>>>> how would they control the watermark in a downstream ParDo?
>>>>>
>>>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Ah nice. Yeah, if the user can return the full bytes instead of
>>>>>>> applying a function that would result in an exception, this can be
>>>>>>> extracted by a ParDo down the line.
>>>>>>>
>>>>>>
>>>>>> KafkaIO does return bytes, and I think most sources should, unless
>>>>>> there is a good reason not to.
>>>>>> Given that, do we think Beam should provide a transform that makes it
>>>>>> simpler to handle deadletter output? I think there was a thread about
>>>>>> it in the past.
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> As Raghu said,
>>>>>>>>
>>>>>>>> Just apply a regular ParDo and return a PCollectionTuple; after that
>>>>>>>> you can extract your success records (TupleTag) and your dead-letter
>>>>>>>> records (TupleTag) and do whatever you want with them.
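>>>>>>>>
>>>>>>>> Roughly like this (an untested sketch; MyRecord and parse() are
>>>>>>>> placeholders for your own record type and deserialization):
>>>>>>>>
>>>>>>>> final TupleTag<MyRecord> successTag = new TupleTag<MyRecord>() {};
>>>>>>>> final TupleTag<byte[]> deadLetterTag = new TupleTag<byte[]>() {};
>>>>>>>>
>>>>>>>> PCollectionTuple results = input.apply(
>>>>>>>>     ParDo.of(new DoFn<byte[], MyRecord>() {
>>>>>>>>       @ProcessElement
>>>>>>>>       public void process(ProcessContext c) {
>>>>>>>>         try {
>>>>>>>>           c.output(parse(c.element()));          // success path
>>>>>>>>         } catch (Exception e) {
>>>>>>>>           c.output(deadLetterTag, c.element());  // dead-letter path
>>>>>>>>         }
>>>>>>>>       }
>>>>>>>>     }).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));
>>>>>>>>
>>>>>>>> PCollection<MyRecord> good = results.get(successTag);
>>>>>>>> PCollection<byte[]> deadLetters = results.get(deadLetterTag);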
>>>>>>>>
>>>>>>>>
>>>>>>>> Raghu Angadi <[email protected]> schrieb am Mi., 24. Okt. 2018,
>>>>>>>> 05:18:
>>>>>>>>
>>>>>>>>> User can read serialized bytes from KafkaIO and deserialize
>>>>>>>>> explicitly in a ParDo, which gives complete control over how to
>>>>>>>>> handle record errors. This is what I would do if I needed to in my
>>>>>>>>> pipeline.
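>>>>>>>>>
>>>>>>>>> For example (a sketch, using Kafka's ByteArrayDeserializer):
>>>>>>>>>
>>>>>>>>> // Read raw bytes; deserialize later in a ParDo with try/catch,
>>>>>>>>> // routing failures to a dead-letter output.
>>>>>>>>> pipeline.apply(
>>>>>>>>>     KafkaIO.<byte[], byte[]>read()
>>>>>>>>>         .withBootstrapServers(bootstrap)
>>>>>>>>>         .withTopics(topics)
>>>>>>>>>         .withKeyDeserializer(ByteArrayDeserializer.class)
>>>>>>>>>         .withValueDeserializer(ByteArrayDeserializer.class));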
>>>>>>>>>
>>>>>>>>> If there is a transform in Beam that does this, it could be
>>>>>>>>> convenient for users in many such scenarios. This is simpler than each
>>>>>>>>> source supporting it explicitly.
>>>>>>>>>
>>>>>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Given that KafkaIO uses the UnboundedSource framework, this is
>>>>>>>>>> probably not something that can easily be supported. We might be
>>>>>>>>>> able to support similar features when we have Kafka on top of
>>>>>>>>>> Splittable DoFn though.
>>>>>>>>>> So feel free to create a feature request JIRA for this.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Cham
>>>>>>>>>>
>>>>>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> This is a great question. I've added the dev list to be sure it
>>>>>>>>>>> gets noticed by whoever may know best.
>>>>>>>>>>>
>>>>>>>>>>> Kenn
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> Is there a way to get a deadletter output from a pipeline that
>>>>>>>>>>>> uses a KafkaIO connector for its input? As
>>>>>>>>>>>> `TimestampPolicyFactory.withTimestampFn()` takes only a
>>>>>>>>>>>> SerializableFunction and not a ParDo, how would I be able to
>>>>>>>>>>>> produce a deadletter output from it?
>>>>>>>>>>>>
>>>>>>>>>>>> I have the following pipeline defined that reads from a KafkaIO
>>>>>>>>>>>> input:
>>>>>>>>>>>>
>>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>>   KafkaIO.<String, String>read()
>>>>>>>>>>>>     .withBootstrapServers(bootstrap)
>>>>>>>>>>>>     .withTopics(topics)
>>>>>>>>>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>>>     .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>>>>     .updateConsumerProperties(
>>>>>>>>>>>>         ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>>>>>>>>>>>             inputMessagesConfig))
>>>>>>>>>>>>     .updateConsumerProperties(
>>>>>>>>>>>>         ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>>>>>>     .updateConsumerProperties(
>>>>>>>>>>>>         ImmutableMap.of("group.id", "beam-consumers"))
>>>>>>>>>>>>     .updateConsumerProperties(
>>>>>>>>>>>>         ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>>>>>>     .withTimestampPolicyFactory(
>>>>>>>>>>>>         TimestampPolicyFactory.withTimestampFn(
>>>>>>>>>>>>             new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>>>>>>     .withReadCommitted()
>>>>>>>>>>>>     .commitOffsetsInFinalize())
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> and I'd like to get deadletter outputs when my timestamp
>>>>>>>>>>>> extraction fails.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Tobi
>>>>>>>>>>>>
>>>>>>>>>>>>
