Hi Jan,

Thanks so much for your help. Here's our write to S3:
import logging
import os
from typing import Dict, Iterable, List
from uuid import uuid4

import apache_beam as beam
import pyarrow as pa
from apache_beam.options.pipeline_options import PipelineOptions
from pyarrow.parquet import ParquetWriter


class WriteBatchesToS3(beam.DoFn):
    def __init__(
        self,
        output_path: str,
        schema: pa.Schema,
        pipeline_options: PipelineOptions,
    ) -> None:
        self.output_path = output_path
        self.schema = schema
        self.pipeline_options = pipeline_options

    def process(self, data: Iterable[List[Dict]]) -> None:
        """Write the grouped batches to a single parquet file in S3."""
        client = beam.io.aws.clients.s3.boto3_client.Client(
            options=self.pipeline_options
        )
        # Strip field-level metadata from the schema before writing.
        fields_without_metadata = [pa.field(f.name, f.type) for f in self.schema]
        schema_without_field_metadata = pa.schema(fields_without_metadata)
        filename = os.path.join(
            self.output_path,
            f"uuid_{uuid4()}.parquet",
        )
        tables = [
            pa.Table.from_pylist(batch, schema=schema_without_field_metadata)
            for batch in data
        ]
        if len(tables) == 0:
            logging.info(f"No data to write, the grouped contents are: {data}")
            return
        with beam.io.aws.s3io.S3IO(client=client).open(
            filename=filename, mode="w"
        ) as s3_writer:
            with ParquetWriter(
                s3_writer,
                schema_without_field_metadata,
                compression="SNAPPY",
                use_deprecated_int96_timestamps=True,
            ) as parquet_writer:
                merged_table = pa.concat_tables(tables)
                parquet_writer.write_table(merged_table)

On Fri, Sep 20, 2024 at 12:02 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Lydian,
>
> because you do not specify 'timestamp_policy' it should use the default,
> which should be processing time, so this should not be the issue. The one
> transform potentially left is the sink transform, as Reuven mentioned. Can
> you share details of the implementation?
>
> Jan
>
> On 9/19/24 23:10, Lydian Lee wrote:
>
> Hi Jan,
>
> Here's how we do ReadFromKafka; the expansion service is just to ensure
> we can work with xlang in k8s, so please ignore it.
>
> from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service
>
> ReadFromKafka(
>     consumer_config={
>         "group.id": "group-name",
>         "auto.offset.reset": "latest",
>         "enable.auto.commit": "false",
>     },
>     topics=["topic-name"],
>     with_metadata=False,
>     expansion_service=default_io_expansion_service(
>         append_args=[
>             "--defaultEnvironmentType=PROCESS",
>             '--defaultEnvironmentConfig={"command":"/opt/apache/beam/java_boot"}',
>             "--experiments=use_deprecated_read",
>         ]
>     ),
>     commit_offset_in_finalize=True,
> )
>
> Do you know what would be the right approach for using processing time
> instead? I thought WindowInto was supposed to use the timestamp we
> appended to the event. Do you think it is still using the original Kafka
> event timestamp? Thanks!
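>
> (For what it's worth, my current idea is to pin the policy explicitly on
> the read. This is an untested sketch; the 'timestamp_policy' argument and
> the 'processing_time_policy' constant are my reading of the Python SDK,
> so please correct me if this isn't the right knob:)
>
> ReadFromKafka(
>     consumer_config={...},  # same consumer config as above
>     topics=["topic-name"],
>     # Explicitly request processing-time timestamps rather than relying
>     # on the default.
>     timestamp_policy=ReadFromKafka.processing_time_policy,
>     commit_offset_in_finalize=True,
>     expansion_service=...,  # same expansion service as above
> )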
>
> On Thu, Sep 19, 2024 at 7:53 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Can you share the (relevant) parameters of the ReadFromKafka transform?
>>
>> This feels strange, and it might not do what you'd expect:
>>
>> | "Adding 'trigger_processing_time' timestamp" >> beam.Map(
>>     lambda event: window.TimestampedValue(event, time.time()))
>>
>> This does not change the assigned timestamp of an element, but creates a
>> new element which contains the processing time. It will not be used for
>> windowing, though.
>>
>> On 9/19/24 00:49, Lydian Lee wrote:
>>
>> Hi Reuven,
>>
>> Here's a quick look at our pipeline:
>>
>> (
>>     pipeline
>>     | "Reading message from Kafka" >> ReadFromKafka(...)
>>     | "Deserializing events" >> Deserialize(**deserializer_args)
>>     | "Adding 'trigger_processing_time' timestamp" >> beam.Map(
>>         lambda event: window.TimestampedValue(event, time.time()))
>>     | "Window into Fixed Intervals" >> beam.WindowInto(
>>         beam.transforms.window.FixedWindows(fixed_window_size),  # 1 min
>>         # Although we configure lateness, because we are using processing
>>         # time we don't expect any late events.
>>         allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness),
>>     )
>>     # Add a dummy key to reshuffle to fewer partitions; Kafka has 16
>>     # partitions, but we only want to generate 2 files every minute.
>>     | "Adding random integer partition key" >> beam.Map(
>>         lambda event: (random.randint(1, 5), event))
>>     | "Group by randomly-assigned integer key" >> beam.GroupByKey()
>>     | "Abandon Dummy Key" >> beam.MapTuple(lambda key, val: val)
>>     # Call boto3 to write the events into S3 in parquet format.
>>     | "Writing event data batches to parquet" >> WriteBatchesToS3(...)
>> )
>>
>> Thanks!
>>
>> On Wed, Sep 18, 2024 at 3:16 PM Reuven Lax via user <user@beam.apache.org>
>> wrote:
>>
>>> How are you doing this aggregation?
>>>
>>> On Wed, Sep 18, 2024 at 3:11 PM Lydian Lee <tingyenlee...@gmail.com>
>>> wrote:
>>>
>>>> Hi Jan,
>>>>
>>>> Thanks for the recommendation. In our case, we are windowing on
>>>> processing time, which means that there should be no late events at all.
>>>>
>>>> You mentioned that GroupByKey is stateful and can potentially drop
>>>> data. Given that after the reshuffle (adding a random shuffle id to the
>>>> key) we do the aggregation (combine the data and write it to S3), do you
>>>> think the scenario I described earlier could be the reason for the
>>>> dropped data?
>>>>
>>>> If so, how does Beam prevent that in general? Are there any suggested
>>>> approaches? Thanks
>>>>
>>>> On Wed, Sep 18, 2024 at 12:33 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Hi Lydian,
>>>>>
>>>>> in that case, there is only generic advice you can look into.
>>>>> Reshuffle is a stateless operation that should not cause dropping data.
>>>>> A GroupByKey on the other hand is stateful and thus can - when dealing
>>>>> with late data - drop some of it. You should be able to confirm this by
>>>>> looking for the 'droppedDueToLateness' counter and/or the log here [1].
>>>>> This happens when elements arrive after the watermark passes the
>>>>> element's timestamp minus the allowed lateness. If you see the log, you
>>>>> might need to either change how you assign timestamps to elements (e.g.
>>>>> use log append time) or increase the allowed lateness of your windowfn.
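>>>>>
>>>>> In code, the two options would look roughly like this (an untested
>>>>> Python sketch; the 'LogAppendTime' value is what the Python SDK's
>>>>> ReadFromKafka accepts as far as I know, verify against your version):
>>>>>
>>>>> # Option 1: take element timestamps from the broker (log-append time).
>>>>> ReadFromKafka(..., timestamp_policy='LogAppendTime')
>>>>>
>>>>> # Option 2: tolerate more lateness before the GroupByKey drops data.
>>>>> beam.WindowInto(
>>>>>     beam.transforms.window.FixedWindows(60),
>>>>>     allowed_lateness=beam.utils.timestamp.Duration(seconds=600),
>>>>> )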
>>>>>
>>>>> Best,
>>>>>
>>>>> Jan
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/beam/blob/f37795e326a75310828518464189440b14863834/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L132
>>>>>
>>>>> On 9/18/24 08:53, Lydian Lee wrote:
>>>>>
>>>>> I would love to, but there are some limitations on our end, so the
>>>>> version bump won't happen soon. In the meantime I need to figure out
>>>>> what the root cause might be.
>>>>>
>>>>> On Tue, Sep 17, 2024 at 11:26 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>>> Hi Lydian,
>>>>>>
>>>>>> 2.41.0 is quite old, can you please try the current version to see if
>>>>>> this issue is still present? There were lots of changes between 2.41.0
>>>>>> and 2.59.0.
>>>>>>
>>>>>> Jan
>>>>>>
>>>>>> On 9/17/24 17:49, Lydian Lee wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We are using the Beam Python SDK with the Flink runner; the Beam
>>>>>> version is 2.41.0 and the Flink version is 1.15.4.
>>>>>>
>>>>>> We have a pipeline that has 2 stages:
>>>>>> 1. read from Kafka and apply a fixed window of 1 minute
>>>>>> 2. aggregate the data for the past 1 minute and reshuffle so that we
>>>>>> have a smaller partition count, then write the results into S3.
>>>>>>
>>>>>> We disabled enable.auto.commit and enabled commit_offset_in_finalize;
>>>>>> auto.offset.reset is set to "latest".
>>>>>> [image: image.png] (settings reproduced in code form at the end of
>>>>>> this message)
>>>>>>
>>>>>> According to the logs, the data is definitely consumed from Kafka,
>>>>>> because there are many
>>>>>> ```
>>>>>> Resetting offset for topic XXXX-<PARTITION> to offset <OFFSET>
>>>>>> ```
>>>>>> entries, and those partition/offset pairs do match the missing
>>>>>> records. However, the records don't show up in the final S3 output.
>>>>>>
>>>>>> My current hypothesis is that the shuffling might be the reason for
>>>>>> the issue. For example, originally in Kafka for the past minute in
>>>>>> partition 1, I have records at offsets 1, 2, 3. After the reshuffle
>>>>>> they are distributed, for example:
>>>>>> - partition A: 1, 3
>>>>>> - partition B: 2
>>>>>>
>>>>>> If partition A succeeds but partition B fails, then, given that A
>>>>>> succeeded, it will commit its offset to Kafka, so Kafka now has the
>>>>>> offset at 3. When the read is retried, it will skip offset 2. However,
>>>>>> I am not sure how exactly the offset commit works, and I wonder how it
>>>>>> interacts with checkpoints. If my hypothesis were correct, we should be
>>>>>> seeing many more missing records, yet this seems to happen only rarely.
>>>>>> Wondering if anyone can help identify potential root causes? Thanks
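>>>>>>
>>>>>> (For reference, the consumer settings from the screenshot above, in
>>>>>> code form; these mirror the ReadFromKafka config quoted earlier in the
>>>>>> thread:)
>>>>>>
>>>>>> consumer_config = {
>>>>>>     "group.id": "group-name",
>>>>>>     "auto.offset.reset": "latest",
>>>>>>     "enable.auto.commit": "false",
>>>>>> }
>>>>>> # plus commit_offset_in_finalize=True on the ReadFromKafka transform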