Hi Lydian,
because you do not specify 'timestamp_policy', it should use the default,
which should be processing time, so this should not be the issue. The
one remaining transform that could be involved is the sink transform, as Reuven
mentioned. Can you share details of its implementation?
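If you want to rule the timestamp policy out explicitly, you could also set it yourself, something like this (just a sketch with placeholder broker/group/topic names; please double-check the parameter name against your SDK version):

from apache_beam.io.kafka import ReadFromKafka

ReadFromKafka(
    consumer_config={
        "bootstrap.servers": "broker:9092",  # placeholder
        "group.id": "group-name",
    },
    topics=["topic-name"],
    # request processing-time timestamps explicitly instead of relying on the default
    timestamp_policy=ReadFromKafka.processing_time_policy,
)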
Jan
On 9/19/24 23:10, Lydian Lee wrote:
Hi, Jan
Here's how we do ReadFromKafka; the expansion service is just to
ensure we can work with xlang in k8s, so please ignore that part.
from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service

ReadFromKafka(
    consumer_config={
        "group.id": "group-name",
        "auto.offset.reset": "latest",
        "enable.auto.commit": "false",
    },
    topics=["topic-name"],
    with_metadata=False,
    expansion_service=default_io_expansion_service(
        append_args=[
            "--defaultEnvironmentType=PROCESS",
            '--defaultEnvironmentConfig={"command":"/opt/apache/beam/java_boot"}',
            "--experiments=use_deprecated_read",
        ]
    ),
    commit_offset_in_finalize=True,
)
Do you know what the right approach would be for using processing time
instead? I thought WindowInto was supposed to use the timestamp we
appended to the event. Do you think it is still using the original
Kafka event timestamp? Thanks!
On Thu, Sep 19, 2024 at 7:53 AM Jan Lukavský <je...@seznam.cz> wrote:
Can you share the (relevant) parameters of the ReadFromKafka
transform?
This feels strange, and it might not do what you'd expect:

| "Adding 'trigger_processing_time' timestamp" >> beam.Map(
    lambda event: window.TimestampedValue(event, time.time()))

This does not change the assigned timestamp of an element, but
creates a new element which contains processing time. It will not
be used for windowing, though.
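If you want to check which timestamp the elements actually carry into the windowing, you could log it right after that Map, something like this (just a sketch; the step name is made up):

import apache_beam as beam

def log_timestamp(event, ts=beam.DoFn.TimestampParam):
    # ts is the timestamp the runner currently associates with the element,
    # i.e. the value the downstream WindowInto will see
    print("element timestamp:", ts.to_utc_datetime())
    return event

# ... | "Log element timestamps" >> beam.Map(log_timestamp) | ...

That should tell you whether the elements still carry the original Kafka timestamps or the processing time you attached.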
On 9/19/24 00:49, Lydian Lee wrote:
Hi Reuven,
Here's a quick look at our pipeline:
(
    pipeline
    | "Reading message from Kafka" >> ReadFromKafka(...)
    | "Deserializing events" >> Deserialize(**deserializer_args)
    | "Adding 'trigger_processing_time' timestamp" >> beam.Map(
        lambda event: window.TimestampedValue(event, time.time()))
    | "Window into Fixed Intervals" >> beam.WindowInto(
        beam.transforms.window.FixedWindows(fixed_window_size),  # fixed_window_size is 1 min.
        # although we configured lateness, because we are using processing
        # time, I don't expect any late events
        allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness),
    )
    | "Adding random integer partition key" >> beam.Map(
        # add a dummy key to reshuffle to fewer partitions; Kafka has 16
        # partitions, but we only want to generate 2 files every minute
        lambda event: (random.randint(1, 5), event)
    )
    | "Group by randomly-assigned integer key" >> beam.GroupByKey()
    | "Abandon Dummy Key" >> beam.MapTuple(lambda key, val: val)
    | "Writing event data batches to parquet"
        >> WriteBatchesToS3(...)  # call boto3 to write the events into S3 in parquet format
)
Thanks!
On Wed, Sep 18, 2024 at 3:16 PM Reuven Lax via user
<user@beam.apache.org> wrote:
How are you doing this aggregation?
On Wed, Sep 18, 2024 at 3:11 PM Lydian Lee
<tingyenlee...@gmail.com> wrote:
Hi Jan,
Thanks for the recommendation. In our case, we are
windowing on processing time, which means there
should be no late events at all.
You've mentioned that GroupByKey is stateful and can
potentially drop data. After the reshuffle (adding a
random shuffle id to the key), we do the aggregation
(combine the data and write it to S3). Do you think
the example I mentioned earlier could be the reason
for the dropped data?
If so, how is Beam able to prevent that in general?
Are there any suggested approaches? Thanks
On Wed, Sep 18, 2024 at 12:33 AM Jan Lukavský
<je...@seznam.cz> wrote:
Hi Lydian,
in that case, there is only generic advice you can
look into. Reshuffle is a stateless operation that
should not cause dropping data. A GroupByKey, on the
other hand, is stateful and thus can drop some
elements when dealing with late data. You should be
able to confirm this by looking for the
'droppedDueToLateness' counter and/or the log in here
[1]. This happens when elements arrive after the
watermark passes the element's timestamp plus allowed
lateness. If you see the log, you might need to
either change how you assign timestamps to elements
(e.g. use log append time) or increase the allowed
lateness of your windowfn.
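Concretely, the two knobs would look roughly like this (a sketch only; parameter names from memory, please double-check them against your SDK version):

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

# Option 1: derive element timestamps from the broker's log-append time
# (placeholder broker/group/topic names).
read = ReadFromKafka(
    consumer_config={"bootstrap.servers": "broker:9092", "group.id": "group-name"},
    topics=["topic-name"],
    timestamp_policy=ReadFromKafka.log_append_time,
)

# Option 2: keep the timestamp assignment but tolerate more lateness,
# here 10 minutes of allowed lateness on a 1-minute fixed window.
window = beam.WindowInto(
    beam.transforms.window.FixedWindows(60),
    allowed_lateness=beam.utils.timestamp.Duration(seconds=600),
)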
Best,
Jan
[1]
https://github.com/apache/beam/blob/f37795e326a75310828518464189440b14863834/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L132
On 9/18/24 08:53, Lydian Lee wrote:
I would love to, but there are some limitations on
our end, so the version bump won't happen soon.
Thus I still need to figure out what the root cause
might be.
On Tue, Sep 17, 2024 at 11:26 PM Jan Lukavský
<je...@seznam.cz> wrote:
Hi Lydian,
2.41.0 is quite old; can you please try the current
version to see if this issue is still present?
There were lots of changes between 2.41.0 and
2.59.0.
Jan
On 9/17/24 17:49, Lydian Lee wrote:
Hi,
We are using the Beam Python SDK with the Flink runner;
the Beam version is 2.41.0 and the Flink
version is 1.15.4.
We have a pipeline with 2 stages:
1. read from Kafka and apply a fixed window of 1
minute
2. aggregate the data for the past 1 minute,
reshuffle so that we have a lower partition count,
and write them into S3.
We disabled enable.auto.commit and enabled
commit_offset_in_finalize. Also, auto.offset.reset
is set to "latest".
According to the logs, I can definitely see that the
data is being consumed from Kafka, because there
are many
```
Resetting offset for topic XXXX-<PARTITION> to
offset <OFFSET>
```
entries, and those partition/offset pairs do match
the missing records. However, the records don't
show up in the final S3 output.
My current hypothesis is that the shuffling
might be the reason for the issue. For example,
originally in Kafka for the past minute I have
records at offsets 1, 2, 3 in partition 1.
After the reshuffle they are distributed, for example:
- partition A: 1, 3
- partition B: 2
If partition A finishes successfully but
partition B fails, then because A succeeded,
it will commit its offset to Kafka, so
Kafka's committed offset is now 3. When the read
is retried, offset 2 will be skipped. However,
I am not sure how exactly the offset commit
works, and I'm wondering how it interacts with the
checkpoints. If my hypothesis were correct, we
should be seeing many more missing records, yet
this seems to happen rarely. Wondering if anyone
can help identify potential root causes? Thanks