The pipeline works fine with the Direct Runner. The issue appears to be
specific to streaming mode on Dataflow. I've updated my pipeline to use
Pub/Sub as an input instead, and digging into the Dataflow console, it
looks like execution of a particular GroupByKey is moving extremely slowly
-- the watermark for the prior step is caught up to real time, but the
data watermark of the GroupByKey step is currently June 4, 2020, and
advances only about one day every 10 minutes, which is ridiculously slow.
In batch mode, the whole backlog is processed in about 30 minutes.
It is actually a reasonably large and complex pipeline, so I'm not sure
where I would even start with code snippets. The particular GroupByKey
that seems to be running extremely slowly is "FixedTickGroupByTeam", in a
part of the pipeline that looks like this -- it really should be
processing only a few events per day with an event time of Jun 4th 2020, so
something is definitely not right:
val scores = timestampedCheckins
    .apply(
        "FixedTickWindows",
        Window.into<Checkin>(FixedWindows.of(5.minutes.asJoda()))
            // NOTE that we use the default timestamp combiner (end of window)
            // to avoid https://issues.apache.org/jira/browse/BEAM-2262
            .triggering(
                AfterWatermark.pastEndOfWindow()
                    .withLateFirings(AfterPane.elementCountAtLeast(1))
            )
            .withAllowedLateness(3.days.asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
            .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS)
            .withTimestampCombiner(TimestampCombiner.LATEST)
            .discardingFiredPanes()
    )
    .apply("FixedTickKeyByTeam", WithKeys.of { it.teamId })
    .apply("FixedTickGroupByTeam", GroupByKey.create())
    .apply("GlobalWindowsLoopingStatefulTimer", Window.into(GlobalWindows()))
    .apply(
        "LoopingStatefulTimer",
        ParDo.of(LoopingStatefulTimer(5.minutes, (options.timerTimeoutDays ?: 30).days))
    )
    // this window and the subsequent group-by and flatten combine the
    // empty iterable timer output with any actual check-ins
    .apply(
        "FixedTickWindowsPostTimer",
        Window.into<KV<String, Iterable<@JvmWildcard Checkin>>>(FixedWindows.of(5.minutes.asJoda()))
            .applyWindowingOptions()
    )
    .apply("FixedTickGroupByTeamPostTimer", GroupByKey.create())
    .apply("FixedTickFlattenPostTimer", flattenValues())
    // convert to an explicit PeriodContent type representing either an empty
    // period or a period with check-ins; this allows us to carry forward the
    // timestamp of an empty period, without it being flattened into a single empty
    //.apply("MapToPeriodContent", ParDo.of(MapToPeriodContentFn()))
    .apply(
        "CheckinTimingScoreFn",
        ParDo.of(CheckinTimingScoreFn(scoreCalculationServiceBuilder, checkinStateView))
            .withSideInputs(checkinStateView)
    )
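
For context, LoopingStatefulTimer follows the usual Beam looping-timer
pattern: an event-time timer that re-arms itself each tick so that empty
periods still produce output. The real class isn't shown here, so this is
only a rough sketch -- the class name, state layout, and stop condition are
illustrative assumptions, not the actual implementation:

```kotlin
import org.apache.beam.sdk.state.*
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.values.KV
import org.joda.time.Duration

// Hedged sketch only: the real LoopingStatefulTimer surely differs. It
// re-arms an event-time timer each tick; a real implementation would also
// track a timeout in state and stop re-arming once it is exceeded.
class LoopingTimerSketch(private val tick: Duration) :
    DoFn<KV<String, Iterable<Checkin>>, KV<String, Iterable<Checkin>>>() {

    @TimerId("tick")
    private val tickSpec: TimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME)

    @StateId("key")
    private val keySpec: StateSpec<ValueState<String>> = StateSpecs.value()

    @ProcessElement
    fun process(
        context: ProcessContext,
        @TimerId("tick") tickTimer: Timer,
        @StateId("key") key: ValueState<String>
    ) {
        // Remember the key so the timer callback can emit for it, and
        // (re-)arm the timer for the next tick after this element.
        key.write(context.element().key)
        tickTimer.set(context.timestamp().plus(tick))
        context.output(context.element())
    }

    @OnTimer("tick")
    fun onTick(
        context: OnTimerContext,
        @TimerId("tick") tickTimer: Timer,
        @StateId("key") key: ValueState<String>
    ) {
        // Emit an empty period for this key and keep looping.
        val empty: Iterable<Checkin> = emptyList()
        context.output(KV.of(key.read(), empty))
        tickTimer.set(context.timestamp().plus(tick))
    }
}
```

Each such re-armed timer only fires as the input watermark passes its
event-time deadline, which is why the watermark behavior of the upstream
GroupByKey matters so much here.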
On Wed, May 12, 2021 at 12:43 PM Kenneth Knowles <[email protected]> wrote:
> Can you share some more details, such as code? We may identify something
> that relies upon assumptions from batch execution style.
>
> Also notably the Java DirectRunner does not have separate batch/streaming
> mode. It always executes in a "streaming" sort of way. It is also simpler
> in some ways so if you can reproduce it on the DirectRunner that might help.
>
> Kenn
>
> On Tue, May 11, 2021 at 3:41 PM Raman Gupta <[email protected]> wrote:
>
>> I have a Dataflow pipeline that reads data from JDBC and Pub/Sub. My
>> ideal pipeline backfills its state and output from historical data via the
>> JDBC input, and then continues processing new elements arriving via
>> pub/sub. Conceptually, this seems easy to do with a filter on each source
>> before/after some specific cutoff instant.
>>
>> However, when I add pub/sub into the pipeline, it runs in streaming mode,
>> and the pipeline does not produce the expected results -- all of the
>> results that would be produced based on looping timers seem to be missing.
>>
>> I thought this might be related to the post-inputs Flatten, but I've
>> taken pub/sub out of the equation, and run the same exact JDBC-based
>> pipeline in batch vs streaming mode, and the JDBC-only pipeline in
>> streaming mode produces the same partial results.
>>
>> What could be happening?
>>
>> Regards,
>> Raman
>>
>>
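
For what it's worth, the before/after-cutoff split described in the quoted
message comes down to two complementary predicates, one per source. A
minimal sketch of the idea in plain Kotlin -- the Event type and field
names are hypothetical, not from the actual pipeline:

```kotlin
import java.time.Instant

// Hypothetical event type for illustration only.
data class Event(val id: String, val timestamp: Instant)

// Historical (JDBC) source keeps only events strictly before the cutoff;
// the live (Pub/Sub) source keeps only events at or after it. Together
// the two filters partition the stream with no overlap and no gap.
fun isHistorical(e: Event, cutoff: Instant): Boolean = e.timestamp.isBefore(cutoff)
fun isLive(e: Event, cutoff: Instant): Boolean = !e.timestamp.isBefore(cutoff)
```

In the pipeline these would sit behind a Filter transform on each source
before the Flatten, so the merged stream sees each event exactly once.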