If I have a PCollection<KV<K,V>> in which, within each K, the V elements have a logical total ordering, is it possible within Beam to traverse only a monotonically increasing subset of the elements?

It seems like it might be possible to do this using the State API, for example with something (untested) like this: https://goo.gl/CnoCHC -- I'm pasting the method here as well, but am not sure if the formatting will come out mangled on the mailing list:

@ProcessElement
public void process(ProcessContext c,
                    @StateId(STATE_ID) ValueState<V> state)
{
   final V val = state.read();
   final V newValue = c.element().getValue();
   // if new value is greater than or equal to old
   if (val == null || m_comparator.compare(newValue, val) >= 0)
   {
      state.write(newValue);
      c.outputWithTimestamp(m_callback.apply(c.element()),
            c.timestamp());
   }
}
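
To make the intended semantics concrete without depending on Beam, here is a minimal plain-Java sketch of the same filter. The class name MonotonicFilter and its accept method are hypothetical and are not part of any Beam API; the HashMap only stands in for the per-key ValueState, and the sketch assumes that calls for a given key run one at a time.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the stateful DoFn above; not a Beam class.
final class MonotonicFilter<K, V> {
    private final Comparator<? super V> comparator;
    // Plays the role of the per-key ValueState<V>: last value emitted per key.
    private final Map<K, V> lastEmitted = new HashMap<>();

    MonotonicFilter(Comparator<? super V> comparator) {
        this.comparator = comparator;
    }

    // Returns true if newValue would be output for this key, i.e. it is
    // greater than or equal to the last value emitted for the same key.
    boolean accept(K key, V newValue) {
        final V val = lastEmitted.get(key);
        if (val == null || comparator.compare(newValue, val) >= 0) {
            lastEmitted.put(key, newValue);
            return true;
        }
        return false;
    }
}
```

For example, if the values 1, 3, 2, 3, 5, 4 arrive in that order for one key, this sketch keeps 1, 3, 3, 5 and drops 2 and 4. A different arrival order would keep a different subset, which is why the ordering questions below matter.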


A few questions:

* Is code executed within @ProcessElement of a DoFn using the State API guaranteed to be "serialized" per-K and per-window? (By "serialized" I mean that it will produce the same effect as if every execution of the @ProcessElement method for a given K and window had executed to completion before the next one started for that same K and window.)
* If yes, are there any further guarantees about the ordering of the output PCollection within a given K and window?


--
Wesley Tanaka
https://wtanaka.com/
