Re: Non-temporal watermarks

2023-02-03 Thread David Anderson
DataStream time windows and Flink SQL make assumptions about the timestamps
and watermarks being milliseconds since the epoch. But the underlying
machinery does not. So if you limit yourself to process functions (for
example), then nothing will assign any semantics to the time values.
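
To make that concrete, here is a toy model in plain Java of a watermark-driven timer service whose "timestamps" are just opaque longs. It is a sketch, not Flink code: ToyTimerService, registerTimer and onWatermark are names invented for this illustration, and the min-over-inputs rule is a simplified stand-in for how an operator with several inputs advances its watermark.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

// Toy model (hypothetical, not a Flink class): timers and watermarks are
// plain longs, and nothing here interprets them as points in time.
final class ToyTimerService {
    // Pending timers, ordered by the opaque long they were registered for.
    // For simplicity this toy keeps at most one timer per value.
    private final TreeMap<Long, String> timers = new TreeMap<>();
    // Last watermark seen on each input; Long.MIN_VALUE means "none yet".
    private final long[] inputWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    ToyTimerService(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    void registerTimer(long value, String label) {
        timers.put(value, label);
    }

    // Records a watermark from one input and returns the labels of all
    // timers whose value is now at or below the combined watermark.
    List<String> onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        long min = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            min = Math.min(min, w);
        }
        // The combined watermark is the minimum over inputs and never moves back.
        currentWatermark = Math.max(currentWatermark, min);
        List<String> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.firstKey() <= currentWatermark) {
            fired.add(timers.pollFirstEntry().getValue());
        }
        return fired;
    }
}
```

With two inputs, a timer registered for 3 fires only once both inputs have reported a watermark of at least 3. Nothing in the model cares whether 3 is a millisecond, a business date, or a batch number.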

David

On Thu, Feb 2, 2023 at 2:43 AM James Sandys-Lumsdaine wrote:

> I can describe a use that has been successful for me. We have a Flink
> workflow that calculates reports over many days and have it currently set
> up to recompute the last 10 days or so when recovering this "deep history"
> from our databases and then switches over to live flow to process all
> subsequent update events. I wrote this before the days of the HybridSource
> so it is literally a JDBC data source that queries state for the last 10
> days and that stream is merged with a "live" stream from a db poller or
> Kafka stream.
>
> In answer to your question, during recovery I have all state for the old
> business days sent with a timestamp of that business date e.g. new
> DateTime(2023, 1, 15, 0, 0, 0, UTC).getMillis() for any data associated
> with the 15th Jan 2023. Once the data source has emitted all the state
> for that date, it then emits a watermark with exactly the same timestamp as
> it is communicating downstream that all the data has been sent for that
> date. Then moves onto the next date emitting that state.
>
> When my system starts up it records the current datetime and treats all
> data retrieved before that timestamp as being recovered state, and all data
> received from the live pollers/Kafka to be after that cut-off point. The
> live sources emit objects timestamped with the current time and
> periodically emit a watermark to make forward progress. I'm simplifying
> here but you get the point.
>
> This pattern is useful for me because my keyed process functions are able
> to register timers to process all the data for an historic date at once -
> it won't need to fire on each message received or try to compute with
> missing data, but instead runs once all the data has been received for a
> date from all the sources. (The timer is only triggered when the watermark
> is reached, and that requires all sources to have reached at least that
> point in the recovery). Once we have reached the startup datetime watermark
> the system seamlessly flips into live processing mode. The watermarks still
> trigger my timers but now we are processing the last ~1 minute of batched
> data.
>
> So logically the meaning of a timestamp and watermark in my system always
> represents a forward moving moment in time - it is just that it means an
> historic date for data during recovery from the databases and then a
> current timestamp when the system is processing live data.
>
> Hope that gives you some ideas and help.
>
> James.
> --
> *From:* Gen Luo 
> *Sent:* 02 February 2023 09:52
> *To:* Jan Lukavský 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Non-temporal watermarks
>
> Hi,
>
> This is an interesting topic. I suppose the watermark is defined based on
> the event time since it's mainly used, or designed, for the event time
> processing. Flink provides the event time processing mechanism because it's
> widely needed. Every event has its event time and we usually need to group
> or order by the event time. On the other hand, this also means that we can
> process events from different sources as the event time is naturally of the
> same scale.
>
> However, just as you say, technically speaking the event timestamp can be
> replaced with any other meaningful number (or even a comparable), and the
> (event time) watermark should change accordingly. If we promise this field
> and its watermark of all sources are of the same scale, we can process the
> data/event from the sources together with it just like the event time. As
> the event time processing and event time timer service doesn't rely on the
> actual time point or duration, I suppose this can be implemented by
> defining it as the event time, if it contains only positive numbers.
>
>
> On Thu, Feb 2, 2023 at 5:18 PM Jan Lukavský wrote:
>
> Hi,
>
> I will not speak about details related to Flink specifically, the
> concept of watermarks is more abstract, so I'll leave implementation
> details aside.
>
> Speaking generally, yes, there is a set of requirements that must be met
> in order to be able to generate a system that uses watermarks.
>
> The primary question is what are watermarks used for? The answer is - we
> need watermarks to be able to define a partially stable order of
> _events_. Event is an immutable piece of data that can be _observed_
> (i.e. processed) with various consumer-dependent delays (two consumers
> of the event can see the event at different processing times), or a
> specific (local) timestamp. Generally an event tells us that something,
> somewhere happened at given local timestamp.
>
> Watermarks create markers in processing time of each o


EOFException when deserializing from RocksDB

2023-02-03 Thread Clemens Valiente
Hi, I have been struggling with this particular Exception for days and
thought I'd ask for help here.

I am using a KeyedProcessFunction with a

  private lazy val state: ValueState[Feature] = {
    val stateDescriptor = new ValueStateDescriptor[Feature](
      "CollectFeatureProcessState",
      createTypeInformation[Feature])
    getRuntimeContext.getState(stateDescriptor)
  }


which is used in my process function as follows

  override def processElement(
      value: Feature,
      ctx: KeyedProcessFunction[String, Feature, Feature]#Context,
      out: Collector[Feature]): Unit = {
    val current: Feature = state.value match {
      case null => value
      case exists => combine(value, exists)
    }
    if (checkForCompleteness(current)) {
      out.collect(current)
      state.clear()
    } else {
      state.update(current)
    }
  }


Feature is a protobuf class that I registered with kryo as follows (using
chill-protobuf)

env.getConfig.registerTypeWithKryoSerializer(
  classOf[Feature], classOf[ProtobufSerializer])

But I also got Exceptions with normal scala case classes wrapping this
Feature class, and without the ProtobufSerializer using the standard slow
Java Serializer.
The exception occurs within the first minutes/seconds of starting the app
and looks as follows:

2023-02-03 08:41:07,577 WARN  org.apache.flink.runtime.taskmanager.Task [] - KeyedProcess -> (Map -> Sink: FeatureSignalSink, Map -> Flat Map -> Sink: FeatureStore, Sink: logsink) (2/2)#0 (1befbd4d8975833fc973fc080ea866e4) switched from RUNNING to FAILED with failure cause: org.apache.flink.util.FlinkRuntimeException: Error while retrieving data from RocksDB.
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:91)
    at com.grab.app.functions.stream.CollectFeatureProcessFunction$.processElement(CollectFeatureProcessFunction.scala:55)
    at com.grab.app.functions.stream.CollectFeatureProcessFunction$.processElement(CollectFeatureProcessFunction.scala:17)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.EOFException
    at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)
    at org.apache.flink.types.StringValue.readString(StringValue.java:786)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:123)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
    at org.apache.flink.api.scala.typeutils.TraversableSerializer.deserialize(TraversableSerializer.scala:128)
    at org.apache.flink.api.scala.typeutils.TraversableSerializer.deserialize(TraversableSerializer.scala:34)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:123)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:89)
    ... 16 more

The exception is thrown at
com.grab.app.functions.stream.CollectFeatureProcessFunction$.processElement(CollectFeatureProcessFunction.scala:55)
which is this line:

val current: AcornHydraeventFeature = state.value match {
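
For context on the exception type itself: an EOFException during deserialization generally means the reader ran out of bytes before it finished reading the layout it expected. The trace above goes through CaseClassSerializer, TraversableSerializer and StringSerializer, so one thing that may be worth checking is whether the bytes in state were written with the same serializer that is now reading them. The plain-Java sketch below (not Flink code; LayoutMismatchDemo and its methods are hypothetical names) reproduces only the generic failure mode, a reader expecting more fields than were written, and is not a diagnosis of this job.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical demo class: writer and reader disagree on the record layout.
final class LayoutMismatchDemo {
    // Serializes a record that has exactly one string field.
    static byte[] writeOneField(String name) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(name);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the bytes as if the record had two string fields.
    // Returns true if the second read runs off the end of the data.
    static boolean readTwoFieldsHitsEof(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            in.readUTF(); // first field is present
            in.readUTF(); // second field was never written
            return false;
        } catch (EOFException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling readTwoFieldsHitsEof(writeOneField("a")) returns true, because the second readUTF finds no bytes left to read.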


Did someone run into this before and/or can point me in the right direction
for further investigation?

Thanks a lot
Clemens
