Hello,
I am jumping on this because we are doing the same thing as Lydian.
In our case, we are using the default timestamp strategy in Kafka (so
processing timestamp).
We were also doing the same as Lydian, adding a processing timestamp
manually.
However, we have late data. It mainly happens in our integration
tests with Flink (parallelism 1), and happens really rarely in
production.
So it means we can't control the timestamp of an item even with
`window.TimestampedValue(event, time.time())`?
Best,
Marc
On Tue, Sep 24, 2024, 04:23 Reuven Lax via user
<user@beam.apache.org> wrote:
Also, as I said, the duplicate files might not look like
duplicates, which can be quite confusing.
Out of curiosity, I would try - just for testing - removing the
line where you "add" processing time, and also setting
allowed_lateness to the largest allowed value. This will help
determine whether late data is causing the dropped records.
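(A rough sketch of that experiment, reusing the names from the pipeline shared further down the thread; the concrete lateness value below is just an arbitrarily large placeholder, not a recommended setting:)

# Diagnostic only: drop the "Adding 'trigger_processing_time' timestamp" Map
# entirely, and make allowed_lateness effectively unbounded for the test run.
| "Window into Fixed Intervals" >> beam.WindowInto(
    beam.transforms.window.FixedWindows(fixed_window_size),
    allowed_lateness=beam.utils.timestamp.Duration(seconds=7 * 24 * 3600),  # e.g. one week
)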
Reuven
On Mon, Sep 23, 2024 at 2:10 PM Lydian Lee
<tingyenlee...@gmail.com> wrote:
Hi Jan,
Thanks for taking a quick look! Yes, the "with"
statement closes the writer after the write. In our use case,
we actually don't mind if duplicate data is
written, but we are more concerned about missing
data, which is the issue we are facing right now.
On Mon, Sep 23, 2024 at 11:54 AM Reuven Lax via user
<user@beam.apache.org> wrote:
Do you close the write afterwards? If not, I wonder
if you could lose records due to unflushed data.
Also - what you're doing here can definitely lead to
duplicate data being written, since this DoFn can be run
multiple times. The duplicates might also look
different if the Iterables are slightly different on
retries, especially when Flink restarts from a
checkpoint.
Reuven
On Mon, Sep 23, 2024 at 1:40 PM Lydian Lee
<tingyenlee...@gmail.com> wrote:
Hi Jan,
Thanks so much for your help. Here's our write to
s3:
import logging
import os
from typing import Dict, Iterable, List
from uuid import uuid4

import apache_beam as beam
import pyarrow as pa
from apache_beam.options.pipeline_options import PipelineOptions
from pyarrow.parquet import ParquetWriter


class WriteBatchesToS3(beam.DoFn):
    def __init__(
        self,
        output_path: str,
        schema: pa.Schema,
        pipeline_options: PipelineOptions,
    ) -> None:
        self.output_path = output_path
        self.schema = schema
        self.pipeline_options = pipeline_options

    def process(self, data: Iterable[List[Dict]]) -> None:
        """Write one batch per file to S3."""
        client = beam.io.aws.clients.s3.boto3_client.Client(options=self.pipeline_options)
        fields_without_metadata = [pa.field(f.name, f.type) for f in self.schema]
        schema_without_field_metadata = pa.schema(fields_without_metadata)
        filename = os.path.join(
            self.output_path,
            f"uuid_{str(uuid4())}.parquet",
        )
        tables = [
            pa.Table.from_pylist(batch, schema=schema_without_field_metadata)
            for batch in data
        ]
        if len(tables) == 0:
            logging.info(f"No data to write, the grouped contents are: {data}")
            return
        with beam.io.aws.s3io.S3IO(client=client).open(filename=filename, mode="w") as s3_writer:
            with ParquetWriter(
                s3_writer,
                schema_without_field_metadata,
                compression="SNAPPY",
                use_deprecated_int96_timestamps=True,
            ) as parquet_writer:
                merged_tables = pa.concat_tables(tables)
                parquet_writer.write_table(merged_tables)
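(A hedged aside: since this DoFn can be retried, one way to make retries idempotent is to derive the object name from the key and window instead of a random uuid, so a rerun overwrites the same file rather than creating a second one. The sketch below is illustrative only; the class name and the keyed input shape are assumptions, not the pipeline's actual code. Imports are as in the snippet above.)

class WriteKeyedBatchesToS3(beam.DoFn):  # hypothetical variant, not the real DoFn
    def __init__(self, output_path: str, schema: pa.Schema, pipeline_options: PipelineOptions) -> None:
        self.output_path = output_path
        self.schema = schema
        self.pipeline_options = pipeline_options

    def process(self, element, window=beam.DoFn.WindowParam):
        # Assumes the upstream GroupByKey output (key, batches) is passed through
        # without dropping the dummy key.
        key, batches = element
        # window is the element's IntervalWindow; window.start.micros is stable
        # across retries, so the same (window, key) always maps to the same name.
        filename = os.path.join(
            self.output_path,
            f"window_{window.start.micros}_key_{key}.parquet",
        )
        # ... same S3IO / ParquetWriter logic as above, writing to `filename`.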
On Fri, Sep 20, 2024 at 12:02 AM Jan Lukavský
<je...@seznam.cz> wrote:
Hi Lydian,
because you do not specify 'timestamp_policy',
it should use the default, which should be
processing time, so this should not be the
issue. The one transform potentially left is
the sink, as Reuven mentioned. Can
you share details of the implementation?
Jan
On 9/19/24 23:10, Lydian Lee wrote:
Hi, Jan
Here's how we do ReadFromKafka; the
expansion service settings are just to make
xlang work in k8s, so please ignore them.
from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service

ReadFromKafka(
    consumer_config={
        "group.id": "group-name",
        "auto.offset.reset": "latest",
        "enable.auto.commit": "false",
    },
    topics=["topic-name"],
    with_metadata=False,
    expansion_service=default_io_expansion_service(
        append_args=[
            "--defaultEnvironmentType=PROCESS",
            '--defaultEnvironmentConfig={"command":"/opt/apache/beam/java_boot"}',
            "--experiments=use_deprecated_read",
        ]
    ),
    commit_offset_in_finalize=True,
)
Do you know what would be the right approach
for using processing time instead? I thought
WindowInto was supposed to use the timestamp
we appended to the event. Do you think it is
still using the original Kafka event
timestamp? Thanks!
On Thu, Sep 19, 2024 at 7:53 AM Jan Lukavský
<je...@seznam.cz> wrote:
Can you share the (relevant) parameters
of the ReadFromKafka transform?
This feels strange, and it might not do
what you'd expect:
| "Adding 'trigger_processing_time'
timestamp" >> beam.Map(lambda event:
window.TimestampedValue(event, time.time()))
This does not change the assigned
timestamp of an element, but creates a
new element which contains processing
time. It will not be used for windowing,
though.
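(If it helps to verify this, here is a minimal sketch of a debugging DoFn that logs the timestamp the runner has actually assigned to each element, via DoFn.TimestampParam; the class name is made up for illustration:)

import logging
import apache_beam as beam

class LogAssignedTimestamp(beam.DoFn):  # hypothetical debugging helper
    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        # 'timestamp' is the element's assigned (windowing) timestamp, which is
        # independent of any TimestampedValue object carried inside the payload.
        logging.info("element=%s assigned_timestamp=%s", element, timestamp)
        yield element

# e.g. insert right after the Map that wraps events in TimestampedValue:
# | "Log assigned timestamps" >> beam.ParDo(LogAssignedTimestamp())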
On 9/19/24 00:49, Lydian Lee wrote:
Hi Reuven,
Here's a quick look for our pipeline:
(
    pipeline
    | "Reading message from Kafka" >> ReadFromKafka(...)
    | "Deserializing events" >> Deserialize(**deserializer_args)
    | "Adding 'trigger_processing_time' timestamp" >> beam.Map(
        lambda event: window.TimestampedValue(event, time.time())
    )
    | "Window into Fixed Intervals" >> beam.WindowInto(
        beam.transforms.window.FixedWindows(fixed_window_size),  # fixed_window_size is 1 min.
        # Although we configured lateness, because we are using processing time
        # we don't expect any late events.
        allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness),
    )
    | "Adding random integer partition key" >> beam.Map(
        # Add a dummy key to reshuffle into fewer partitions: Kafka has 16
        # partitions, but we only want to generate 2 files every minute.
        lambda event: (random.randint(1, 5), event)
    )
    | "Group by randomly-assigned integer key" >> beam.GroupByKey()
    | "Abandon Dummy Key" >> beam.MapTuple(lambda key, val: val)
    # Call boto3 to write the events into S3 in Parquet format.
    | "Writing event data batches to parquet" >> beam.ParDo(WriteBatchesToS3(...))
)
Thanks!
On Wed, Sep 18, 2024 at 3:16 PM Reuven
Lax via user <user@beam.apache.org> wrote:
How are you doing this aggregation?
On Wed, Sep 18, 2024 at 3:11 PM
Lydian Lee
<tingyenlee...@gmail.com> wrote:
Hi Jan,
Thanks for the recommendation.
In our case, we are windowing
with processing time, which
means there should be no
late events at all.
You've mentioned that
GroupByKey is stateful and can
potentially drop data.
Given that after the reshuffle (adding a
random shuffle id to the key)
we then do the aggregation
(combining the data and writing
it to S3), do you think
the scenario I mentioned earlier
could be the reason
for the dropped data?
If so, how is Beam generally
able to prevent that?
Are there any suggested
approaches? Thanks
On Wed, Sep 18, 2024 at
12:33 AM Jan Lukavský
<je...@seznam.cz> wrote:
Hi Lydian,
in that case, there is only
generic advice you can
look into. Reshuffle is a
stateless operation that
should not cause dropped
data. A GroupByKey, on the
other hand, is stateful and
thus can - when dealing
with late data - drop some
of it. You should be able
to confirm this by looking for
the 'droppedDueToLateness'
counter and/or the log here
[1]. This happens when an
element arrives after the
watermark has passed the end
of its window plus the allowed
lateness. If you see the
log, you might need to
either change how you
assign timestamps to
elements (e.g. use log
append time) or increase
the allowed lateness of your
WindowFn.
Best,
Jan
[1]
https://github.com/apache/beam/blob/f37795e326a75310828518464189440b14863834/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L132
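(A minimal sketch of both options; the timestamp_policy constant name below is taken from the Python SDK and may differ between versions, so treat it as an assumption:)

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

# (a) Assign element timestamps from the broker's log-append time instead of
#     processing time.
ReadFromKafka(
    consumer_config={"group.id": "group-name"},
    topics=["topic-name"],
    timestamp_policy=ReadFromKafka.log_append_time,
)

# (b) And/or give the window a more generous allowed lateness while debugging.
beam.WindowInto(
    beam.transforms.window.FixedWindows(60),
    allowed_lateness=beam.utils.timestamp.Duration(seconds=3600),
)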
On 9/18/24 08:53, Lydian
Lee wrote:
I would love to, but there
are some limitations on
our end, so the version
bump won't happen
soon. In the meantime I need to
figure out what the root
cause might be.
On Tue, Sep 17, 2024 at
11:26 PM Jan Lukavský
<je...@seznam.cz> wrote:
Hi Lydian,
2.41.0 is quite old;
can you please try the
current version to see
if this issue is still
present? There were
lots of changes
between 2.41.0 and 2.59.0.
Jan
On 9/17/24 17:49,
Lydian Lee wrote:
Hi,
We are using the Beam
Python SDK with the Flink
runner; the Beam
version is 2.41.0 and
the Flink version is
1.15.4.
We have a pipeline
that has 2 stages:
1. read from Kafka
and apply a fixed window of
1 minute
2. aggregate the data
for the past minute,
reshuffle so that
we have a lower
partition count, and
write it into S3.
We disabled
enable.auto.commit
and enabled
commit_offset_in_finalize;
auto.offset.reset is
set to "latest".
According to the logs,
I can definitely see
that the data is consumed
from Kafka,
because there are many
```
Resetting offset for
topic
XXXX-<PARTITION> to
offset <OFFSET>
```
entries, and the
partition/offset pairs
do match the
missing records.
However, the data doesn't
show up in the final S3 output.
My current hypothesis
is that the shuffling
might be the reason
for the issue. For
example, originally
in Kafka for the past
minute I have records at
offsets 1, 2, 3 in
partition 1. After the
reshuffle, they might be
distributed as, for example:
- partition A: 1, 3
- partition B: 2
If partition A
finishes successfully but
partition B fails,
then since A
succeeded, it commits
its offset to
Kafka, so Kafka's
offset is now at 3.
When the read is
retried, it will
skip offset 2.
However, I am not
sure how exactly the
offset commit works,
and how it
interacts with the
checkpoints. If my
hypothesis were
correct, we should be
seeing far more missing
records, yet this
seems to happen only
rarely. Wondering if
anyone can help
identify potential
root causes? Thanks