To answer your specific question: you should create and return the WallTime estimator. You shouldn't need to interact with it from within your @ProcessElement method, since your elements use the current time as their timestamp.
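For concreteness, a minimal sketch of that wiring could look like the following. ReadFn and the String element types are placeholders; the annotations and WatermarkEstimators.WallTime are from the Beam Java SDK, but the surrounding restriction/tracker plumbing of a real SDF is elided:

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.joda.time.Instant;

// Sketch only: ReadFn and the String element types are placeholders.
class ReadFn extends DoFn<String, String> {

  // Seed the estimator state with the current time when processing begins.
  @GetInitialWatermarkEstimatorState
  public Instant getInitialWatermarkEstimatorState() {
    return Instant.now();
  }

  // Create and return the WallTime estimator. Once returned, no further
  // interaction is needed; it reports the current time as the watermark.
  @NewWatermarkEstimator
  public WatermarkEstimators.WallTime newWatermarkEstimator(
      @WatermarkEstimatorState Instant state) {
    return new WatermarkEstimators.WallTime(state);
  }

  // ... @GetInitialRestriction, @ProcessElement, etc. as before ...
}
```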
On Fri, Sep 18, 2020 at 10:04 AM Luke Cwik <[email protected]> wrote:

> Kafka is a complex example because it is adapting code from before there
> was an SDF implementation (namely the TimestampPolicy and the
> TimestampFn/TimestampFnS/WatermarkFn/WatermarkFn2 functions).
>
> There are three types of watermark estimators in the Beam Java SDK today:
>
> Manual: Can be invoked from within your @ProcessElement method within your
> SDF, allowing you precise control over what the watermark is.
>
> WallTime: Doesn't need to be interacted with; it will report the current
> time as the watermark. Once it is instantiated and returned via the
> @NewWatermarkEstimator method you don't need to do anything with it. This
> is functionally equivalent to calling setWatermark(Instant.now()) right
> before returning from the @ProcessElement method in the SplittableDoFn on
> a Manual watermark estimator.
>
> TimestampObserving: Is invoked with the output timestamp of every element
> that is output. This is functionally equivalent to calling setWatermark
> after each output within your @ProcessElement method in the
> SplittableDoFn. The MonotonicallyIncreasing implementation of the
> TimestampObserving estimator ensures that the largest timestamp seen so
> far will be reported as the watermark.
>
> The default is to not set any watermark estimate.
>
> For all watermark estimators you're allowed to set the watermark estimate
> to anything, as the runner will recompute the output watermark as:
>
>   new output watermark = max(previous output watermark,
>                              min(upstream watermark, watermark estimates))
>
> This effectively means that the watermark will never go backwards from the
> runner's point of view. It also means that setting the watermark estimate
> below the previous output watermark (which isn't observable) will do
> nothing beyond holding the watermark at the previous output watermark.
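The recompute rule above can be illustrated with a small self-contained example. This uses java.time.Instant for brevity (the Beam Java SDK itself uses org.joda.time.Instant), and the class and method names here are invented purely for illustration:

```java
import java.time.Instant;

public class WatermarkMath {
    // Mirrors the rule quoted in the thread:
    //   new output watermark = max(previous output watermark,
    //                              min(upstream watermark, watermark estimate))
    static Instant recompute(Instant previous, Instant upstream, Instant estimate) {
        Instant candidate = upstream.isBefore(estimate) ? upstream : estimate; // min
        return candidate.isAfter(previous) ? candidate : previous;             // max
    }

    public static void main(String[] args) {
        Instant prev = Instant.parse("2020-09-18T10:00:00Z");
        Instant upstream = Instant.parse("2020-09-18T10:05:00Z");

        // An estimate below the previous output watermark is clamped:
        // the watermark never goes backwards from the runner's point of view.
        System.out.println(recompute(prev, upstream, Instant.parse("2020-09-18T09:00:00Z")));
        // -> 2020-09-18T10:00:00Z

        // A larger estimate advances the watermark, capped by the upstream watermark.
        System.out.println(recompute(prev, upstream, Instant.parse("2020-09-18T10:10:00Z")));
        // -> 2020-09-18T10:05:00Z
    }
}
```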
> Depending on the windowing strategy and allowed lateness, any records
> that are output with a timestamp that is too early can be considered
> droppably late; otherwise they will be late/on-time/early.
>
> So as the author of an SDF transform, you need to figure out:
>
> 1) What timestamp you're going to output your records at.
>    * Use the upstream input element's timestamp: the guidance is to use
>      the default implementation, which gets the upstream watermark by
>      default.
>    * Use data from within the record being output, or external system
>      state via an API call: use a watermark estimator.
> 2) How you want to compute the watermark estimate (if at all).
>    * The choice here depends on how the elements' timestamps progress:
>      are they in exactly sorted order, almost sorted order, completely
>      unsorted, ...?
>
> For both of these it is up to you to choose how much flexibility in these
> decisions you want to give to your users, and that should guide what you
> expose within the API (like how KafkaIO exposes a TimestampPolicy), or
> whether, like many other sources, you expose nothing.
>
> On Thu, Sep 17, 2020 at 8:43 AM Praveen K Viswanathan
> <[email protected]> wrote:
>
>> Hi Luke,
>>
>> I am also looking at the WatermarkEstimators.manual option in parallel.
>> Now we are getting data past our fixed window, but the aggregation is
>> not as expected. The doc says setWatermark will "set timestamp before or
>> at the timestamps of all future elements produced by the associated
>> DoFn". If I output with a timestamp as below, could you please clarify
>> how we should set the watermark for this manual watermark estimator?
>>
>>   receiver.outputWithTimestamp(ossRecord, Instant.now());
>>
>> Thanks,
>> Praveen
>>
>> On Mon, Sep 14, 2020 at 9:10 AM Luke Cwik <[email protected]> wrote:
>>
>>> Is the watermark advancing [1, 2] for the SDF such that the windows can
>>> close, allowing the Count transform to produce output?
>>>
>>> 1: https://www.youtube.com/watch?v=TWxSLmkWPm4
>>> 2: https://beam.apache.org/documentation/programming-guide/#windowing
>>>
>>> On Thu, Sep 10, 2020 at 12:39 PM Gaurav Nakum <[email protected]>
>>> wrote:
>>>
>>>> Hi everyone!
>>>>
>>>> We are developing a new IO connector using the SDF API, and testing it
>>>> with the following simple counting pipeline:
>>>>
>>>>   p.apply(MyIO.read()
>>>>         .withStream(inputStream)
>>>>         .withStreamPartitions(Arrays.asList(0))
>>>>         .withConsumerConfig(config))   // PCollection<KV<String, String>>
>>>>
>>>>    .apply(Values.<String>create())     // PCollection<String>
>>>>
>>>>    .apply(Window.<String>into(
>>>>            FixedWindows.of(Duration.standardSeconds(10)))
>>>>        .withAllowedLateness(Duration.standardDays(1))
>>>>        .accumulatingFiredPanes())
>>>>
>>>>    .apply(Count.<String>perElement())  // PCollection<KV<String, Long>>
>>>>
>>>>    // write PCollection<KV<String, Long>> to stream
>>>>    .apply(MyIO.write()
>>>>        .withStream(outputStream)
>>>>        .withConsumerConfig(config));
>>>>
>>>> Without the window transform, we can read from the stream and write to
>>>> it; however, I don't see output after the Window transform. Could you
>>>> please help pin down the issue?
>>>>
>>>> Thank you,
>>>>
>>>> Gaurav
>>
>> --
>> Thanks,
>> Praveen K Viswanathan
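Tying the thread together: Praveen's receiver.outputWithTimestamp(ossRecord, Instant.now()) call can be paired with a Manual estimator by calling setWatermark right after each output, per Luke's description above, which lets the windows close. A sketch under stated assumptions (PollFn, the String element types, and the OffsetRange restriction are placeholders; the annotations and estimator types are from the Beam Java SDK):

```java
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.joda.time.Instant;

// Sketch only: element types and the restriction are placeholders.
class PollFn extends DoFn<String, String> {

  @GetInitialWatermarkEstimatorState
  public Instant getInitialWatermarkEstimatorState() {
    return new Instant(0);
  }

  @NewWatermarkEstimator
  public WatermarkEstimators.Manual newWatermarkEstimator(
      @WatermarkEstimatorState Instant state) {
    return new WatermarkEstimators.Manual(state);
  }

  @ProcessElement
  public ProcessContinuation processElement(
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<String> receiver,
      ManualWatermarkEstimator<Instant> estimator) {
    long position = tracker.currentRestriction().getFrom();
    while (tracker.tryClaim(position)) {
      Instant now = Instant.now();
      // Output at the current time, as in Praveen's snippet...
      receiver.outputWithTimestamp("record-" + position, now);
      // ...then advance the watermark to match, promising that no
      // future element will carry an earlier timestamp.
      estimator.setWatermark(now);
      position++;
    }
    return ProcessContinuation.resume();
  }
}
```

With this pattern the watermark advances on every output, so a downstream FixedWindows transform can close its windows and Count can emit results.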
