Hi Jan,

Here's how we configure ReadFromKafka. The expansion service arguments are
only there so that cross-language (xlang) transforms work in k8s, so please
ignore them.

from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service

ReadFromKafka(
    consumer_config={
        "group.id": "group-name",
        "auto.offset.reset": "latest",
        "enable.auto.commit": "false",
    },
    # topics takes a list of topic names
    topics=["topic-name"],
    with_metadata=False,
    expansion_service=default_io_expansion_service(
        append_args=[
            # plain strings: an f-string would try to evaluate the JSON braces
            "--defaultEnvironmentType=PROCESS",
            '--defaultEnvironmentConfig={"command":"/opt/apache/beam/java_boot"}',
            "--experiments=use_deprecated_read",
        ]
    ),
    commit_offset_in_finalize=True,
)


Do you know what the right approach for using processing time would be
instead? I thought WindowInto was supposed to use the timestamp we attached
to the event. Do you think it is still using the original Kafka event
timestamp?  Thanks!
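
As a reference point for the question above, here is a minimal plain-Python
sketch (no Beam imports) of why the choice of timestamp matters. It assumes
that fixed windows are assigned purely from the element timestamp, and that an
element is dropped as late once the watermark has passed the last timestamp of
its window plus the allowed lateness, which is my reading of the runner code
linked as [1] further down the thread. The function names and the sample
values are hypothetical and only for illustration.

```python
from typing import Tuple

MINUTE_MS = 60_000


def fixed_window(timestamp_ms: int, size_ms: int, offset_ms: int = 0) -> Tuple[int, int]:
    """Return the [start, end) fixed window that contains timestamp_ms."""
    start = timestamp_ms - (timestamp_ms - offset_ms) % size_ms
    return start, start + size_ms


def is_dropped_as_late(
    timestamp_ms: int,
    watermark_ms: int,
    size_ms: int,
    allowed_lateness_ms: int = 0,
) -> bool:
    """Assumed drop rule: the watermark is already past the last timestamp
    of the element's window plus the allowed lateness."""
    _, end = fixed_window(timestamp_ms, size_ms)
    max_timestamp = end - 1  # last timestamp that still belongs to the window
    return max_timestamp + allowed_lateness_ms < watermark_ms


# Hypothetical element: produced to Kafka at 00:05:10, processed at 00:07:30,
# while the watermark is at 00:07:20.
kafka_ts = 5 * MINUTE_MS + 10_000
processing_ts = 7 * MINUTE_MS + 30_000
watermark = 7 * MINUTE_MS + 20_000

# Windowing by the Kafka timestamp puts the element in a window that the
# watermark has already passed, so it would be dropped.
print(fixed_window(kafka_ts, MINUTE_MS))
print(is_dropped_as_late(kafka_ts, watermark, MINUTE_MS))

# Windowing by the processing-time stamp puts it in the current window.
print(fixed_window(processing_ts, MINUTE_MS))
print(is_dropped_as_late(processing_ts, watermark, MINUTE_MS))
```

If the real pipeline still carried the Kafka timestamp into WindowInto, this
is the kind of situation where the droppedDueToLateness counter would
increase; if the re-stamping takes effect, it should stay at zero.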



On Thu, Sep 19, 2024 at 7:53 AM Jan Lukavský <je...@seznam.cz> wrote:

> Can you share the (relevant) parameters of the ReadFromKafka transform?
>
> This feels strange, and it might not do what you'd expect:
> | "Adding 'trigger_processing_time' timestamp" >> beam.Map(lambda event:
> window.TimestampedValue(event, time.time()))
>
> This does not change the assigned timestamp of an element, but creates a
> new element which contains processing time. It will not be used for
> windowing, though.
>
> On 9/19/24 00:49, Lydian Lee wrote:
>
> Hi Reuven,
>
> Here's a quick look for our pipeline:
> (
>     pipeline
>     | "Reading message from Kafka" >> ReadFromKafka(...)
>     | "Deserializing events" >> Deserialize(**deserializer_args)
>     | "Adding 'trigger_processing_time' timestamp" >> beam.Map(
>         lambda event: window.TimestampedValue(event, time.time()))
>     | "Window into Fixed Intervals" >> beam.WindowInto(
>         # fixed_window_size is 1 min.
>         beam.transforms.window.FixedWindows(fixed_window_size),
>         # Lateness is configured, but because we are using processing time,
>         # I don't expect any late events.
>         allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness),
>     )
>     | "Adding random integer partition key" >> beam.Map(
>         # Add a dummy key to reshuffle into fewer partitions. Kafka has 16
>         # partitions, but we only want to generate 2 files every minute.
>         lambda event: (random.randint(1, 5), event)
>     )
>     | "Group by randomly-assigned integer key" >> beam.GroupByKey()
>     | "Abandon Dummy Key" >> beam.MapTuple(lambda key, val: val)
>     # Calls boto3 to write the events to S3 in parquet format.
>     | "Writing event data batches to parquet" >> WriteBatchesToS3(...)
> )
>
> Thanks!
>
>
> On Wed, Sep 18, 2024 at 3:16 PM Reuven Lax via user <user@beam.apache.org>
> wrote:
>
>> How are you doing this aggregation?
>>
>> On Wed, Sep 18, 2024 at 3:11 PM Lydian Lee <tingyenlee...@gmail.com>
>> wrote:
>>
>>> Hi Jan,
>>>
>>> Thanks for the recommendation. In our case, we are windowing by
>>> processing time, which means that there should be no late events at all.
>>>
>>> You’ve mentioned that GroupByKey is stateful and can potentially drop
>>> data. After the reshuffle (adding a random shuffle id as the key), we
>>> do the aggregation (combine the data and write it to S3). Do you think
>>> the example I mentioned earlier could be the reason for the dropped
>>> data?
>>>
>>> If so, how is Beam able to prevent that in general? Are there any
>>> suggested approaches? Thanks
>>>
>>> On Wed, Sep 18, 2024 at 12:33 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi Lydian,
>>>>
>>>> in that case, I can only offer some generic advice.
>>>> Reshuffle is a stateless operation that should not cause data to be
>>>> dropped. A GroupByKey, on the other hand, is stateful and thus can, when
>>>> dealing with late data, drop some of it. You should be able to confirm
>>>> this by looking for the 'droppedDueToLateness' counter and/or the log
>>>> message here [1]. This happens when an element arrives after the
>>>> watermark has passed the end of the element's window plus the allowed
>>>> lateness. If you see the log, you might need to either change how
>>>> you assign timestamps to elements (e.g. use log append time) or increase
>>>> the allowed lateness of your windowfn.
>>>>
>>>> Best,
>>>>
>>>>  Jan
>>>>
>>>> [1]
>>>> https://github.com/apache/beam/blob/f37795e326a75310828518464189440b14863834/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L132
>>>> On 9/18/24 08:53, Lydian Lee wrote:
>>>>
>>>> I would love to, but there are some limitations on our end, so the
>>>> version bump won’t happen soon. I therefore need to figure out what the
>>>> root cause might be.
>>>>
>>>>
>>>> On Tue, Sep 17, 2024 at 11:26 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Hi Lydian,
>>>>>
>>>>> 2.41.0 is quite old, can you please try current version to see if this
>>>>> issue is still present? There were lots of changes between 2.41.0 and
>>>>> 2.59.0.
>>>>>
>>>>>  Jan
>>>>> On 9/17/24 17:49, Lydian Lee wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> We are using the Beam Python SDK with the Flink Runner. The Beam
>>>>> version is 2.41.0 and the Flink version is 1.15.4.
>>>>>
>>>>> We have a pipeline that has 2 stages:
>>>>> 1. read from Kafka and apply a fixed window of 1 minute
>>>>> 2. aggregate the data for the past 1 minute and reshuffle so that we
>>>>> have fewer partitions, then write them to S3.
>>>>>
>>>>> We disabled enable.auto.commit and enabled
>>>>> commit_offset_in_finalize. Also, auto.offset.reset is set to "latest".
>>>>> [image: image.png]
>>>>>
>>>>> According to the log, the data is definitely being consumed from
>>>>> Kafka, because there are many entries like
>>>>> ```
>>>>> Resetting offset for topic XXXX-<PARTITION>  to offset <OFFSET>
>>>>> ```
>>>>> and the partition/offset pairs match the missing records.
>>>>> However, the records don't show up in the final S3 output.
>>>>>
>>>>> My current hypothesis is that the shuffling might be the reason for
>>>>> the issue. For example, say that for the past minute, partition 1 in
>>>>> Kafka has records at offsets 1, 2, 3. After the reshuffle, they are
>>>>> distributed as, for example:
>>>>> - partition A: 1, 3
>>>>> - partition B: 2
>>>>>
>>>>> Suppose partition A completes successfully but partition B fails. Since
>>>>> A succeeded, it commits its offset to Kafka, so Kafka now has a
>>>>> committed offset of 3. When the read is retried, it skips offset 2.
>>>>> However, I am not sure how exactly the offset commit works, or how
>>>>> it interacts with checkpoints. If my hypothesis were correct, we
>>>>> should be seeing more missing records, yet this seems to happen
>>>>> rarely. Can anyone help identify potential root causes?  Thanks
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
