Hi Vamsi,

You cannot cache the context; it is valid only inside the call to the @FinishBundle method.

You can control the size and duration of bundles on Flink via [1] and [2] (you might also want to align these with the checkpointing interval). If this is insufficient for your case, you can use GroupIntoBatches [3] instead; both options are sketched below.
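
A minimal sketch of both options (the option values, the keyed input, and
WriteBatchFn are placeholders, not a drop-in solution):

  FlinkPipelineOptions options =
      PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
  // Fewer, larger bundles: more elements share each @FinishBundle call.
  options.setMaxBundleSize(10_000L);
  options.setMaxBundleTimeMills(60_000L); // 60 s; align with checkpointing

  Pipeline p = Pipeline.create(options);

  // Alternatively, batch explicitly, independent of bundle boundaries;
  // GroupIntoBatches requires a keyed (KV) input:
  keyedInput
      .apply(GroupIntoBatches.<String, Row>ofSize(1000)
          .withMaxBufferingDuration(Duration.standardMinutes(5)))
      .apply(ParDo.of(new WriteBatchFn())); // gets KV<String, Iterable<Row>>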

Best,

 Jan

[1] https://beam.apache.org/releases/javadoc/2.47.0/org/apache/beam/runners/flink/FlinkPipelineOptions.html#getMaxBundleTimeMills--

[2] https://beam.apache.org/releases/javadoc/2.47.0/org/apache/beam/runners/flink/FlinkPipelineOptions.html#getMaxBundleSize--

[3] https://beam.apache.org/documentation/transforms/java/aggregation/groupintobatches/

On 8/5/25 10:41, vamsikrishna korada wrote:
Hi Reuven Lax,

If we cache the FinishBundleContext and then invoke it inside processElement, what would happen? Would the pipeline throw an error, or would those outputs simply be dropped? Is FinishBundleContext a single, shared object that persists for the entire lifespan of the DoFn?


Thanks,
Vamsi

On Fri, 1 Aug 2025 at 23:18, Jin An via user <user@beam.apache.org> wrote:

    Thank you for the quick response and for confirming the incorrect
    finishBundleContext usage.

    Currently, we're writing ORC files with Iceberg's writer, combining
    these files together globally, and committing them to an Iceberg
    table.
    The problem lies between the writing step and the
    snapshotting/committing step, where the last couple of files
    (*infrequently*) aren't committed.

    Hence, we suspect that the incorrect use of finishBundleContext
    might be the root cause here, and we are exploring options to
    properly output to the combiner step.
    ------------------------------------------------------------------------
    *From:* Reuven Lax <re...@google.com>
    *Sent:* Friday, August 1, 2025 9:45 AM
    *To:* user@beam.apache.org <user@beam.apache.org>
    *Cc:* Jin An <s...@linkedin.com>
    *Subject:* Re: Question: Best Practice for periodic file flushing
    in streaming DoFn
    This is incorrect - FinishBundleContext is only valid inside of
    finishBundle. You cannot save it beyond the method scope.
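
    For reference, the supported shape is to buffer inside the DoFn and
    emit everything pending from within @FinishBundle itself. A minimal
    sketch (the pending buffer and the PendingFile type are illustrative):

      @FinishBundle
      public void finishBundle(FinishBundleContext ctx) {
        for (PendingFile f : pending) {
          // FinishBundleContext.output takes (value, timestamp, window)
          ctx.output(f.dataFile(), f.timestamp(), f.window());
        }
        pending.clear();
      }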

    Have you looked at the existing file sinks? Do those not work for
    your use case?

    On Fri, Aug 1, 2025 at 9:29 AM Jin An via user
    <user@beam.apache.org> wrote:

          Hi Beam Community,

        I'm reaching out to get guidance on the proper approach for
        implementing periodic file flushing in a streaming DoFn, as
        we've encountered a data loss issue that appears to be
        related to improper usage of FinishBundleContext.

        * Current Implementation with data loss issue *

          We have a streaming DoFn that:
          1. Accumulates data and writes to files during @ProcessElement
          2. Uses an asynchronous thread that flushes completed files
             every 5 minutes
          3. Emits files to downstream operators using
             FinishBundleContext.output()

        *  Environment Details*

         * Runner: Flink (streaming mode)
         * Beam Version: 2.45.47
         * Use Case: Writing streaming data to Iceberg tables with
           periodic commits


        *  Code Pattern:*
          public class FileWriterDoFn extends DoFn<InputT, OutputFileT> {
            // Stored as instance variable
            private FinishBundleContext finishBundleContext;

            @FinishBundle
            public void finishBundle(FinishBundleContext fbc) {
              this.finishBundleContext = fbc; // Store context reference
              // ... other finishBundle logic
            }

            // Async thread calls this every 5 minutes
            private void flush() {
              // ... create completed files
              for (DataFile file : completedFiles) {
                finishBundleContext.output(file); // Using stored context
              }
            }
          }

          We experienced data loss where:

         * Files were successfully written to storage
         * Files were never committed/processed by downstream operators
         * The number of lost records exactly matched the records in the
           uncommitted files
         * Lost files were consistently the last files in each flush
           batch


          After investigating, we discovered that:

        1. FinishBundleContext instances are newly created on every
           @FinishBundle invocation
        2. Bundle boundaries occur much more frequently than our
           5-minute flush interval (with the defaults: max bundle time
           of 1 second, max bundle size of 1000)
        3. Our async flush was using stale FinishBundleContext references
        4. However, the data loss is not always reproducible; it occurs
           rather infrequently.


          Bundle finalization triggers we identified:

         * Time-based (default: 1000 ms)
         * Element count-based (default: 1000 elements)
         * Side input arrivals (not used in our logic)
         * Checkpoint operations (checkpointing is disabled; the
           customer manages its own checkpoints. Happy to share more
           about this, but it is less relevant to the topic for now)
         * Watermark processing (I see this triggered only on the final
           watermark with bounded input, so it is not applicable to a
           streaming job)
         * Timer-based processing


        *  Questions for the Community*
        1. What is the recommended pattern for periodic file flushing
        in streaming DoFns?

         * Should we use stateful processing with timers instead of
           async threads? (see the sketch below)
         * Are there existing I/O transforms that handle this pattern
           correctly?
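
        For context, the timer-based variant we have in mind would look
        roughly like the sketch below (the state/type names and the
        writeFile() helper are illustrative placeholders, not our
        production code):

          public class TimedFlushFn extends DoFn<KV<String, InputT>, OutputFileT> {
            @StateId("buffer")
            private final StateSpec<BagState<InputT>> bufferSpec = StateSpecs.bag();

            @TimerId("flush")
            private final TimerSpec flushSpec =
                TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

            @ProcessElement
            public void process(
                ProcessContext ctx,
                @StateId("buffer") BagState<InputT> buffer,
                @TimerId("flush") Timer flush) {
              buffer.add(ctx.element().getValue());
              // Each set overwrites the previous one, so this fires
              // 5 minutes after the most recent element.
              flush.offset(Duration.standardMinutes(5)).setRelative();
            }

            @OnTimer("flush")
            public void onFlush(
                OnTimerContext ctx,
                @StateId("buffer") BagState<InputT> buffer) {
              OutputFileT file = writeFile(buffer.read()); // illustrative helper
              ctx.output(file); // emitting from a timer callback is supported
              buffer.clear();
            }
          }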

        2. Is there ever a valid use case for storing
        FinishBundleContext references beyond the @FinishBundle method
        scope?
            - The Javadoc suggests it shouldn't be cached, but we'd
        like to confirm.

        3. For file-based outputs with periodic flushing requirements,
        what are the recommended approaches?

         * Custom stateful DoFn with processing-time timers?
         * FileIO.write() with windowing? (see the sketch below)
         * Separate transforms for writing vs. committing?
         * Tuning bundle configurations?
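
        For reference, the FileIO variant we'd evaluate would be roughly
        the following sketch (the input type, path, and shard count are
        placeholders):

          lines
              .apply(Window.<String>into(
                  FixedWindows.of(Duration.standardMinutes(5))))
              .apply(FileIO.<String>write()
                  .via(TextIO.sink())
                  .to("hdfs://path/to/staging") // placeholder path
                  .withNumShards(1)); // required for unbounded input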


         4. Are there any existing utilities or patterns in Beam for
        managing this lifecycle correctly?

        Any guidance on the correct approach would be greatly appreciated!
        Thanks in advance for your help!

        Best regards,
        Jin An
