Hi Beam Community,

  I'm reaching out to get guidance on the proper approach for implementing 
periodic file flushing in a streaming DoFn, as
  we've encountered a data loss issue that appears to be related to improper 
usage of FinishBundleContext.

 Current Implementation (With Data Loss Issue)

  We have a streaming DoFn that:
  1. Accumulates data and writes to files during @ProcessElement
  2. Uses an asynchronous thread that flushes completed files every 5 minutes
  3. Emits files to downstream operators using FinishBundleContext.output()

  Environment Details

  * Runner: Flink (streaming mode)
  * Beam Version: 2.45.47
  * Use Case: Writing streaming data to Iceberg tables with periodic commits

  Code Pattern:

  public class FileWriterDoFn extends DoFn<InputT, OutputFileT> {
    private FinishBundleContext finishBundleContext; // Stored as instance variable

    @FinishBundle
    public void finishBundle(FinishBundleContext fbc) {
      this.finishBundleContext = fbc; // Store context reference
      // ... other finishBundle logic
    }

    // Async thread calls this every 5 minutes
    private void flush() {
      // ... create completed files
      for (DataFile file : completedFiles) {
        finishBundleContext.output(file); // Using stored context
      }
    }
  }
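To make the failure mode concrete, here is a stripped-down, Beam-free simulation of the bundle lifecycle (all class and method names below are invented for illustration; this is not Beam's actual API):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the runner's bundle lifecycle, invented for
// illustration only.
public class StaleContextDemo {

    // Each "bundle" gets its own context; output only reaches downstream
    // if it arrives before the bundle is finalized.
    static class BundleContext {
        final List<String> buffer = new ArrayList<>();
        boolean finalized = false;

        void output(String element) {
            if (!finalized) {
                buffer.add(element);
            }
            // After finalization the runner has already committed the bundle,
            // so anything written here is silently dropped.
        }
    }

    /** Returns the number of elements lost to the stale-context write. */
    static int runSimulation() {
        List<String> downstream = new ArrayList<>();

        // Bundle 1 finishes; the DoFn caches its context (the bug).
        BundleContext bundle1 = new BundleContext();
        BundleContext cached = bundle1;    // stale reference kept by async thread
        bundle1.finalized = true;          // runner finalizes bundle 1
        downstream.addAll(bundle1.buffer); // committed output: empty

        // Minutes later the async flush fires, still holding the old context.
        cached.output("file-001");         // silently dropped

        // Bundle 2 runs and commits normally.
        BundleContext bundle2 = new BundleContext();
        bundle2.output("file-002");
        bundle2.finalized = true;
        downstream.addAll(bundle2.buffer);

        // "file-001" was written to storage but never reached downstream.
        int produced = 2;
        return produced - downstream.size();
    }

    public static void main(String[] args) {
        System.out.println("lost elements: " + runSimulation());
    }
}
```

The write through the cached context after finalization is exactly the silently dropped output we observed: the file exists in storage, but downstream never sees it.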

  We experienced data loss where:

  * Files were successfully written to storage
  * Files were never committed/processed by downstream operators
  * The number of lost records exactly matched the records in uncommitted files
  * Lost files were consistently the last files in each flush batch

  After investigating, we discovered that:

  1. FinishBundleContext instances are newly created on every @FinishBundle invocation
  2. Bundle boundaries occur much more frequently than our 5-minute flush interval (with the defaults: max bundle time of 1 second, max bundle size of 1000 elements)
  3. Our async flush was therefore using stale FinishBundleContext references
  4. The issue is not always reproducible; the data loss occurs only infrequently

  Bundle finalization triggers we identified:

  * Time-based (default: 1000 ms)
  * Element count-based (default: 1000 elements)
  * Side input arrivals (not used in our logic)
  * Checkpoint operations (checkpointing is disabled because the customer manages their own checkpoints; happy to share more, but it's less relevant here)
  * Watermark processing (as far as I can tell, only the final watermark triggers this, so it isn't applicable to a streaming job)
  * Timer-based processing

  Questions for the Community

  1. What is the recommended pattern for periodic file flushing in streaming DoFns?

     * Should we use stateful processing with timers instead of async threads?
     * Are there existing I/O transforms that handle this pattern correctly?
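For context, the timer-based alternative we are considering looks roughly like this (an untested sketch against the stateful DoFn API; InputT, OutputFileT, and writeFile are placeholders from our code, and the timer re-arming logic is simplified):

```java
public class TimerFlushDoFn extends DoFn<KV<String, InputT>, OutputFileT> {

  @StateId("pending")
  private final StateSpec<BagState<InputT>> pendingSpec = StateSpecs.bag();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(
      ProcessContext ctx,
      @StateId("pending") BagState<InputT> pending,
      @TimerId("flush") Timer flush) {
    pending.add(ctx.element().getValue());
    // Note: this re-arms on every element, so it fires 5 minutes after the
    // *last* element; a real implementation would guard with a ValueState
    // flag to get a fixed cadence.
    flush.offset(Duration.standardMinutes(5)).setRelative();
  }

  @OnTimer("flush")
  public void onFlush(
      OnTimerContext ctx,
      @StateId("pending") BagState<InputT> pending) {
    // Runs inside a bundle, so the emitted file and the state mutation are
    // committed together; no cached context is needed.
    OutputFileT file = writeFile(pending.read());
    ctx.output(file);
    pending.clear();
  }
}
```

Since the timer callback executes within a bundle, the output seems to be checkpointed with the bundle, which would avoid the stale-context problem entirely. Is this the recommended direction?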


  2. Is there ever a valid use case for storing FinishBundleContext references beyond the @FinishBundle method scope?

     - The Javadoc suggests it shouldn't be cached, but we'd like to confirm.

  3. For file-based outputs with periodic flushing requirements, what are the recommended approaches?

     * Custom stateful DoFn with processing-time timers?
     * FileIO.write() with windowing?
     * Separate transforms for writing vs. committing?
     * Tuning bundle configurations?
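For the FileIO option, the windowed shape we would imagine is roughly the following (untested; the output path, element type, and 5-minute window are placeholders):

```java
input
    .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))))
    .apply(FileIO.<String>write()
        .via(TextIO.sink())
        .to("/path/to/output")  // placeholder path
        .withNumShards(1));     // unbounded writes need explicit sharding
```

Though it is unclear to us how the Iceberg commit step would fit into this path, hence the question.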

  4. Are there any existing utilities or patterns in Beam for managing this lifecycle correctly?

Any guidance on the correct approach would be greatly appreciated!
Thanks in advance for your help!

Best regards,
Jin An
