The pipeline works fine with the Direct Runner. The issue appears to be
specific to streaming mode on Dataflow. I've updated my pipeline to use
Pub/Sub as an input instead. Digging into the Dataflow console, it looks
like execution of a particular GroupByKey is moving extremely slowly -- the
watermark for the prior step is caught up to real time, but the GroupByKey
step's data watermark is currently June 4, 2020 and takes about 10 minutes
of wall-clock time to advance by a single day of event time, which is
ridiculously slow. In batch mode, the whole backlog is processed in about
30 minutes.
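
For a rough sense of scale, here is a back-of-the-envelope sketch in plain
Kotlin (no Beam dependency). It only projects the observed rate forward. The
"today" value is an assumption: it uses the date of the quoted reply below
as a stand-in for the current date.

```kotlin
import java.time.Duration
import java.time.LocalDate
import java.time.temporal.ChronoUnit

// Figures observed in the Dataflow console, as described above.
val watermarkDate: LocalDate = LocalDate.of(2020, 6, 4)
val wallClockPerEventDay: Duration = Duration.ofMinutes(10)

// Assumption: "now" is approximated by the date of the quoted reply.
val assumedToday: LocalDate = LocalDate.of(2021, 5, 12)

// Days of event time the GroupByKey watermark still has to cover.
val backlogDays: Long = ChronoUnit.DAYS.between(watermarkDate, assumedToday)

// Projected wall-clock time to catch up if the observed rate holds.
val projectedCatchUp: Duration = wallClockPerEventDay.multipliedBy(backlogDays)

fun main() {
    println("Backlog: $backlogDays days of event time")
    println("Projected catch-up: ${projectedCatchUp.toHours()} hours")
}
```

With these inputs the backlog is 342 days and the projected catch-up time is
about 57 hours, compared with about 30 minutes for the same data in batch
mode.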

It is a reasonably large and complex pipeline, so I'm not sure where I
would even start with code snippets. The particular GroupByKey that seems
to be running extremely slowly is "FixedTickGroupByTeam", in the part of
the pipeline shown below. It really should be processing only a few events
per day with an event time of Jun 4th 2020, so something is definitely not
right:

val scores = timestampedCheckins
  .apply(
    "FixedTickWindows",
    Window.into<Checkin>(FixedWindows.of(5.minutes.asJoda()))
      // NOTE that we use the default timestamp combiner (end of window)
      // to avoid https://issues.apache.org/jira/browse/BEAM-2262
      .triggering(
        AfterWatermark.pastEndOfWindow()
          .withLateFirings(AfterPane.elementCountAtLeast(1))
      )
      .withAllowedLateness(3.days.asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
      .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS)
      .withTimestampCombiner(TimestampCombiner.LATEST)
      .discardingFiredPanes()
  )
  .apply("FixedTickKeyByTeam", WithKeys.of { it.teamId })
  .apply("FixedTickGroupByTeam", GroupByKey.create())
  .apply("GlobalWindowsLoopingStatefulTimer",
    Window.into(GlobalWindows())
  )
  .apply("LoopingStatefulTimer",
    ParDo.of(LoopingStatefulTimer(5.minutes, (options.timerTimeoutDays ?: 30).days))
  )
  // this window and subsequent group by and flatten combines the
  // empty iterable timer output with any actual check-ins
  .apply("FixedTickWindowsPostTimer",
    Window.into<KV<String, Iterable<@JvmWildcard Checkin>>>(FixedWindows.of(5.minutes.asJoda()))
      .applyWindowingOptions()
  )
  .apply("FixedTickGroupByTeamPostTimer", GroupByKey.create())
  .apply("FixedTickFlattenPostTimer", flattenValues())
  // convert to an explicit PeriodContent type representing either an
  // empty period or a period with check-ins
  // this allows us to carry forward the timestamp of an empty period,
  // without it being flattened into a single empty
  //.apply("MapToPeriodContent", ParDo.of(MapToPeriodContentFn()))
  .apply("CheckinTimingScoreFn",
    ParDo.of(CheckinTimingScoreFn(scoreCalculationServiceBuilder, checkinStateView))
      .withSideInputs(checkinStateView)
  )

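To make the intended output of this section concrete, here is a small
stand-alone Kotlin sketch (no Beam dependency) of what the fixed 5-minute
windows plus the looping timer are meant to produce for a single team: one
period per tick, empty where no check-in arrived. The Checkin class and the
periodsFor and windowStart helpers are hypothetical stand-ins for
illustration, not the pipeline's real types. The epoch-aligned window start
is an assumption about how FixedWindows behaves with no offset.

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical stand-in for the real Checkin type; only the fields used here.
data class Checkin(val teamId: String, val timestamp: Instant)

val tick: Duration = Duration.ofMinutes(5)

// Start of the fixed window containing ts, with windows aligned to the epoch
// (assumed to match FixedWindows with a zero offset).
fun windowStart(ts: Instant, size: Duration = tick): Instant {
    val sizeMs = size.toMillis()
    return Instant.ofEpochMilli(Math.floorDiv(ts.toEpochMilli(), sizeMs) * sizeMs)
}

// For one team's check-ins, emit one entry per tick from the first check-in's
// window up to (but excluding) `until`. Windows without check-ins map to an
// empty list, which mimics the empty output of the looping timer.
fun periodsFor(checkins: List<Checkin>, until: Instant): Map<Instant, List<Checkin>> {
    val byWindow = checkins.groupBy { windowStart(it.timestamp) }
    val first = byWindow.keys.minOrNull() ?: return emptyMap()
    val result = LinkedHashMap<Instant, List<Checkin>>()
    var window = first
    while (window.isBefore(until)) {
        result[window] = byWindow[window] ?: emptyList()
        window = window.plus(tick)
    }
    return result
}

fun main() {
    val checkins = listOf(
        Checkin("team-a", Instant.parse("2020-06-04T00:02:00Z")),
        Checkin("team-a", Instant.parse("2020-06-04T00:13:00Z")),
    )
    val periods = periodsFor(checkins, Instant.parse("2020-06-04T00:20:00Z"))
    for ((start, items) in periods) {
        println("$start -> ${items.size} check-in(s)")
    }
}
```

In the real pipeline the empty periods come from timer firings rather than
from a loop over a known range, so this only illustrates the expected shape
of the results, not how Dataflow schedules them.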

On Wed, May 12, 2021 at 12:43 PM Kenneth Knowles <[email protected]> wrote:

> Can you share some more details, such as code? We may identify something
> that relies upon assumptions from batch execution style.
>
> Also notably the Java DirectRunner does not have separate batch/streaming
> mode. It always executes in a "streaming" sort of way. It is also simpler
> in some ways so if you can reproduce it on the DirectRunner that might help.
>
> Kenn
>
> On Tue, May 11, 2021 at 3:41 PM Raman Gupta <[email protected]> wrote:
>
>> I have a Dataflow pipeline that reads data from JDBC and Pub/Sub. My
>> ideal pipeline backfills its state and output from historical data via the
>> JDBC input, and then continues processing new elements arriving via
>> pub/sub. Conceptually, this seems easy to do with a filter on each source
>> before/after some specific cutoff instant.
>>
>> However, when I add pub/sub into the pipeline, it runs in streaming mode,
>> and the pipeline does not produce the expected results -- all of the
>> results that would be produced based on looping timers seem to be missing.
>>
>> I thought this might be related to the post-inputs Flatten, but I've
>> taken pub/sub out of the equation, and run the same exact JDBC-based
>> pipeline in batch vs streaming mode, and the JDBC-only pipeline in
>> streaming mode produces the same partial results.
>>
>> What could be happening?
>>
>> Regards,
>> Raman
>>
>>
