Kafka is a complex example because it adapts code that predates the SDF implementation (namely the TimestampPolicy and the TimestampFn/TimestampFnS/WatermarkFn/WatermarkFn2 functions).
There are three types of watermark estimators in the Beam Java SDK today:

- Manual: Can be invoked from within your @ProcessElement method in your SDF, giving you precise control over what the watermark is.

- WallTime: Doesn't need to be interacted with; it reports the current time as the watermark. Once it is instantiated and returned via the @NewWatermarkEstimator method you don't need to do anything with it. This is functionally equivalent to calling setWatermark(Instant.now()) on a Manual estimator right before returning from the @ProcessElement method of the SplittableDoFn.

- TimestampObserving: Is invoked with the output timestamp of every element that is output. This is functionally equivalent to calling setWatermark after each output within your @ProcessElement method of the SplittableDoFn. The MonotonicallyIncreasing implementation of the TimestampObserving estimator reports the largest timestamp seen so far as the watermark.

The default is to not set any watermark estimate.

For all watermark estimators you're allowed to set the watermark estimate to anything, because the runner will recompute the output watermark as:

  new output watermark = max(previous output watermark, min(upstream watermark, watermark estimate))

This effectively means that the watermark never goes backwards from the runner's point of view. It also means that setting the watermark estimate below the previous output watermark (which isn't observable) does nothing beyond holding the watermark at the previous output watermark. Depending on the windowing strategy and allowed lateness, any records that are output with a timestamp that is too early can be considered droppably late; otherwise they will be late/on-time/early.
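The formula above can be sketched in plain Java. This is a hedged illustration of the described semantics, not the runner's actual code; the class and method names here are hypothetical:

```java
import java.time.Instant;

public class WatermarkMath {
    // Hypothetical sketch of how a runner advances the output watermark:
    // new output watermark = max(previous output watermark,
    //                            min(upstream watermark, watermark estimate))
    static Instant advance(Instant previousOutput, Instant upstream, Instant estimate) {
        Instant candidate = upstream.isBefore(estimate) ? upstream : estimate;
        return candidate.isAfter(previousOutput) ? candidate : previousOutput;
    }

    public static void main(String[] args) {
        Instant prev = Instant.ofEpochSecond(100);
        Instant upstream = Instant.ofEpochSecond(200);

        // An estimate below the previous output watermark just holds it in place.
        System.out.println(advance(prev, upstream, Instant.ofEpochSecond(50)));  // 1970-01-01T00:01:40Z

        // An estimate between the previous and upstream watermarks advances it.
        System.out.println(advance(prev, upstream, Instant.ofEpochSecond(150))); // 1970-01-01T00:02:30Z

        // The estimate can never push the watermark past the upstream watermark.
        System.out.println(advance(prev, upstream, Instant.ofEpochSecond(500))); // 1970-01-01T00:03:20Z
    }
}
```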
So as an author of an SDF transform, you need to figure out:

1) What timestamp you're going to output your records at
  * using the upstream input element's timestamp: guidance is to use the default implementation, which picks up the upstream watermark by default
  * using data from within the record being output, or external system state via an API call: use a watermark estimator

2) How you want to compute the watermark estimate (if at all)
  * the choice here depends on how the elements' timestamps progress: are they in exactly sorted order, almost sorted order, completely unsorted, ...?

For both of these it is up to you to choose how much flexibility you want to give your users, and that should guide what you expose within the API (like how KafkaIO exposes a TimestampPolicy, while many other sources don't expose anything).

On Thu, Sep 17, 2020 at 8:43 AM Praveen K Viswanathan <harish.prav...@gmail.com> wrote:

> Hi Luke,
>
> I am also looking at the `WatermarkEstimators.manual` option, in parallel.
> Now we are getting data past our Fixed Window but the aggregation is not as
> expected. The doc says setWatermark will "set timestamp before or at the
> timestamps of all future elements produced by the associated DoFn". If I
> output with a timestamp as below then could you please clarify on how we
> should set the watermark for this manual watermark estimator?
>
> receiver.outputWithTimestamp(ossRecord, Instant.now());
>
> Thanks,
> Praveen
>
> On Mon, Sep 14, 2020 at 9:10 AM Luke Cwik <lc...@google.com> wrote:
>
>> Is the watermark advancing[1, 2] for the SDF such that the windows can
>> close, allowing the Count transform to produce output?
>>
>> 1: https://www.youtube.com/watch?v=TWxSLmkWPm4
>> 2: https://beam.apache.org/documentation/programming-guide/#windowing
>>
>> On Thu, Sep 10, 2020 at 12:39 PM Gaurav Nakum <gaurav.na...@oracle.com>
>> wrote:
>>
>>> Hi everyone!
>>>
>>> We are developing a new IO connector using the SDF API, and testing it
>>> with the following simple counting pipeline:
>>>
>>> p.apply(MyIO.read()
>>>         .withStream(inputStream)
>>>         .withStreamPartitions(Arrays.asList(0))
>>>         .withConsumerConfig(config)) // gets a PCollection<KV<String, String>>
>>>
>>>     .apply(Values.<String>create()) // PCollection<String>
>>>
>>>     .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
>>>         .withAllowedLateness(Duration.standardDays(1))
>>>         .accumulatingFiredPanes())
>>>
>>>     .apply(Count.<String>perElement())
>>>
>>>     // write PCollection<KV<String, Long>> to stream
>>>     .apply(MyIO.write()
>>>         .withStream(outputStream)
>>>         .withConsumerConfig(config));
>>>
>>> Without the window transform, we can read from the stream and write to
>>> it; however, I don't see output after the Window transform. Could you
>>> please help pin down the issue?
>>>
>>> Thank you,
>>> Gaurav
>>>
>
> --
> Thanks,
> Praveen K Viswanathan
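The window-closing condition discussed in the quoted thread can be illustrated with a small plain-Java sketch. This is a hedged illustration, not Beam's implementation, and the helper names are hypothetical: a fixed 10-second window covering [start, start + 10s) can only produce its on-time pane once the watermark passes its end, so if the SDF never advances the watermark, the Count transform never emits.

```java
import java.time.Duration;
import java.time.Instant;

public class FixedWindowSketch {
    // Hypothetical helper: the end of the epoch-aligned fixed window of the
    // given size that contains `timestamp`.
    static Instant windowEnd(Instant timestamp, Duration size) {
        long sizeMillis = size.toMillis();
        long start = timestamp.toEpochMilli() - Math.floorMod(timestamp.toEpochMilli(), sizeMillis);
        return Instant.ofEpochMilli(start + sizeMillis);
    }

    // The on-time pane of a window fires only once the watermark reaches its end.
    static boolean windowCanFire(Instant watermark, Instant timestamp, Duration size) {
        return !watermark.isBefore(windowEnd(timestamp, size));
    }

    public static void main(String[] args) {
        Duration tenSeconds = Duration.ofSeconds(10);
        Instant element = Instant.ofEpochSecond(12); // falls into the window [10s, 20s)

        System.out.println(windowEnd(element, tenSeconds)); // 1970-01-01T00:00:20Z

        // If the watermark never advances (e.g. the SDF reports no watermark
        // estimate), the window never fires and Count produces no output.
        System.out.println(windowCanFire(Instant.EPOCH, element, tenSeconds));             // false
        System.out.println(windowCanFire(Instant.ofEpochSecond(20), element, tenSeconds)); // true
    }
}
```

This is why the first thing to check for the pipeline above is whether the connector's SDF reports a watermark estimate at all: without one, the fixed windows never close regardless of allowed lateness.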