Hi Jan,

Thanks so much for your help. Here's how we write to S3:

from typing import Dict, Iterable, List
from uuid import uuid4
import logging
import os

import apache_beam as beam
import apache_beam.io.aws.clients.s3.boto3_client  # makes beam.io.aws.clients.* resolvable
import apache_beam.io.aws.s3io                     # makes beam.io.aws.s3io resolvable
import pyarrow as pa
from apache_beam.options.pipeline_options import PipelineOptions
from pyarrow.parquet import ParquetWriter


class WriteBatchesToS3(beam.DoFn):
    def __init__(
        self,
        output_path: str,
        schema: pa.Schema,
        pipeline_options: PipelineOptions,
    ) -> None:
        self.output_path = output_path
        self.schema = schema
        self.pipeline_options = pipeline_options

    def process(self, data: Iterable[List[Dict]]) -> None:
        """Write one batch per file to S3."""
        client = beam.io.aws.clients.s3.boto3_client.Client(options=self.pipeline_options)

        # Rebuild the schema without field-level metadata before writing.
        fields_without_metadata = [pa.field(f.name, f.type) for f in self.schema]
        schema_without_field_metadata = pa.schema(fields_without_metadata)

        filename = os.path.join(
            self.output_path,
            f"uuid_{uuid4()}.parquet",
        )

        tables = [
            pa.Table.from_pylist(batch, schema=schema_without_field_metadata)
            for batch in data
        ]
        if len(tables) == 0:
            # partition_date is defined elsewhere in our full pipeline code.
            logging.info(
                f"No data to write for key: {partition_date}, "
                f"the grouped contents are: {data}"
            )
            return

        # Merge all batches into a single table and write one parquet file to S3.
        with beam.io.aws.s3io.S3IO(client=client).open(filename=filename, mode="w") as s3_writer:
            with ParquetWriter(
                s3_writer,
                schema_without_field_metadata,
                compression="SNAPPY",
                use_deprecated_int96_timestamps=True,
            ) as parquet_writer:
                merged_tables = pa.concat_tables(tables)
                parquet_writer.write_table(merged_tables)

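For context, we apply the DoFn with beam.ParDo, roughly like this (a minimal sketch; the argument values here are illustrative placeholders, not our real configuration):

| "Writing event data batches to parquet" >> beam.ParDo(
    WriteBatchesToS3(
        output_path="s3://my-bucket/events/",  # illustrative output prefix
        schema=event_schema,                   # pyarrow schema of the events
        pipeline_options=pipeline_options,     # carries the AWS/S3 options
    )
)
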
On Fri, Sep 20, 2024 at 12:02 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Lydian,
>
> because you do not specify 'timestamp_policy', it should use the default,
> which should be processing time, so this should not be the issue. The one
> remaining transform that could be involved is the sink transform, as Reuven
> mentioned. Can you share details of the implementation?
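>
> Just for illustration, the policy can also be set explicitly (a minimal
> sketch, assuming the constants exposed by the Python ReadFromKafka wrapper;
> the config values are placeholders):
>
> from apache_beam.io.kafka import ReadFromKafka
>
> ReadFromKafka(
>     consumer_config=consumer_config,  # your existing consumer config
>     topics=["topic-name"],
>     # Wall-clock time at read; this is also the default when unspecified.
>     timestamp_policy=ReadFromKafka.processing_time_policy,
> )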
>
>  Jan
> On 9/19/24 23:10, Lydian Lee wrote:
>
> Hi, Jan
>
> Here's how we do ReadFromKafka; the expansion service setup is just to make
> xlang work on k8s, so please ignore that part.
> from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service
>
> ReadFromKafka(
>     consumer_config={
>         "group.id": "group-name",
>         "auto.offset.reset": "latest",
>         "enable.auto.commit": "false",
>     },
>     topics=["topic-name"],
>     with_metadata=False,
>     expansion_service=default_io_expansion_service(
>         append_args=[
>             "--defaultEnvironmentType=PROCESS",
>             '--defaultEnvironmentConfig={"command":"/opt/apache/beam/java_boot"}',
>             "--experiments=use_deprecated_read",
>         ]
>     ),
>     commit_offset_in_finalize=True,
> )
>
> Do you know what the right approach for using processing time instead would
> be? I thought WindowInto was supposed to use the timestamp we appended to the
> event. Do you think it is still using the original Kafka event timestamp?
> Thanks!
>
>
>
> On Thu, Sep 19, 2024 at 7:53 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Can you share the (relevant) parameters of the ReadFromKafka transform?
>>
>> This feels strange, and it might not do what you'd expect:
>> | "Adding 'trigger_processing_time' timestamp" >> beam.Map(lambda event:
>> window.TimestampedValue(event, time.time()))
>>
>> This does not change the assigned timestamp of an element, but creates a
>> new element which contains processing time. It will not be used for
>> windowing, though.
>> On 9/19/24 00:49, Lydian Lee wrote:
>>
>> Hi Reuven,
>>
>> Here's a quick look for our pipeline:
>> (
>>     pipeline
>>     | "Reading message from Kafka" >> ReadFromKafka(...)
>>     | "Deserializing events" >> Deserialize(**deserializer_args)
>>     | "Adding 'trigger_processing_time' timestamp" >> beam.Map(
>>         lambda event: window.TimestampedValue(event, time.time())
>>     )
>>     | "Window into Fixed Intervals" >> beam.WindowInto(
>>         # fixed_window_size is 1 min.
>>         beam.transforms.window.FixedWindows(fixed_window_size),
>>         # Although we configured lateness, because we are using processing
>>         # time we don't expect any late events.
>>         allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness),
>>     )
>>     # Add a dummy key to reshuffle to fewer partitions. Kafka has 16
>>     # partitions, but we only want to generate 2 files every minute.
>>     | "Adding random integer partition key" >> beam.Map(
>>         lambda event: (random.randint(1, 5), event)
>>     )
>>     | "Group by randomly-assigned integer key" >> beam.GroupByKey()
>>     | "Abandon Dummy Key" >> beam.MapTuple(lambda key, val: val)
>>     # Call boto3 to write the events into S3 in parquet format.
>>     | "Writing event data batches to parquet" >> WriteBatchesToS3(...)
>> )
>>
>> Thanks!
>>
>>
>> On Wed, Sep 18, 2024 at 3:16 PM Reuven Lax via user <user@beam.apache.org>
>> wrote:
>>
>>> How are you doing this aggregation?
>>>
>>> On Wed, Sep 18, 2024 at 3:11 PM Lydian Lee <tingyenlee...@gmail.com>
>>> wrote:
>>>
>>>> Hi Jan,
>>>>
>>>> Thanks for the recommendation. In our case, we are windowing with the
>>>> processing time, which means that there should be no late event at all.
>>>>
>>>> You've mentioned that GroupByKey is stateful and can potentially drop
>>>> data. After the reshuffle (adding a random shuffle id to the key), we do
>>>> the aggregation (combine the data and write it to S3). Do you think the
>>>> scenario I mentioned earlier could be the reason for the dropped data?
>>>>
>>>> If so, how is Beam able to prevent that in general? Are there any
>>>> suggested approaches? Thanks
>>>>
>>>> On Wed, Sep 18, 2024 at 12:33 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Hi Lydian,
>>>>>
>>>>> in that case, there is only generic advice you can look into. Reshuffle is
>>>>> a stateless operation that should not cause dropped data. A GroupByKey, on
>>>>> the other hand, is stateful and thus can drop some elements when dealing
>>>>> with late data. You should be able to confirm this by looking for the
>>>>> 'droppedDueToLateness' counter and/or the log here [1]. This happens when
>>>>> an element arrives after the watermark has already passed the end of its
>>>>> window plus the allowed lateness. If you see the log, you might need to
>>>>> either change how you assign timestamps to elements (e.g. use log append
>>>>> time) or increase the allowed lateness of your windowfn.
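>>>>>
>>>>> For illustration, a minimal sketch of both options (assuming the
>>>>> 'timestamp_policy' parameter of the Python ReadFromKafka wrapper and the
>>>>> 'allowed_lateness' argument of WindowInto; the values are placeholders):
>>>>>
>>>>> ReadFromKafka(
>>>>>     consumer_config=consumer_config,
>>>>>     topics=["topic-name"],
>>>>>     # Use the broker's log-append time as the element timestamp.
>>>>>     timestamp_policy="LogAppendTime",
>>>>> )
>>>>>
>>>>> beam.WindowInto(
>>>>>     beam.transforms.window.FixedWindows(60),
>>>>>     # Give late elements more headroom before they are dropped.
>>>>>     allowed_lateness=beam.utils.timestamp.Duration(seconds=300),
>>>>> )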
>>>>>
>>>>> Best,
>>>>>
>>>>>  Jan
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/beam/blob/f37795e326a75310828518464189440b14863834/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L132
>>>>> On 9/18/24 08:53, Lydian Lee wrote:
>>>>>
>>>>> I would love to, but there are some limitations on our end, so the version
>>>>> bump won't happen soon. In the meantime I need to figure out what the root
>>>>> cause might be.
>>>>>
>>>>>
>>>>> On Tue, Sep 17, 2024 at 11:26 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>>> Hi Lydian,
>>>>>>
>>>>>> 2.41.0 is quite old, can you please try the current version to see if this
>>>>>> issue is still present? There were lots of changes between 2.41.0 and 2.59.0.
>>>>>>
>>>>>>  Jan
>>>>>> On 9/17/24 17:49, Lydian Lee wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We are using Beam Python SDK with Flink Runner, the Beam version is
>>>>>> 2.41.0 and the Flink version is 1.15.4.
>>>>>>
>>>>>> We have a pipeline that has 2 stages:
>>>>>> 1. read from Kafka and apply a fixed window of 1 minute
>>>>>> 2. aggregate the data for the past 1 minute, reshuffle so that we end up
>>>>>> with fewer partitions, and write the results into S3.
>>>>>>
>>>>>> We disabled enable.auto.commit and enabled commit_offset_in_finalize. Also,
>>>>>> auto.offset.reset is set to "latest".
>>>>>> [image: image.png]
>>>>>>
>>>>>> According to the logs, I can definitely see that the data is consumed from
>>>>>> Kafka, because there are many
>>>>>> ```
>>>>>> Resetting offset for topic XXXX-<PARTITION>  to offset <OFFSET>
>>>>>> ```
>>>>>> entries, and those partition/offset pairs do match the missing records.
>>>>>> However, the data doesn't show up in the final S3 output.
>>>>>>
>>>>>> My current hypothesis is that the shuffling might be the reason for the
>>>>>> issue. For example, originally in Kafka for the past minute I have records
>>>>>> at offsets 1, 2, 3 in partition 1. After the reshuffle they might be
>>>>>> distributed as, for example:
>>>>>> - partition A: 1, 3
>>>>>> - partition B: 2
>>>>>>
>>>>>> Now suppose partition A finishes successfully but partition B fails. Given
>>>>>> that A succeeded, it will commit its offset to Kafka, so Kafka's committed
>>>>>> offset is now 3. When the read is retried, offset 2 will be skipped.
>>>>>> However, I am not sure how exactly the offset commit works and how it
>>>>>> interacts with the checkpoints. Also, if my hypothesis were correct, we
>>>>>> should be seeing many more missing records, whereas this seems to happen
>>>>>> only rarely. Wondering if anyone can help identify potential root causes?
>>>>>> Thanks
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
