Hi Reuven,

Here's a quick look at our pipeline:

    (
        pipeline
        | "Reading messages from Kafka" >> ReadFromKafka(...)
        | "Deserializing events" >> Deserialize(**deserializer_args)
        | "Adding 'trigger_processing_time' timestamp" >> beam.Map(
            lambda event: window.TimestampedValue(event, time.time())
        )
        | "Window into Fixed Intervals" >> beam.WindowInto(
            # fixed_window_size is 1 minute.
            beam.transforms.window.FixedWindows(fixed_window_size),
            # We configure allowed lateness, but since timestamps are
            # assigned from processing time we don't expect late events.
            allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness),
        )
        | "Adding random integer partition key" >> beam.Map(
            # Dummy key so the GroupByKey reshuffles the topic's 16 Kafka
            # partitions down to a handful of output files per window.
            lambda event: (random.randint(1, 5), event)
        )
        | "Group by randomly-assigned integer key" >> beam.GroupByKey()
        | "Abandon Dummy Key" >> beam.MapTuple(lambda key, batch: batch)
        # WriteBatchesToS3 calls boto3 to write the batches to S3 as Parquet.
        | "Writing event data batches to parquet" >> WriteBatchesToS3(...)
    )
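
In case it helps, here is a minimal, self-contained sketch of the same
dummy-key batching pattern (toy data on the DirectRunner; NUM_SHARDS and
the inline elements are illustrative, not our production values):

    import random
    import time

    import apache_beam as beam
    from apache_beam.transforms import window

    NUM_SHARDS = 2  # illustrative: max number of batches per window

    with beam.Pipeline() as p:  # DirectRunner by default
        (
            p
            | "Create toy events" >> beam.Create(["e1", "e2", "e3", "e4", "e5"])
            | "Stamp with processing time" >> beam.Map(
                lambda e: window.TimestampedValue(e, time.time())
            )
            | "Fixed 1-minute windows" >> beam.WindowInto(window.FixedWindows(60))
            | "Add dummy shard key" >> beam.Map(
                lambda e: (random.randint(0, NUM_SHARDS - 1), e)
            )
            | "Group into batches" >> beam.GroupByKey()
            | "Drop dummy key" >> beam.MapTuple(lambda _, batch: list(batch))
            | "Print each batch" >> beam.Map(print)
        )

Each window emits at most NUM_SHARDS groups, which is what bounds the
number of files written per minute.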
Thanks!

On Wed, Sep 18, 2024 at 3:16 PM Reuven Lax via user <user@beam.apache.org> wrote:

> How are you doing this aggregation?
>
> On Wed, Sep 18, 2024 at 3:11 PM Lydian Lee <tingyenlee...@gmail.com> wrote:
>
>> Hi Jan,
>>
>> Thanks for the recommendation. In our case we are windowing on
>> processing time, which means there should be no late events at all.
>>
>> You mentioned that GroupByKey is stateful and can potentially drop
>> data. After the reshuffle (adding a random shuffle id as the key) we
>> then do the aggregation (combining the data and writing it to S3). Do
>> you think the example I mentioned earlier could be the reason for the
>> dropped data?
>>
>> If so, how does Beam prevent that in general? Are there any suggested
>> approaches? Thanks
>>
>> On Wed, Sep 18, 2024 at 12:33 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi Lydian,
>>>
>>> in that case, there is only generic advice you can look into.
>>> Reshuffle is a stateless operation that should not cause dropped
>>> data. A GroupByKey, on the other hand, is stateful and thus can drop
>>> some elements when dealing with late data. You should be able to
>>> confirm this by looking for the 'droppedDueToLateness' counter and/or
>>> the log here [1]. This happens when an element arrives after the
>>> watermark has passed the element's window plus the allowed lateness.
>>> If you see the log, you might need to either change how you assign
>>> timestamps to elements (e.g. use log append time) or increase the
>>> allowed lateness of your windowfn.
>>>
>>> Best,
>>>
>>> Jan
>>>
>>> [1] https://github.com/apache/beam/blob/f37795e326a75310828518464189440b14863834/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L132
>>>
>>> On 9/18/24 08:53, Lydian Lee wrote:
>>>
>>> I would love to, but there are some limitations on our end, so the
>>> version bump won't happen soon. In the meantime I need to figure out
>>> what the root cause might be.
>>>
>>> On Tue, Sep 17, 2024 at 11:26 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi Lydian,
>>>>
>>>> 2.41.0 is quite old, can you please try the current version to see
>>>> if this issue is still present? There were lots of changes between
>>>> 2.41.0 and 2.59.0.
>>>>
>>>> Jan
>>>>
>>>> On 9/17/24 17:49, Lydian Lee wrote:
>>>>
>>>> Hi,
>>>>
>>>> We are using the Beam Python SDK with the Flink runner; the Beam
>>>> version is 2.41.0 and the Flink version is 1.15.4.
>>>>
>>>> We have a pipeline with 2 stages:
>>>> 1. Read from Kafka and apply a fixed 1-minute window.
>>>> 2. Aggregate the data for the past minute, reshuffle to a smaller
>>>> partition count, and write it to S3.
>>>>
>>>> We disabled enable.auto.commit, enabled commit_offset_in_finalize,
>>>> and set auto.offset.reset to "latest".
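>>>>
>>>> Roughly, the read is wired up like this (an illustrative sketch, not
>>>> our exact code; the broker address, group id, and topic name are
>>>> placeholders):
>>>>
>>>>     from apache_beam.io.kafka import ReadFromKafka
>>>>
>>>>     events = pipeline | "Reading messages from Kafka" >> ReadFromKafka(
>>>>         consumer_config={
>>>>             "bootstrap.servers": "broker:9092",  # placeholder
>>>>             "group.id": "event-archiver",        # placeholder
>>>>             "enable.auto.commit": "false",
>>>>             "auto.offset.reset": "latest",
>>>>         },
>>>>         topics=["events"],                       # placeholder
>>>>         # commit offsets only when a checkpoint finalizes the bundle
>>>>         commit_offset_in_finalize=True,
>>>>     )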
>>>>
>>>> According to the logs, I can confirm the missing records were
>>>> consumed from Kafka, because there are many lines like
>>>>
>>>>     Resetting offset for topic XXXX-<PARTITION> to offset <OFFSET>
>>>>
>>>> and those partition/offset pairs match the missing records. The
>>>> records just never show up in S3.
>>>>
>>>> My current hypothesis is that the shuffling might be the cause. For
>>>> example, suppose that in the past minute partition 1 of the Kafka
>>>> topic had records at offsets 1, 2, and 3. After the reshuffle they
>>>> are distributed, say, as:
>>>>
>>>> - partition A: 1, 3
>>>> - partition B: 2
>>>>
>>>> If partition A completes successfully but partition B fails, then A
>>>> commits its offset to Kafka, so the group's committed offset is now
>>>> past 2. When the failed work is retried, it resumes from the
>>>> committed offset and skips offset 2. However, I am not sure how
>>>> exactly the offset commit works and how it interacts with
>>>> checkpoints. If my hypothesis were correct we should be seeing many
>>>> more missing records, yet this seems to happen rarely. Wondering if
>>>> anyone can help identify potential root causes? Thanks
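
PS (to make my hypothesis above concrete): here is a toy, Beam-free
simulation of the suspected race. The commit rule is a guess for
illustration, not KafkaIO's actual semantics:

    # Two bundles produced by the random-key reshuffle of one Kafka
    # partition whose records sit at offsets 1, 2, 3.
    bundles = {"A": [1, 3], "B": [2]}

    committed_offset = 0  # next offset the consumer group would resume from
    written_to_s3 = set()

    def process(offsets, fail=False):
        global committed_offset
        if fail:
            return  # bundle lost before writing; commits nothing
        written_to_s3.update(offsets)
        # guessed rule: commit the highest processed offset, plus one
        committed_offset = max(committed_offset, max(offsets) + 1)

    process(bundles["A"])             # succeeds, commits offset 4
    process(bundles["B"], fail=True)  # fails before writing offset 2

    # A retry resuming from the committed offset never re-reads offset 2.
    replayed = [o for o in [1, 2, 3] if o >= committed_offset]
    print(sorted(written_to_s3), committed_offset, replayed)
    # -> [1, 3] 4 []  (offset 2 reaches neither S3 nor the retry)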