On Fri, Nov 9, 2018 at 1:23 AM Raghu Angadi <rang...@google.com> wrote:

>
>>> That is fine. We can ignore the timestamp as a possible suspect for
>>> debugging this. Using custom timestamps from records is normal.
>>>
>>>
>> Could you clarify what you meant with "withTimestampFn() was never meant
>> to be public"? I am using it to attach the right timestamp to an element
>> to be able to window into days with the local time zone in the windowing
>> function. If this is not the correct way to use it, could you tell me how
>> I can do it better?
>>
>
> The problem is with the watermark part of the policy. A source needs to
> provide both a timestamp for each record and a watermark for the stream. A
> TimestampPolicy in KafkaIO is responsible for both of these for each Kafka
> partition.
>
> `withTimestampFn()` lets the user provide a function to extract the
> timestamp, but for the watermark it just uses the most recent record's
> timestamp. Say record A has timestamp 9:00:01 and arrives at 9:00:05, and B
> has a timestamp of 8:59:58 and arrives at 9:00:15.
> Once A is processed at 9:00:05, your pipeline's watermark could jump to
> 9:00:01, which means an hourly window for [8:00, 9:00) will close. When B
> arrives 10 seconds later, it is considered late. The problem is not just
> that such a watermark policy is too brittle, it is that users have no idea
> this is happening.
>
> Deprecation documentation for this API[1] suggests using
> `CustomTimestampPolicyWithLimitedDelay()` [2] instead.
>
> [1]:
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L100
> [2]:
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java#L29
>
>
Thank you so much, that explanation was very helpful! I will adapt to the
new approach and continue digging into the missing partition problem
afterwards.
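
Just to be sure I got it right: I would keep my existing extractor (assuming
it already implements SerializableFunction<KafkaRecord<String, String>,
Instant>) and plug it into CustomTimestampPolicyWithLimitedDelay, roughly
like this untested sketch - the two-minute max delay is just a placeholder I
still need to tune:

                .withTimestampPolicyFactory(
                    (topicPartition, previousWatermark) ->
                        new CustomTimestampPolicyWithLimitedDelay<String, String>(
                            new MessageTimestampExtractor(inputMessagesConfig),
                            // maximum expected out-of-orderness (placeholder value)
                            Duration.standardMinutes(2),
                            previousWatermark))
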
Tobi.



> Raghu.
>
>> After the rollback I am busy making the existing pipeline to GCS robust
>> enough that it never fails to deliver all files, so that there is always a
>> backup. As I am under a lot of pressure right now I don't want to mess it
>> up with easy-to-avoid mistakes. The GCS pipeline has the same logic, just a
>> different sink that uses FileIO to write out different days to different
>> "folders".
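>>
>> Just so you can picture it, that sink looks roughly like the following
>> simplified sketch (the bucket path, toJson() and getTimestampMillis() are
>> placeholders for what the real Event type and config provide):
>>
>>   .apply(
>>       FileIO.<String, Event>writeDynamic()
>>           // group elements by day, e.g. "2018-11-09", in the local time zone
>>           .by(
>>               event ->
>>                   DateTimeFormat.forPattern("yyyy-MM-dd")
>>                       .withZone(DateTimeZone.forID("Europe/Zurich"))
>>                       .print(event.getTimestampMillis()))
>>           .withDestinationCoder(StringUtf8Coder.of())
>>           .via(Contextful.fn((Event event) -> event.toJson()), TextIO.sink())
>>           .to("gs://<backup-bucket>/events")
>>           .withNaming(day -> FileIO.Write.defaultNaming(day + "/events", ".json"))
>>           .withNumShards(1))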
>>
>> Thank you,
>>
>> Tobi
>>
>>
>>
>>> Raghu.
>>>
>>>
>>>> I could also not touch the timestamp at all, let the system decide, and
>>>> then parse it in the BigQueryIO partitioning step and assign it to a
>>>> partition. Is this better?
>>>>
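>>>> E.g. something like this hypothetical snippet in place of the dynamic
>>>> destinations (the table spec and getTimestampMillis() are placeholders):
>>>>
>>>>   .to(
>>>>       (SerializableFunction<ValueInSingleWindow<Event>, TableDestination>)
>>>>           input -> {
>>>>             String day =
>>>>                 DateTimeFormat.forPattern("yyyyMMdd")
>>>>                     .withZone(DateTimeZone.forID("Europe/Zurich"))
>>>>                     .print(input.getValue().getTimestampMillis());
>>>>             // "$" + day addresses the daily partition via a table decorator
>>>>             return new TableDestination("project:dataset.events$" + day, null);
>>>>           })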
>>>>
>>>>
>>>>> On Tue, Nov 6, 2018 at 3:44 AM Kaymak, Tobias <
>>>>> tobias.kay...@ricardo.ch> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am sharing my experience with you after trying to use the following
>>>>>> pipeline
>>>>>> logic (with Beam 2.6.0 - running on Flink 1.5):
>>>>>>
>>>>>> 1. Reading from KafkaIO, attaching a timestamp extracted from each
>>>>>> parsed element
>>>>>> 2. Filtering bad records
>>>>>> 3. Writing to a partitioned table in BigQuery with FILE_LOADS (batch
>>>>>> jobs)
>>>>>> every 15 minutes
>>>>>>
>>>>>> I had a working pipeline that does not write to BigQuery directly, but
>>>>>> to Cloud Storage, so its third step was
>>>>>>
>>>>>> 3. Writing files to GCS in daily "subdirectories"
>>>>>>
>>>>>> I tried to rewrite the pipeline to reduce complexity: resetting its
>>>>>> state should no longer be tied to thinking about what to delete on GCS,
>>>>>> and I was also looking for configurable refresh times directly from
>>>>>> within the pipeline. The only thing I needed to change was the output
>>>>>> at the end, so I knew my parsing logic would not change, which should
>>>>>> reduce the risk.
>>>>>>
>>>>>> I tested the pipeline within our test cluster and it looked promising.
>>>>>> When I
>>>>>> deployed it last week everything seemed to go smoothly. On Friday I
>>>>>> noticed
>>>>>> that I had holes in the data: in the BigQuery tables there were
>>>>>> missing days
>>>>>> (the tricky part was that the recent days looked fine). (To be sure I reset
>>>>>> the pipeline
>>>>>> and read from the very beginning of each topic from Kafka. Within
>>>>>> different
>>>>>> runs, different days were missing.) I spent the weekend rolling back
>>>>>> the
>>>>>> changes and trying to figure out what was going on.
>>>>>>
>>>>>> I didn't see any errors in the logs (the log level was set to WARNING
>>>>>> for the most part), but I thought, well, maybe it's because there are
>>>>>> too many
>>>>>> partitions
>>>>>> and BigQuery has a limit of 1000 partition operations per day. So I
>>>>>> started
>>>>>> reading from just 90 days in the past, but I still had holes (whole
>>>>>> days).
>>>>>>
>>>>>> I had a windowing step that I needed for the GCS pipeline; I realized
>>>>>> that I wouldn't need it anymore with BigQueryIO, so I commented it out
>>>>>> and tested again, without luck.
>>>>>>
>>>>>> What struck me was that the Flink cluster didn't do any checkpoints for
>>>>>> the pipeline that was using BigQueryIO - it does so when writing to GCS,
>>>>>> and I tested its failure logic there. Additionally, the graph in Flink
>>>>>> with BigQueryIO becomes very complex, but this is something I expected.
>>>>>>
>>>>>> Here is the pipeline code with the commented-out windowing part:
>>>>>>
>>>>>>   pipeline
>>>>>>       .apply(
>>>>>>           KafkaIO.<String, String>read()
>>>>>>               .withBootstrapServers(bootstrap)
>>>>>>               .withTopics(topics)
>>>>>>               .withKeyDeserializer(StringDeserializer.class)
>>>>>>               .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>               .updateConsumerProperties(
>>>>>>                   ImmutableMap.of(
>>>>>>                       InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>>>>               .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>               .updateConsumerProperties(ImmutableMap.of("group.id", "di-beam-consumers"))
>>>>>>               .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>               .withTimestampPolicyFactory(
>>>>>>                   TimestampPolicyFactory.withTimestampFn(
>>>>>>                       new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>               .withReadCommitted()
>>>>>>               .commitOffsetsInFinalize())
>>>>>>       .apply(ParDo.of(new ToEventFn()))
>>>>>>       //  .apply(
>>>>>>       //      Window.into(new ZurichTimePartitioningWindowFn())
>>>>>>       //          .triggering(
>>>>>>       //              Repeatedly.forever(
>>>>>>       //                  AfterFirst.of(
>>>>>>       //                      AfterPane.elementCountAtLeast(bundleSize),
>>>>>>       //                      AfterProcessingTime.pastFirstElementInPane()
>>>>>>       //                          .plusDelayOf(refreshFrequency))))
>>>>>>       //          .withAllowedLateness(Duration.standardDays(1))
>>>>>>       //          .discardingFiredPanes())
>>>>>>       .apply(
>>>>>>           BigQueryIO.<Event>write()
>>>>>>               .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>>>>               .withTriggeringFrequency(refreshFrequency)
>>>>>>               .withNumFileShards(1)
>>>>>>               .to(partitionedTableDynamicDestinations)
>>>>>>               .withFormatFunction(
>>>>>>                   (SerializableFunction<Event, TableRow>)
>>>>>>                       KafkaToBigQuery::convertUserEventToTableRow)
>>>>>>               .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>               .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
>>>>>>
>>>>>>
>>>>>> I have the feeling that I must be making some serious and dumb
>>>>>> mistakes, as I know the Beam framework is very robust. Thanks for
>>>>>> taking the time to read this.
>>>>>>
>>>>>> Tobi
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Tobias Kaymak
>>>> Data Engineer
>>>>
>>>> tobias.kay...@ricardo.ch
>>>> www.ricardo.ch
>>>>
>>>
>>
