Also, as I said, the duplicate files might not look like duplicates, which can be quite confusing.
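
If it would help to make retries land on the same object, one option - just a rough, untested sketch, and the DoFn below is a made-up variant of your WriteBatchesToS3 - is to keep the dummy key through the write and derive the file name from the window and key instead of a random uuid:

import apache_beam as beam


class WriteKeyedBatchesToS3(beam.DoFn):  # hypothetical variant of WriteBatchesToS3
    def __init__(self, output_path: str) -> None:
        self.output_path = output_path

    def process(self, element, window=beam.DoFn.WindowParam):
        # Keep the (dummy key, batches) pair instead of dropping the key before the write.
        key, batches = element
        window_start = window.start.to_utc_datetime().strftime("%Y%m%dT%H%M%S")
        # Deterministic name: a retried bundle overwrites the same S3 object
        # instead of adding a second, slightly different copy.
        filename = f"{self.output_path}/window_{window_start}_key_{key}.parquet"
        # ... open the S3 writer and write `batches` exactly as before, but always to `filename` ...
        yield filename

This doesn't make the write exactly-once (a retry could still overwrite the object with slightly different contents if the grouping changed), but reruns would at least stop multiplying files.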
Out of curiosity, I would try - just for testing - removing the line where you "add" processing time, and also setting allowed_lateness to the largest allowed value. This will help determine whether late data is causing the dropped records.

Reuven

On Mon, Sep 23, 2024 at 2:10 PM Lydian Lee <tingyenlee...@gmail.com> wrote:

> Hi Jan,
>
> Thanks for taking a quick look! Yes, the "with" statement would close after the write. In our use case, we actually don't mind if there are duplicates of the data written, but we are more concerned about the missing data, which is the issue we are facing right now.
>
> On Mon, Sep 23, 2024 at 11:54 AM Reuven Lax via user <user@beam.apache.org> wrote:
>
>> Do you close the write afterwards? If not, I wonder if you could lose records due to unflushed data.
>>
>> Also - what you're doing here can definitely lead to duplicate data being written, since this DoFn can be run multiple times. The duplicates might also appear different if the Iterables are slightly different on retries, especially when Flink restarts from a checkpoint.
>>
>> Reuven
>>
>> On Mon, Sep 23, 2024 at 1:40 PM Lydian Lee <tingyenlee...@gmail.com> wrote:
>>
>>> Hi Jan,
>>>
>>> Thanks so much for your help. Here's our write to S3:
>>>
>>> import logging
>>> import os
>>> from typing import Dict, Iterable, List
>>> from uuid import uuid4
>>>
>>> import apache_beam as beam
>>> import apache_beam.io.aws.clients.s3.boto3_client  # noqa: F401 - makes the beam.io.aws... chain resolvable
>>> import apache_beam.io.aws.s3io  # noqa: F401
>>> import pyarrow as pa
>>> from apache_beam.options.pipeline_options import PipelineOptions
>>> from pyarrow.parquet import ParquetWriter
>>>
>>>
>>> class WriteBatchesToS3(beam.DoFn):
>>>     def __init__(
>>>         self,
>>>         output_path: str,
>>>         schema: pa.schema,
>>>         pipeline_options: PipelineOptions,
>>>     ) -> None:
>>>         self.output_path = output_path
>>>         self.schema = schema
>>>         self.pipeline_options = pipeline_options
>>>
>>>     def process(self, data: Iterable[List[Dict]]) -> None:
>>>         """Write one batch per file to S3."""
>>>         client = beam.io.aws.clients.s3.boto3_client.Client(options=self.pipeline_options)
>>>
>>>         fields_without_metadata = [pa.field(f.name, f.type) for f in self.schema]
>>>         schema_without_field_metadata = pa.schema(fields_without_metadata)
>>>
>>>         filename = os.path.join(
>>>             self.output_path,
>>>             f"uuid_{str(uuid4())}.parquet",
>>>         )
>>>
>>>         tables = [pa.Table.from_pylist(batch, schema=schema_without_field_metadata) for batch in data]
>>>         if len(tables) == 0:
>>>             logging.info(f"No data to write for key: {partition_date}, the grouped contents are: {data}")
>>>             return
>>>
>>>         with beam.io.aws.s3io.S3IO(client=client).open(filename=filename, mode="w") as s3_writer:
>>>             with ParquetWriter(
>>>                 s3_writer, schema_without_field_metadata, compression="SNAPPY", use_deprecated_int96_timestamps=True
>>>             ) as parquet_writer:
>>>                 merged_tables = pa.concat_tables(tables)
>>>                 parquet_writer.write_table(merged_tables)
>>>
>>> On Fri, Sep 20, 2024 at 12:02 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi Lydian,
>>>>
>>>> because you do not specify 'timestamp_policy' it should use the default, which should be processing time, so this should not be the issue. The one potentially remaining transform is the sink transform, as Reuven mentioned. Can you share details of the implementation?
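>>>>
>>>> For reference, the policy can also be set explicitly on the read - sketch only, I have not verified this against 2.41.0, and it is trimmed down to the relevant arguments:
>>>>
>>>> from apache_beam.io.kafka import ReadFromKafka
>>>>
>>>> # Sketch: make the timestamp policy explicit instead of relying on the default.
>>>> # ReadFromKafka also accepts create_time_policy and log_append_time here.
>>>> read = ReadFromKafka(
>>>>     consumer_config={"group.id": "group-name"},  # plus your other consumer settings
>>>>     topics=["topic-name"],
>>>>     timestamp_policy=ReadFromKafka.processing_time_policy,
>>>> )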
>>>>
>>>> Jan
>>>>
>>>> On 9/19/24 23:10, Lydian Lee wrote:
>>>>
>>>> Hi, Jan
>>>>
>>>> Here's how we do ReadFromKafka; the expansion service part is just to ensure we can work with xlang in k8s, so please ignore it.
>>>>
>>>> from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service
>>>>
>>>> ReadFromKafka(
>>>>     consumer_config={
>>>>         "group.id": "group-name",
>>>>         "auto.offset.reset": "latest",
>>>>         "enable.auto.commit": "false",
>>>>     },
>>>>     topics=["topic-name"],
>>>>     with_metadata=False,
>>>>     expansion_service=default_io_expansion_service(
>>>>         append_args=[
>>>>             "--defaultEnvironmentType=PROCESS",
>>>>             '--defaultEnvironmentConfig={"command": "/opt/apache/beam/java_boot"}',
>>>>             "--experiments=use_deprecated_read",
>>>>         ]
>>>>     ),
>>>>     commit_offset_in_finalize=True,
>>>> )
>>>>
>>>> Do you know what would be the right approach for using processing time instead? I thought WindowInto was supposed to use the timestamp we appended to the event. Do you think it is still using the original Kafka event timestamp? Thanks!
>>>>
>>>> On Thu, Sep 19, 2024 at 7:53 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Can you share the (relevant) parameters of the ReadFromKafka transform?
>>>>>
>>>>> This feels strange, and it might not do what you'd expect:
>>>>>
>>>>> | "Adding 'trigger_processing_time' timestamp" >> beam.Map(lambda event: window.TimestampedValue(event, time.time()))
>>>>>
>>>>> This does not change the assigned timestamp of an element, but creates a new element which contains processing time. It will not be used for windowing, though.
>>>>>
>>>>> On 9/19/24 00:49, Lydian Lee wrote:
>>>>>
>>>>> Hi Reuven,
>>>>>
>>>>> Here's a quick look at our pipeline:
>>>>>
>>>>> (
>>>>>     pipeline
>>>>>     | "Reading message from Kafka" >> ReadFromKafka(...)
>>>>>     | "Deserializing events" >> Deserialize(**deserializer_args)
>>>>>     | "Adding 'trigger_processing_time' timestamp" >> beam.Map(
>>>>>         lambda event: window.TimestampedValue(event, time.time())
>>>>>     )
>>>>>     | "Window into Fixed Intervals" >> beam.WindowInto(
>>>>>         beam.transforms.window.FixedWindows(fixed_window_size),  # fixed_window_size is 1 min
>>>>>         allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness),  # although we configure lateness, because we are using processing time I don't expect any late events
>>>>>     )
>>>>>     | "Adding random integer partition key" >> beam.Map(
>>>>>         lambda event: (random.randint(1, 5), event)  # add a dummy key to reshuffle into fewer partitions; Kafka has 16 partitions, but we only want to generate 2 files every minute
>>>>>     )
>>>>>     | "Group by randomly-assigned integer key" >> beam.GroupByKey()
>>>>>     | "Abandon Dummy Key" >> beam.MapTuple(lambda key, val: val)
>>>>>     | "Writing event data batches to parquet" >> WriteBatchesToS3(...)  # call boto3 to write the events into S3 in parquet format
>>>>> )
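>>>>>
>>>>> (For what it's worth, if we wanted to rule out late-data dropping entirely, I think we could temporarily swap the windowing stanza for something like the following untested sketch, with the re-stamping Map removed in front of it.)
>>>>>
>>>>> import apache_beam as beam
>>>>> from apache_beam.transforms import window
>>>>> from apache_beam.utils.timestamp import Duration
>>>>>
>>>>> fixed_window_size = 60  # seconds, i.e. the 1 minute window above
>>>>>
>>>>> # Diagnostic variant: no TimestampedValue re-stamping before this step, and an
>>>>> # effectively unbounded allowed_lateness, so nothing can be dropped as late.
>>>>> diagnostic_windowing = beam.WindowInto(
>>>>>     window.FixedWindows(fixed_window_size),
>>>>>     allowed_lateness=Duration(seconds=10 * 365 * 24 * 3600),
>>>>> )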
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On Wed, Sep 18, 2024 at 3:16 PM Reuven Lax via user <user@beam.apache.org> wrote:
>>>>>
>>>>>> How are you doing this aggregation?
>>>>>>
>>>>>> On Wed, Sep 18, 2024 at 3:11 PM Lydian Lee <tingyenlee...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Jan,
>>>>>>>
>>>>>>> Thanks for the recommendation. In our case, we are windowing on processing time, which means that there should be no late events at all.
>>>>>>>
>>>>>>> You've mentioned that GroupByKey is stateful and can potentially drop data. Given that after the reshuffle (adding a random shuffle id to the key) we then do the aggregation (combining the data and writing it to S3), do you think the example I mentioned earlier could be the reason for the dropped data?
>>>>>>>
>>>>>>> If so, how is Beam able to prevent that in general? Are there any suggested approaches? Thanks
>>>>>>>
>>>>>>> On Wed, Sep 18, 2024 at 12:33 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>>
>>>>>>>> Hi Lydian,
>>>>>>>>
>>>>>>>> in that case, there is only generic advice you can look into. Reshuffle is a stateless operation that should not cause data to be dropped. A GroupByKey, on the other hand, is stateful and thus can - when dealing with late data - drop some of it. You should be able to confirm this by looking for the 'droppedDueToLateness' counter and/or the log emitted here [1]. This happens when an element arrives after the watermark has passed the element's timestamp plus the allowed lateness. If you see the log, you might need to either change how you assign timestamps to elements (e.g. use log append time) or increase the allowed lateness of your windowfn.
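>>>>>>>>
>>>>>>>> To make that condition concrete, here is a rough illustration with made-up numbers (plain Python arithmetic, not a Beam API):
>>>>>>>>
>>>>>>>> # An element with timestamp 100s falls into the fixed window [60s, 120s).
>>>>>>>> window_end = 120.0          # max timestamp of the element's window (seconds)
>>>>>>>> allowed_lateness = 30.0     # allowed lateness configured on the window
>>>>>>>> watermark = 155.0           # input watermark when the element arrives
>>>>>>>>
>>>>>>>> # The element is dropped (and counted in droppedDueToLateness) once the
>>>>>>>> # watermark has passed the end of its window plus the allowed lateness.
>>>>>>>> dropped = watermark > window_end + allowed_lateness
>>>>>>>> print(dropped)  # True, because 155 > 120 + 30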
>>>>>>>>
>>>>>>>> Best,
>>>>>>>>
>>>>>>>> Jan
>>>>>>>>
>>>>>>>> [1] https://github.com/apache/beam/blob/f37795e326a75310828518464189440b14863834/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L132
>>>>>>>>
>>>>>>>> On 9/18/24 08:53, Lydian Lee wrote:
>>>>>>>>
>>>>>>>> I would love to, but there are some limitations on our end, so the version bump won't happen soon. In the meantime I need to figure out what the root cause might be.
>>>>>>>>
>>>>>>>> On Tue, Sep 17, 2024 at 11:26 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>>>
>>>>>>>>> Hi Lydian,
>>>>>>>>>
>>>>>>>>> 2.41.0 is quite old, can you please try the current version to see if this issue is still present? There were lots of changes between 2.41.0 and 2.59.0.
>>>>>>>>>
>>>>>>>>> Jan
>>>>>>>>>
>>>>>>>>> On 9/17/24 17:49, Lydian Lee wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> We are using the Beam Python SDK with the Flink runner; the Beam version is 2.41.0 and the Flink version is 1.15.4.
>>>>>>>>>
>>>>>>>>> We have a pipeline that has 2 stages:
>>>>>>>>> 1. read from Kafka and apply a fixed window of 1 minute
>>>>>>>>> 2. aggregate the data for the past minute, reshuffle so that we have a lower partition count, and write the results to S3.
>>>>>>>>>
>>>>>>>>> We disabled enable.auto.commit and enabled commit_offset_in_finalize; auto.offset.reset is set to "latest".
>>>>>>>>> [image: image.png]
>>>>>>>>>
>>>>>>>>> According to the logs, I can definitely see that the data is consumed from Kafka, because there are many
>>>>>>>>> ```
>>>>>>>>> Resetting offset for topic XXXX-<PARTITION> to offset <OFFSET>
>>>>>>>>> ```
>>>>>>>>> and the partition/offset pairs do match the missing records. However, those records don't show up in the final S3 output.
>>>>>>>>>
>>>>>>>>> My current hypothesis is that the shuffling might be the reason for the issue. For example, suppose that for the past minute Kafka partition 1 has the records at offsets 1, 2 and 3. After the reshuffle they might be distributed as:
>>>>>>>>> - partition A: 1, 3
>>>>>>>>> - partition B: 2
>>>>>>>>>
>>>>>>>>> If partition A finishes successfully but partition B fails, then, because A succeeded, its offset is committed to Kafka, so Kafka's committed offset now points past 3. When the read is retried, offset 2 will be skipped. However, I am not sure exactly how the offset commit works, and I wonder how it interacts with the checkpoints. Also, if my hypothesis were correct we should be seeing many more missing records, yet this happens only rarely. Can anyone help identify potential root causes? Thanks
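>>>>>>>>>
>>>>>>>>> To narrow this down on our side, I'm also thinking about logging per-window, per-key record counts right before the S3 write - an untested sketch, which would sit right after the GroupByKey while the dummy key is still present:
>>>>>>>>>
>>>>>>>>> import logging
>>>>>>>>>
>>>>>>>>> import apache_beam as beam
>>>>>>>>>
>>>>>>>>> class LogBatchSizes(beam.DoFn):
>>>>>>>>>     """Log how many records each (window, key) carries into the S3 write."""
>>>>>>>>>
>>>>>>>>>     def process(self, keyed_batches, window=beam.DoFn.WindowParam):
>>>>>>>>>         key, batches = keyed_batches
>>>>>>>>>         batches = list(batches)  # materialize so it can be counted and passed on
>>>>>>>>>         total = sum(len(batch) for batch in batches)
>>>>>>>>>         logging.info("window=%s key=%s records=%d", window, key, total)
>>>>>>>>>         yield key, batches
>>>>>>>>>
>>>>>>>>> # usage: | "Log batch sizes" >> beam.ParDo(LogBatchSizes())
>>>>>>>>>
>>>>>>>>> That would let us compare the counts per window against the Kafka offsets we see being reset.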