On Thu, Oct 25, 2018 at 10:41 AM Raghu Angadi <[email protected]> wrote:

>
> On Thu, Oct 25, 2018 at 10:28 AM Chamikara Jayalath <[email protected]>
> wrote:
>
>> Not sure if I understand why this would require Kafka to behave as two
>> independent sources.
>>
>
>
>
>> Won't setting a non-negative-infinity timestamp (for example processing
>> time) for failed records be enough ?
>>
>
> Well that's what I suggested and that is what a user seems to have done.
> The question is: what if that is not what we want, and we don't want to mix
> these two together (at least from my reading of Luke's and Kenn's comments,
> which could be off).
>
>

Sorry I meant setting it at the SDF itself (not from a ParDo) when we have
SDF-based KafkaIO.


>
>

>
>>
>> Also (at least at some point) there were discussions on supporting SDFs
>> to report different watermarks for different outputs. More details are
>> available here:
>> https://docs.google.com/document/d/1BGc8pM1GOvZhwR9SARSVte-20XEoBUxrGJ5gTWXdv3c/edit
>>
>> - Cham
>>
>>
>>>
>>> Raghu.
>>>
>>>>
>>>> It could even assign a timestamp that makes more logical sense in a
>>>>> particular application.
>>>>>
>>>>> On Wed, Oct 24, 2018 at 8:30 PM Kenneth Knowles <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Forgive me if this is naive or missing something, but here are my
>>>>>> thoughts on these alternatives:
>>>>>>
>>>>>> (0) Timestamp has to be pulled out in the source to control the
>>>>>> watermark. Luke's point is important.
>>>>>>
>>>>>> (1) If bad records get min_timestamp, and they occur infrequently
>>>>>> enough, then the watermark will advance and they will all be dropped.
>>>>>> That will not allow output to a dead-letter queue.
>>>>>>
>>>>>> (2) If you always have min_timestamp records, or if bad records are
>>>>>> frequent, the watermark will never advance. So windows/aggregations would
>>>>>> never be considered complete. Triggers could be used to get output 
>>>>>> anyhow,
>>>>>> but it would never be a final answer. I think it is not in the spirit of
>>>>>> Beam to work this way. Pragmatically, no state could ever be freed by a
>>>>>> runner.
>>>>>>
>>>>>> In SQL there is an actual "dead letter" option when creating a table
>>>>>> that parses from a bytes source. If, for example, a JSON record cannot be
>>>>>> parsed to the expected schema - like maybe an avro record got in the
>>>>>> stream, or the JSON doesn't match the expected schema - it is output 
>>>>>> as-is
>>>>>> to a user-specified dead letter queue. I think this same level of support
>>>>>> is also required for records that cannot have timestamps extracted in an
>>>>>> unbounded source.
>>>>>>
>>>>>> In an SDF I think the function has enough control to do it all in
>>>>>> "userland", so Cham is right on here.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Wed, Oct 24, 2018 at 6:54 PM Lukasz Cwik <[email protected]> wrote:
>>>>>>
>>>>>>> That depends on the user's pipeline and how watermark advancement of
>>>>>>> the source may impact elements becoming droppably late if they are 
>>>>>>> emitted
>>>>>>> with the minimum timestamp.
>>>>>>>
>>>>>>> On Wed, Oct 24, 2018 at 4:42 PM Raghu Angadi <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I see.
>>>>>>>>
>>>>>>>> What I meant was to return min_timestamp for bad records in the
>>>>>>>> timestamp handler passed to KafkaIO itself, and the correct timestamp
>>>>>>>> for parsable records. That should work too, right?
>>>>>>>>
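[Sketched concretely, the idea above is a timestamp handler that falls back to the minimum timestamp when parsing fails. Below is a plain-Java stand-in; in the real pipeline the handler would return a Joda `Instant` and Beam's `BoundedWindow.TIMESTAMP_MIN_VALUE`, and the epoch-millis payload format here is an assumption.]

```java
import java.time.Instant;

public class MinTimestampFallback {
    // Stand-in for Beam's BoundedWindow.TIMESTAMP_MIN_VALUE.
    static final Instant MIN_TIMESTAMP = Instant.MIN;

    // Parsable records get their real event timestamp; unparsable
    // ones get the minimum, so a later ParDo can recognize and
    // redirect them without affecting the watermark.
    static Instant extractTimestamp(String payload) {
        try {
            return Instant.ofEpochMilli(Long.parseLong(payload.trim()));
        } catch (NumberFormatException e) {
            return MIN_TIMESTAMP;
        }
    }

    public static void main(String[] args) {
        System.out.println(extractTimestamp("1540000000000"));
        System.out.println(extractTimestamp("not-a-timestamp"));
    }
}
```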
>>>>>>>> On Wed, Oct 24, 2018 at 4:21 PM Lukasz Cwik <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Yes, that would be fine.
>>>>>>>>>
>>>>>>>>> The user could then use a ParDo which outputs to a DLQ for things
>>>>>>>>> it can't parse the timestamp for and use outputWithTimestamp[1] for
>>>>>>>>> everything else.
>>>>>>>>>
>>>>>>>>> 1:
>>>>>>>>> https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-
>>>>>>>>>
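[The ParDo-with-DLQ pattern described here splits each element between a main output tag and a dead-letter tag. A minimal plain-Java stand-in for that routing follows; Beam's two `TupleTag`s and `outputWithTimestamp` are modeled as two lists, and all names are hypothetical.]

```java
import java.util.ArrayList;
import java.util.List;

public class TimestampDeadLetterSplit {
    // Stand-ins for the ParDo's two output TupleTags.
    final List<long[]> mainOutput = new ArrayList<>();  // {recordId, timestampMillis}
    final List<String> deadLetterOutput = new ArrayList<>();

    // Per-element logic: parse the timestamp and emit the record with
    // it (~ outputWithTimestamp), or divert the raw value to the DLQ.
    void processElement(long recordId, String rawValue) {
        try {
            long ts = Long.parseLong(rawValue.trim());
            mainOutput.add(new long[] {recordId, ts});
        } catch (NumberFormatException e) {
            deadLetterOutput.add(rawValue);
        }
    }
}
```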
>>>>>>>>> On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks. So returning min timestamp is OK, right (assuming the
>>>>>>>>>> application is fine with what it means)?
>>>>>>>>>>
>>>>>>>>>> On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> All records in Apache Beam have a timestamp. The default
>>>>>>>>>>> timestamp is the min timestamp defined here:
>>>>>>>>>>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik <[email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> You would have to return min timestamp for all records
>>>>>>>>>>>>> otherwise the watermark may have advanced and you would be 
>>>>>>>>>>>>> outputting
>>>>>>>>>>>>> records that are droppably late.
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> That would be fine I guess. What’s the timestamp for a record
>>>>>>>>>>>> that doesn’t have one?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> To be clear, returning min_timestamp for unparsable records
>>>>>>>>>>>>>> should not affect the watermark.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> How about returning min_timestamp? They would be dropped or
>>>>>>>>>>>>>>> redirected by the ParDo after that.
>>>>>>>>>>>>>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a
>>>>>>>>>>>>>>> public API; is this pipeline defined under the kafkaio package?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> In this case, the user is attempting to handle errors when
>>>>>>>>>>>>>>>> parsing the timestamp. The timestamp controls the watermark 
>>>>>>>>>>>>>>>> for the
>>>>>>>>>>>>>>>> UnboundedSource, how would they control the watermark in a 
>>>>>>>>>>>>>>>> downstream ParDo?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <
>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Ah nice. Yeah, if the user can return full bytes instead of
>>>>>>>>>>>>>>>>>> applying a function that would result in an exception, this
>>>>>>>>>>>>>>>>>> can be extracted by a ParDo down the line.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> KafkaIO does return bytes, and I think most sources
>>>>>>>>>>>>>>>>> should, unless there is a good reason not to.
>>>>>>>>>>>>>>>>> Given that, do we think Beam should provide a transform
>>>>>>>>>>>>>>>>> that makes it simpler to handle dead-letter output? I think
>>>>>>>>>>>>>>>>> there was a thread about it in the past.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> As Raghu said,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Just apply a regular ParDo and return a PCollectionTuple;
>>>>>>>>>>>>>>>>>>> after that you can extract your success records (TupleTag)
>>>>>>>>>>>>>>>>>>> and your dead-letter records (TupleTag) and do whatever you
>>>>>>>>>>>>>>>>>>> want with them.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Raghu Angadi <[email protected]> schrieb am Mi., 24.
>>>>>>>>>>>>>>>>>>> Okt. 2018, 05:18:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> User can read serialized bytes from KafkaIO and
>>>>>>>>>>>>>>>>>>>> deserialize explicitly in a ParDo, which gives complete
>>>>>>>>>>>>>>>>>>>> control over how to handle record errors. This is what I
>>>>>>>>>>>>>>>>>>>> would do if I needed to in my pipeline.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> If there is a transform in Beam that does this, it
>>>>>>>>>>>>>>>>>>>> could be convenient for users in many such scenarios. This 
>>>>>>>>>>>>>>>>>>>> is simpler than
>>>>>>>>>>>>>>>>>>>> each source supporting it explicitly.
>>>>>>>>>>>>>>>>>>>>
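[Stripped of Beam specifics, the bytes-then-deserialize pattern above is: the source emits raw bytes, and a user-owned step performs deserialization with its own error handling, so a bad record becomes a dead letter instead of a read failure. A self-contained plain-Java sketch; the brace-delimited validity check is a made-up stand-in for real parsing.]

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class DeserializeInParDo {
    final List<String> parsed = new ArrayList<>();
    final List<byte[]> deadLetters = new ArrayList<>();

    // The source hands over raw bytes; deserialization (and its error
    // handling) happens here, under the pipeline author's control.
    void processElement(byte[] value) {
        String candidate = new String(value, StandardCharsets.UTF_8);
        // Hypothetical validity check standing in for real parsing.
        if (candidate.startsWith("{") && candidate.endsWith("}")) {
            parsed.add(candidate);      // successfully "deserialized"
        } else {
            deadLetters.add(value);     // route raw bytes to the DLQ
        }
    }
}
```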
>>>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Given that KafkaIO uses the UnboundedSource framework,
>>>>>>>>>>>>>>>>>>>>> this is probably not something that can easily be
>>>>>>>>>>>>>>>>>>>>> supported. We might be able to
>>>>>>>>>>>>>>>>>>>>> support similar features when we have Kafka on top of 
>>>>>>>>>>>>>>>>>>>>> Splittable DoFn
>>>>>>>>>>>>>>>>>>>>> though.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> So feel free to create a feature request JIRA for this.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>> Cham
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <
>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> This is a great question. I've added the dev list to
>>>>>>>>>>>>>>>>>>>>>> be sure it gets noticed by whoever may know best.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Is there a way to get a Deadletter Output from a
>>>>>>>>>>>>>>>>>>>>>>> pipeline that uses a KafkaIO
>>>>>>>>>>>>>>>>>>>>>>> connector for its input? As
>>>>>>>>>>>>>>>>>>>>>>> `TimestampPolicyFactory.withTimestampFn()` takes
>>>>>>>>>>>>>>>>>>>>>>> only a SerializableFunction and not a ParDo, how
>>>>>>>>>>>>>>>>>>>>>>> would I be able to produce a
>>>>>>>>>>>>>>>>>>>>>>> Deadletter output from it?
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I have the following pipeline defined that reads
>>>>>>>>>>>>>>>>>>>>>>> from a KafkaIO input:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>>>>>>>>>>>>>     KafkaIO.<String, String>read()
>>>>>>>>>>>>>>>>>>>>>>>         .withBootstrapServers(bootstrap)
>>>>>>>>>>>>>>>>>>>>>>>         .withTopics(topics)
>>>>>>>>>>>>>>>>>>>>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>>>>>>>>>>>>>>         .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>>>>>>>>>>>>>>>         .updateConsumerProperties(
>>>>>>>>>>>>>>>>>>>>>>>             ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>>>>>>>>>>>>>>>>>>>>>>                 inputMessagesConfig))
>>>>>>>>>>>>>>>>>>>>>>>         .updateConsumerProperties(
>>>>>>>>>>>>>>>>>>>>>>>             ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>>>>>>>>>>>>>>>>>         .updateConsumerProperties(
>>>>>>>>>>>>>>>>>>>>>>>             ImmutableMap.of("group.id", "beam-consumers"))
>>>>>>>>>>>>>>>>>>>>>>>         .updateConsumerProperties(
>>>>>>>>>>>>>>>>>>>>>>>             ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>>>>>>>>>>>>>>>>>         .withTimestampPolicyFactory(
>>>>>>>>>>>>>>>>>>>>>>>             TimestampPolicyFactory.withTimestampFn(
>>>>>>>>>>>>>>>>>>>>>>>                 new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>>>>>>>>>>>>>>>>>         .withReadCommitted()
>>>>>>>>>>>>>>>>>>>>>>>         .commitOffsetsInFinalize())
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> and I'd like to get dead-letter outputs when my
>>>>>>>>>>>>>>>>>>>>>>> timestamp extraction fails.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>> Tobi
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
