Kafka is a complex example because it is adapting code from before there
was an SDF implementation (namely the TimestampPolicy and the
TimestampFn/TimestampFnS/WatermarkFn/WatermarkFn2 functions).

There are three types of watermark estimators that are in the Beam Java SDK
today:
* Manual: can be invoked from within your @ProcessElement method in your
SDF, giving you precise control over what the watermark is.
* WallTime: requires no interaction; it reports the current time as the
watermark. Once it is instantiated and returned via the
@NewWatermarkEstimator method you don't need to do anything with it. This
is functionally equivalent to calling setWatermark(Instant.now()) on a
Manual estimator right before returning from the @ProcessElement method in
the SplittableDoFn.
* TimestampObserving: is invoked with the output timestamp of every element
that is output. This is functionally equivalent to calling setWatermark
after each output within your @ProcessElement method in the SplittableDoFn.
The MonotonicallyIncreasing implementation of the TimestampObserving
estimator reports the largest timestamp seen so far as the watermark.
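To illustrate, the MonotonicallyIncreasing behavior boils down to tracking
the maximum observed timestamp. This is a plain-Java model for explanation
only, not the actual Beam class (which lives in
org.apache.beam.sdk.fn.splittabledofn / WatermarkEstimators):

```java
import java.time.Instant;

// Illustrative model only: the real implementation is
// WatermarkEstimators.MonotonicallyIncreasing in the Beam Java SDK.
class MonotonicModel {
  private Instant watermark;

  MonotonicModel(Instant initial) {
    this.watermark = initial;
  }

  // Called with the timestamp of every element that is output.
  void observeTimestamp(Instant ts) {
    if (ts.isAfter(watermark)) {
      watermark = ts;
    }
  }

  Instant currentWatermark() {
    return watermark;
  }
}
```

Out-of-order timestamps simply don't move the watermark, which is why this
estimator suits streams that are almost, but not exactly, sorted.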

The default is to not set any watermark estimate.

For all watermark estimators you're allowed to set the watermark estimate
to anything, since the runner recomputes the output watermark as:
new output watermark = max(previous output watermark, min(upstream
watermark, watermark estimates))
This effectively means that the watermark never goes backwards from the
runner's point of view. It also means that setting the watermark estimate
below the previous output watermark (which isn't observable) does nothing
beyond holding the watermark at the previous output watermark.
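That recomputation can be sketched in plain Java (the names here are
hypothetical; the actual logic lives inside the runner):

```java
import java.time.Instant;

class WatermarkMath {
  // new output watermark =
  //   max(previous output watermark, min(upstream watermark, watermark estimate))
  static Instant newOutputWatermark(
      Instant previousOutput, Instant upstream, Instant estimate) {
    Instant candidate = upstream.isBefore(estimate) ? upstream : estimate;
    return candidate.isAfter(previousOutput) ? candidate : previousOutput;
  }
}
```

Plugging in an estimate below the previous output watermark shows the
"never goes backwards" property: the result is just the previous output
watermark again.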

Depending on the windowing strategy and allowed lateness, any records that
are output with a timestamp that is too early can be considered droppably
late and discarded; otherwise they will be early, on-time, or late.
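For a fixed window this reduces to comparing the output watermark against
the window end and the allowed lateness. The helper below is a hypothetical
sketch of that comparison, not a Beam API:

```java
import java.time.Duration;
import java.time.Instant;

class LatenessCheck {
  // Classify a record's pane relative to its window, given where the
  // output watermark currently is. Sketch only; Beam's actual pane
  // accounting is done by the runner.
  static String classify(
      Instant windowEnd, Duration allowedLateness, Instant outputWatermark) {
    if (outputWatermark.isAfter(windowEnd.plus(allowedLateness))) {
      return "droppable";     // past window end + allowed lateness
    } else if (outputWatermark.isAfter(windowEnd)) {
      return "late";          // window closed, but within allowed lateness
    } else {
      return "early/on-time"; // watermark hasn't passed the window end yet
    }
  }
}
```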

So as an author of an SDF transform, you need to figure out:
1) What timestamp you're going to output your records at
* use the upstream input element's timestamp: use the default
implementation, which propagates the upstream watermark
* use data from within the record being output, or external system state
via an API call: use a watermark estimator
2) How you want to compute the watermark estimate (if at all)
* the choice here depends on how the elements' timestamps progress: are
they in exactly sorted order, almost sorted order, completely unsorted, ...?

For both of these it is up to you to choose how much flexibility you want
to give your users, and that should guide what you expose within the API
(for example, KafkaIO exposes a TimestampPolicy, while many other sources
don't expose anything).
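One way to expose that flexibility is to let users plug in how timestamps
are extracted from records. This is a hypothetical sketch loosely modeled
on the spirit of KafkaIO's TimestampPolicy; MyRecord and MyReadConfig are
invented names, not Beam types:

```java
import java.time.Instant;
import java.util.function.Function;

// Hypothetical record type for the sketch.
class MyRecord {
  final String payload;
  final Instant eventTime;
  MyRecord(String payload, Instant eventTime) {
    this.payload = payload;
    this.eventTime = eventTime;
  }
}

// A connector author could let users supply the timestamp extraction,
// defaulting to a field on the record.
class MyReadConfig {
  Function<MyRecord, Instant> timestampFn = r -> r.eventTime;

  MyReadConfig withTimestampFn(Function<MyRecord, Instant> fn) {
    this.timestampFn = fn;
    return this;
  }
}
```

A source that doesn't want to expose anything would simply hard-code the
default and skip the withTimestampFn hook.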


On Thu, Sep 17, 2020 at 8:43 AM Praveen K Viswanathan <
harish.prav...@gmail.com> wrote:

> Hi Luke,
>
> I am also looking at the `WatermarkEstimators.manual` option, in parallel.
> Now we are getting data past our Fixed Window but the aggregation is not as
> expected.  The doc says setWatermark will "set timestamp before or at the
> timestamps of all future elements produced by the associated DoFn". If I
> output with a timestamp as below then could you please clarify on how we
> should set the watermark for this manual watermark estimator?
>
> receiver.outputWithTimestamp(ossRecord, Instant.now());
>
> Thanks,
> Praveen
>
> On Mon, Sep 14, 2020 at 9:10 AM Luke Cwik <lc...@google.com> wrote:
>
>> Is the watermark advancing[1, 2] for the SDF such that the windows can
>> close allowing for the Count transform to produce output?
>>
>> 1: https://www.youtube.com/watch?v=TWxSLmkWPm4
>> 2: https://beam.apache.org/documentation/programming-guide/#windowing
>>
>> On Thu, Sep 10, 2020 at 12:39 PM Gaurav Nakum <gaurav.na...@oracle.com>
>> wrote:
>>
>>> Hi everyone!
>>>
>>> We are developing a new IO connector using the SDF API, and testing it
>>> with the following simple counting pipeline:
>>>
>>>
>>>
>>> p.apply(MyIO.read()
>>>         .withStream(inputStream)
>>>         .withStreamPartitions(Arrays.asList(0))
>>>         .withConsumerConfig(config)
>>>     ) // gets a PCollection<KV<String, String>>
>>>
>>> .apply(Values.<String>create()) // PCollection<String>
>>>
>>> .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
>>>     .withAllowedLateness(Duration.standardDays(1))
>>>     .accumulatingFiredPanes())
>>>
>>> .apply(Count.<String>perElement())
>>>
>>> // write PCollection<KV<String, Long>> to stream
>>> .apply(MyIO.write()
>>>         .withStream(outputStream)
>>>         .withConsumerConfig(config));
>>>
>>>
>>> Without the window transform, we can read from the stream and write to
>>> it, however, I don’t see output after the Window transform. Could you
>>> please help pin down the issue?
>>>
>>> Thank you,
>>>
>>> Gaurav
>>>
>>
>
> --
> Thanks,
> Praveen K Viswanathan
>
