Also, as I said, the duplicate files might not look like duplicates, which
can be quite confusing.

Out of curiosity, I would try - just for testing - removing the line where you
"add" processing time, and also setting allowed_lateness to the largest allowed
value. This will help determine whether late data is causing the dropped
records.
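
Something like this, as a rough, untested sketch - deserialized_events and
fixed_window_size stand in for the names in your pipeline, and the huge
Duration is just an arbitrarily large lateness for the experiment:

import apache_beam as beam
from apache_beam.utils.timestamp import Duration

fixed_window_size = 60  # your existing 1-minute window size, in seconds

# Test configuration: skip the beam.Map that wraps events in
# TimestampedValue(event, time.time()) and window with a very generous
# allowed lateness, purely to see whether the missing records reappear.
windowed = (
    deserialized_events  # the PCollection coming out of your Deserialize step
    | "Window into Fixed Intervals" >> beam.WindowInto(
        beam.transforms.window.FixedWindows(fixed_window_size),
        allowed_lateness=Duration(seconds=10 * 365 * 24 * 3600),
    )
)

If the missing records show up with that configuration, late-data dropping is
the likely culprit; if they are still missing, the problem is further
downstream (e.g. in the sink).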

Reuven


On Mon, Sep 23, 2024 at 2:10 PM Lydian Lee <tingyenlee...@gmail.com> wrote:

> Hi Jan,
>
> Thanks for taking a quick look!  Yes, the "with" statement would close
> after the write.  In our use case, we actually don't mind if duplicate data
> gets written, but we are more concerned about the missing data, which is
> the issue we are facing right now.
>
> On Mon, Sep 23, 2024 at 11:54 AM Reuven Lax via user <user@beam.apache.org>
> wrote:
>
>> Do you close the write afterwards? If not, I wonder if you could lose
>> records due to unflushed data.
>>
>> Also - what you're doing here can definitely lead to duplicate data being
>> written, since this DoFn can be run multiple times. The duplicates might
>> also appear different if the Iterables are slightly different on retries,
>> especially when Flink restarts from a checkpoint.
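>>
>> One possible mitigation (just a sketch, not taken from your code): keep the
>> grouping key instead of dropping it, and derive the S3 object name
>> deterministically from the window and key, so a retried bundle rewrites the
>> same object rather than creating a new uuid-named file. The names below
>> (keyed_batches, key, batches) are illustrative only.
>>
>> import os
>> import apache_beam as beam
>>
>> class WriteBatchesToS3(beam.DoFn):
>>     def process(self, keyed_batches, window=beam.DoFn.WindowParam):
>>         # Keep the dummy key from the GroupByKey instead of abandoning it.
>>         key, batches = keyed_batches
>>         # The same window + key always maps to the same file name, so a
>>         # retry overwrites rather than duplicates.
>>         filename = os.path.join(
>>             self.output_path,
>>             f"{window.start.to_utc_datetime():%Y%m%dT%H%M%S}_key{key}.parquet",
>>         )
>>         ...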
>>
>> Reuven
>>
>> On Mon, Sep 23, 2024 at 1:40 PM Lydian Lee <tingyenlee...@gmail.com>
>> wrote:
>>
>>> Hi Jan,
>>>
>>> Thanks so much for your help. Here's our write to s3:
>>>
>>> import logging
>>> import os
>>> from typing import Dict, Iterable, List
>>> from uuid import uuid4
>>>
>>> import apache_beam as beam
>>> import pyarrow as pa
>>> from apache_beam.io.aws import s3io
>>> from apache_beam.io.aws.clients.s3 import boto3_client
>>> from apache_beam.options.pipeline_options import PipelineOptions
>>> from pyarrow.parquet import ParquetWriter
>>>
>>> class WriteBatchesToS3(beam.DoFn):
>>>     def __init__(
>>>         self,
>>>         output_path: str,
>>>         schema: pa.Schema,
>>>         pipeline_options: PipelineOptions,
>>>     ) -> None:
>>>         self.output_path = output_path
>>>         self.schema = schema
>>>         self.pipeline_options = pipeline_options
>>>
>>>     def process(self, data: Iterable[List[Dict]]) -> None:
>>>         """Write all batches for this element into one Parquet file on S3."""
>>>         client = boto3_client.Client(options=self.pipeline_options)
>>>
>>>         # Strip field-level metadata from the schema before writing.
>>>         fields_without_metadata = [
>>>             pa.field(f.name, f.type) for f in self.schema]
>>>         schema_without_field_metadata = pa.schema(fields_without_metadata)
>>>
>>>         filename = os.path.join(
>>>             self.output_path,
>>>             f"uuid_{str(uuid4())}.parquet",
>>>         )
>>>
>>>         tables = [
>>>             pa.Table.from_pylist(batch, schema=schema_without_field_metadata)
>>>             for batch in data
>>>         ]
>>>         if len(tables) == 0:
>>>             logging.info(f"No data to write, the grouped contents are: {data}")
>>>             return
>>>
>>>         # Merge all batches for this key/window into a single Parquet file.
>>>         with s3io.S3IO(client=client).open(
>>>                 filename=filename, mode="w") as s3_writer:
>>>             with ParquetWriter(
>>>                 s3_writer,
>>>                 schema_without_field_metadata,
>>>                 compression="SNAPPY",
>>>                 use_deprecated_int96_timestamps=True,
>>>             ) as parquet_writer:
>>>                 merged_tables = pa.concat_tables(tables)
>>>                 parquet_writer.write_table(merged_tables)
>>>
>>> On Fri, Sep 20, 2024 at 12:02 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi Lydian,
>>>>
>>>> because you do not specify 'timestamp_policy' it should use the
>>>> default, which should be processing time, so this should not be the issue.
>>>> The one transform left to check is the sink transform, as Reuven
>>>> mentioned. Can you share details of its implementation?
>>>>
>>>>  Jan
>>>> On 9/19/24 23:10, Lydian Lee wrote:
>>>>
>>>> Hi, Jan
>>>>
>>>> Here's how we do ReadFromKafka; the expansion service settings are just to
>>>> make xlang work in k8s, so please ignore those.
>>>> from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service
>>>>
>>>> ReadFromKafka(
>>>>     consumer_config={
>>>>         "group.id": "group-name",
>>>>         "auto.offset.reset": "latest",
>>>>         "enable.auto.commit": "false",
>>>>     },
>>>>     topics=["topic-name"],
>>>>     with_metadata=False,
>>>>     expansion_service=default_io_expansion_service(
>>>>         append_args=[
>>>>             "--defaultEnvironmentType=PROCESS",
>>>>             '--defaultEnvironmentConfig={"command":"/opt/apache/beam/java_boot"}',
>>>>             "--experiments=use_deprecated_read",
>>>>         ]
>>>>     ),
>>>>     commit_offset_in_finalize=True,
>>>> )
>>>>
>>>> Do you know what would be the right approach for using processing time
>>>> instead? I thought WindowInto was supposed to use the timestamp we appended
>>>> to the event. Do you think it is still using the original Kafka event
>>>> timestamp?  Thanks!
>>>>
>>>>
>>>>
>>>> On Thu, Sep 19, 2024 at 7:53 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Can you share the (relevant) parameters of the ReadFromKafka transform?
>>>>>
>>>>> This feels strange, and it might not do what you'd expect:
>>>>> | "Adding 'trigger_processing_time' timestamp" >> beam.Map(lambda
>>>>> event: window.TimestampedValue(event, time.time()))
>>>>>
>>>>> This does not change the assigned timestamp of an element, but creates
>>>>> a new element which contains processing time. It will not be used for
>>>>> windowing, though.
>>>>> On 9/19/24 00:49, Lydian Lee wrote:
>>>>>
>>>>> Hi Reuven,
>>>>>
>>>>> Here's a quick look for our pipeline:
>>>>> (
>>>>>     pipeline
>>>>>     | "Reading message from Kafka" >> ReadFromKafka(...)
>>>>>     | "Deserializing events" >> Deserialize(**deserializer_args)
>>>>>     | "Adding 'trigger_processing_time' timestamp" >> beam.Map(
>>>>>         lambda event: window.TimestampedValue(event, time.time()))
>>>>>     | "Window into Fixed Intervals" >> beam.WindowInto(
>>>>>         # fixed_window_size is 1 min.
>>>>>         beam.transforms.window.FixedWindows(fixed_window_size),
>>>>>         # Although we configured lateness, because we are using
>>>>>         # processing time we don't expect any late events.
>>>>>         allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness),
>>>>>     )
>>>>>     | "Adding random integer partition key" >> beam.Map(
>>>>>         # Add a dummy key to reshuffle to fewer partitions. Kafka has 16
>>>>>         # partitions, but we only want to generate 2 files every minute.
>>>>>         lambda event: (random.randint(1, 5), event)
>>>>>     )
>>>>>     | "Group by randomly-assigned integer key" >> beam.GroupByKey()
>>>>>     | "Abandon Dummy Key" >> beam.MapTuple(lambda key, val: val)
>>>>>     # Call boto3 to write the events into S3 in Parquet format.
>>>>>     | "Writing event data batches to parquet" >> WriteBatchesToS3(...)
>>>>> )
>>>>>
>>>>> Thanks!
>>>>>
>>>>>
>>>>> On Wed, Sep 18, 2024 at 3:16 PM Reuven Lax via user <
>>>>> user@beam.apache.org> wrote:
>>>>>
>>>>>> How are you doing this aggregation?
>>>>>>
>>>>>> On Wed, Sep 18, 2024 at 3:11 PM Lydian Lee <tingyenlee...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Jan,
>>>>>>>
>>>>>>> Thanks for the recommendation. In our case, we are windowing on
>>>>>>> processing time, which means there should be no late events at all.
>>>>>>>
>>>>>>> You mentioned that GroupByKey is stateful and can potentially drop
>>>>>>> data. Given that after the reshuffle (adding a random shuffle id to the
>>>>>>> key) we then do the aggregation (combine the data and write it to S3),
>>>>>>> do you think the example I mentioned earlier could be the reason for
>>>>>>> the dropped data?
>>>>>>>
>>>>>>> If so, how is Beam able to prevent that in general? Are there any
>>>>>>> suggested approaches? Thanks
>>>>>>>
>>>>>>> On Wed, Sep 18, 2024 at 12:33 AM Jan Lukavský <je...@seznam.cz>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Lydian,
>>>>>>>>
>>>>>>>> in that case, there is only generic advice I can offer. Reshuffle is a
>>>>>>>> stateless operation that should not cause dropped data. A GroupByKey, on
>>>>>>>> the other hand, is stateful and thus can - when dealing with late data -
>>>>>>>> drop some of it. You should be able to confirm this by looking for the
>>>>>>>> 'droppedDueToLateness' counter and/or the log here [1]. This happens when
>>>>>>>> an element arrives after the watermark has already passed the element's
>>>>>>>> timestamp plus the allowed lateness. If you see the log, you might need
>>>>>>>> to either change how you assign timestamps to elements (e.g. use log
>>>>>>>> append time) or increase the allowed lateness of your windowfn.
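>>>>>>>>
>>>>>>>> Roughly like this - an untested sketch; the policy value and the
>>>>>>>> one-hour lateness are only illustrative, so check what the ReadFromKafka
>>>>>>>> wrapper in your Beam version accepts for timestamp_policy:
>>>>>>>>
>>>>>>>> # Option 1: take element timestamps from the Kafka log append time
>>>>>>>> # instead of the default policy.
>>>>>>>> ReadFromKafka(
>>>>>>>>     consumer_config={...},
>>>>>>>>     topics=["topic-name"],
>>>>>>>>     timestamp_policy="LogAppendTime",
>>>>>>>>     commit_offset_in_finalize=True,
>>>>>>>> )
>>>>>>>>
>>>>>>>> # Option 2: keep the timestamps but give the window a much more
>>>>>>>> # generous allowed lateness (here one hour, purely as an example).
>>>>>>>> beam.WindowInto(
>>>>>>>>     beam.transforms.window.FixedWindows(60),
>>>>>>>>     allowed_lateness=beam.utils.timestamp.Duration(seconds=3600),
>>>>>>>> )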
>>>>>>>>
>>>>>>>> Best,
>>>>>>>>
>>>>>>>>  Jan
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://github.com/apache/beam/blob/f37795e326a75310828518464189440b14863834/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L132
>>>>>>>> On 9/18/24 08:53, Lydian Lee wrote:
>>>>>>>>
>>>>>>>> I would love to, but due to some limitations on our end the version
>>>>>>>> bump won't happen soon. Thus I need to figure out what the root cause
>>>>>>>> might be in the meantime.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Sep 17, 2024 at 11:26 PM Jan Lukavský <je...@seznam.cz>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Lydian,
>>>>>>>>>
>>>>>>>>> 2.41.0 is quite old, can you please try the current version to see if
>>>>>>>>> this issue is still present? There were lots of changes between 2.41.0
>>>>>>>>> and 2.59.0.
>>>>>>>>>
>>>>>>>>>  Jan
>>>>>>>>> On 9/17/24 17:49, Lydian Lee wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> We are using Beam Python SDK with Flink Runner, the Beam version
>>>>>>>>> is 2.41.0 and the Flink version is 1.15.4.
>>>>>>>>>
>>>>>>>>> We have a pipeline that has 2 stages:
>>>>>>>>> 1. Read from Kafka and apply a 1-minute fixed window.
>>>>>>>>> 2. Aggregate the data for the past minute, reshuffle so that we have a
>>>>>>>>> lower partition count, and write it into S3.
>>>>>>>>>
>>>>>>>>> We disabled the enable.auto.commit and enabled
>>>>>>>>> commit_offset_in_finalize. also the auto.offset.reset is set to 
>>>>>>>>> "latest"
>>>>>>>>> [image: image.png]
>>>>>>>>>
>>>>>>>>> According to the log, I can definitely see the data being consumed
>>>>>>>>> from the Kafka offsets, because there are many lines like
>>>>>>>>> ```
>>>>>>>>> Resetting offset for topic XXXX-<PARTITION>  to offset <OFFSET>
>>>>>>>>> ```
>>>>>>>>> and the partition/offset pairs do match the missing records. However,
>>>>>>>>> they don't show up in the final S3 output.
>>>>>>>>>
>>>>>>>>> My current hypothesis is that the shuffling might be the reason for
>>>>>>>>> the issue. For example, originally in Kafka, for the past minute in
>>>>>>>>> partition 1, I have records at offsets 1, 2, 3. After the reshuffle
>>>>>>>>> they are now distributed, for example:
>>>>>>>>> - partition A: 1, 3
>>>>>>>>> - partition B: 2
>>>>>>>>>
>>>>>>>>> If partition A finishes successfully but partition B fails, then
>>>>>>>>> because A succeeded it will commit its offset to Kafka, so Kafka now
>>>>>>>>> has the offset at 3. When it retries, it will skip offset 2. However,
>>>>>>>>> I am not sure how exactly the offset commit works and how it interacts
>>>>>>>>> with the checkpoints. If my hypothesis were correct, we should also be
>>>>>>>>> seeing many more missing records, yet this seems to happen rarely.
>>>>>>>>> Wondering if anyone can help identify potential root causes?  Thanks
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
