Thanks. So returning min timestamp is OK, right (assuming the application is
fine with what it means)?
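
For reference, a rough sketch of what that timestamp function could look like
(untested; extractTimestamp() is a placeholder for whatever parsing the
application does on the record value, everything else is from the Beam SDK):

import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

// Never throws: records whose timestamp cannot be parsed get the min
// timestamp and can be dropped or redirected by a ParDo later on.
SerializableFunction<KafkaRecord<String, String>, Instant> safeTimestampFn =
    record -> {
      try {
        return extractTimestamp(record.getKV().getValue());
      } catch (Exception e) {
        return BoundedWindow.TIMESTAMP_MIN_VALUE;
      }
    };

// Plugged into KafkaIO the same way as in the original pipeline:
// .withTimestampPolicyFactory(TimestampPolicyFactory.withTimestampFn(safeTimestampFn))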

On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik <[email protected]> wrote:

> All records in Apache Beam have a timestamp. The default timestamp is the
> min timestamp defined here:
> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
>
>
> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi <[email protected]> wrote:
>
>>
>>
>> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik <[email protected]> wrote:
>>
>>> You would have to return min timestamp for all records, otherwise the
>>> watermark may have advanced and you would be outputting records that are
>>> droppably late.
>>>
>>
>> That would be fine I guess. What’s the timestamp for a record that
>> doesn’t have one?
>>
>>
>>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi <[email protected]>
>>> wrote:
>>>
>>>> To be clear, returning min_timestamp for unparsable records should not
>>>> affect the watermark.
>>>>
>>>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <[email protected]>
>>>> wrote:
>>>>
>>>>> How about returning min_timestamp? They would be dropped or redirected
>>>>> by the ParDo after that.
>>>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API; is
>>>>> this pipeline defined under the kafkaio package?
>>>>>
>>>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <[email protected]> wrote:
>>>>>
>>>>>> In this case, the user is attempting to handle errors when parsing
>>>>>> the timestamp. The timestamp controls the watermark for the
>>>>>> UnboundedSource; how would they control the watermark in a downstream
>>>>>> ParDo?
>>>>>>
>>>>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Ah nice. Yeah, if the user can return full bytes instead of applying a
>>>>>>>> function that would result in an exception, this can be extracted by a
>>>>>>>> ParDo down the line.
>>>>>>>>
>>>>>>>
>>>>>>> KafkaIO does return bytes, and I think most sources should, unless
>>>>>>> there is a good reason not to.
>>>>>>> Given that, do we think Beam should provide a transform that makes it
>>>>>>> simpler to handle deadletter output? I think there was a thread about
>>>>>>> it in the past.
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> As Raghu said,
>>>>>>>>>
>>>>>>>>> Just apply a regular ParDo and return a PCollectionTuple; after
>>>>>>>>> that you can extract your Success records (TupleTag) and your
>>>>>>>>> DeadLetter records (TupleTag) and do whatever you want with them.
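>>>>>>>>>
>>>>>>>>> Roughly like this (untested sketch; MyRecord, parse() and the tag
>>>>>>>>> names are placeholders for your own types and logic, and records is
>>>>>>>>> the PCollection<KafkaRecord<byte[], byte[]>> read from KafkaIO):
>>>>>>>>>
>>>>>>>>> final TupleTag<MyRecord> successTag = new TupleTag<MyRecord>() {};
>>>>>>>>> final TupleTag<KafkaRecord<byte[], byte[]>> deadLetterTag =
>>>>>>>>>     new TupleTag<KafkaRecord<byte[], byte[]>>() {};
>>>>>>>>>
>>>>>>>>> PCollectionTuple results =
>>>>>>>>>     records.apply(
>>>>>>>>>         ParDo.of(
>>>>>>>>>                 new DoFn<KafkaRecord<byte[], byte[]>, MyRecord>() {
>>>>>>>>>                   @ProcessElement
>>>>>>>>>                   public void process(ProcessContext c) {
>>>>>>>>>                     try {
>>>>>>>>>                       // Successfully parsed records go to the main output.
>>>>>>>>>                       c.output(parse(c.element()));
>>>>>>>>>                     } catch (Exception e) {
>>>>>>>>>                       // Anything that fails to parse goes to the dead-letter output.
>>>>>>>>>                       c.output(deadLetterTag, c.element());
>>>>>>>>>                     }
>>>>>>>>>                   }
>>>>>>>>>                 })
>>>>>>>>>             .withOutputTags(successTag, TupleTagList.of(deadLetterTag)));
>>>>>>>>>
>>>>>>>>> PCollection<MyRecord> parsed = results.get(successTag);
>>>>>>>>> PCollection<KafkaRecord<byte[], byte[]>> deadLetters = results.get(deadLetterTag);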
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Raghu Angadi <[email protected]> schrieb am Mi., 24. Okt. 2018,
>>>>>>>>> 05:18:
>>>>>>>>>
>>>>>>>>>> The user can read serialized bytes from KafkaIO and deserialize
>>>>>>>>>> explicitly in a ParDo, which gives complete control over how to
>>>>>>>>>> handle record errors. This is what I would do if I needed to in my
>>>>>>>>>> pipeline.
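>>>>>>>>>>
>>>>>>>>>> For example, something like this (untested sketch; bootstrap and
>>>>>>>>>> topics stand in for the same settings used in the original pipeline):
>>>>>>>>>>
>>>>>>>>>> // Read raw bytes from Kafka; no deserialization happens inside the
>>>>>>>>>> // source, so a malformed payload cannot fail the read itself.
>>>>>>>>>> PCollection<KafkaRecord<byte[], byte[]>> records =
>>>>>>>>>>     pipeline.apply(
>>>>>>>>>>         KafkaIO.<byte[], byte[]>read()
>>>>>>>>>>             .withBootstrapServers(bootstrap)
>>>>>>>>>>             .withTopics(topics)
>>>>>>>>>>             .withKeyDeserializer(ByteArrayDeserializer.class)
>>>>>>>>>>             .withValueDeserializer(ByteArrayDeserializer.class));
>>>>>>>>>>
>>>>>>>>>> // A multi-output ParDo can then deserialize with a try/catch and
>>>>>>>>>> // route anything that fails to parse to a dead-letter output.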
>>>>>>>>>>
>>>>>>>>>> If there is a transform in Beam that does this, it could be
>>>>>>>>>> convenient for users in many such scenarios. This is simpler than 
>>>>>>>>>> each
>>>>>>>>>> source supporting it explicitly.
>>>>>>>>>>
>>>>>>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Given that KafkaIO uses the UnboundedSource framework, this is
>>>>>>>>>>> probably not something that can easily be supported. We might be
>>>>>>>>>>> able to support similar features when we have Kafka on top of
>>>>>>>>>>> Splittable DoFn though.
>>>>>>>>>>>
>>>>>>>>>>> So feel free to create a feature request JIRA for this.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Cham
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> This is a great question. I've added the dev list to be sure it
>>>>>>>>>>>> gets noticed by whoever may know best.
>>>>>>>>>>>>
>>>>>>>>>>>> Kenn
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Is there a way to get a deadletter output from a pipeline that
>>>>>>>>>>>>> uses a KafkaIO connector for its input? As
>>>>>>>>>>>>> `TimestampPolicyFactory.withTimestampFn()` takes only a
>>>>>>>>>>>>> SerializableFunction and not a ParDo, how would I be able to
>>>>>>>>>>>>> produce a deadletter output from it?
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have the following pipeline defined that reads from a
>>>>>>>>>>>>> KafkaIO input:
>>>>>>>>>>>>>
>>>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>>>   KafkaIO.<String, String>read()
>>>>>>>>>>>>>     .withBootstrapServers(bootstrap)
>>>>>>>>>>>>>     .withTopics(topics)
>>>>>>>>>>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>>>>     .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>>>>>     .updateConsumerProperties(
>>>>>>>>>>>>>         ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>>>>>>>     .withTimestampPolicyFactory(
>>>>>>>>>>>>>         TimestampPolicyFactory.withTimestampFn(
>>>>>>>>>>>>>             new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>>>>>>>     .withReadCommitted()
>>>>>>>>>>>>>     .commitOffsetsInFinalize())
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> and I would like to get deadletter outputs when my timestamp
>>>>>>>>>>>>> extraction fails.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Tobi
>>>>>>>>>>>>>
>>>>>>>>>>>>>
