How to Use AsyncLookupFunction

2024-06-09 Thread Clemens Valiente
Hi, how do I implement an AsyncLookupFunction correctly? I implemented an
AsyncLookupFunction, whose eval function has the following signature:
https://nightlies.apache.org/flink/flink-docs-release-1.17/api/java/org/apache/flink/table/functions/AsyncLookupFunction.html

eval(CompletableFuture<Collection<RowData>> future, Object... keys)

I also implemented the provider:
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/connector/source/lookup/AsyncLookupFunctionProvider.html
Unfortunately there is no documentation on AsyncLookupFunction and
AsyncLookupFunctionProvider, so I feel quite in the dark here.
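For reference, a minimal sketch of roughly what I have (the class name, key
layout, and the returned row are placeholders, not my actual code):

import java.util.Collections
import java.util.concurrent.CompletableFuture

import org.apache.flink.table.data.{GenericRowData, RowData, StringData}
import org.apache.flink.table.functions.AsyncLookupFunction

class CashlessInfoLookupFunction extends AsyncLookupFunction {
  // asyncLookup is the abstract method to implement; the final eval(...)
  // above is what the runtime calls, and it delegates here.
  override def asyncLookup(keyRow: RowData): CompletableFuture[java.util.Collection[RowData]] =
    CompletableFuture.supplyAsync[java.util.Collection[RowData]] { () =>
      val id = keyRow.getInt(0) // lookup keys arrive in the declared key order
      // pretend we fetched one matching row for this key
      Collections.singletonList[RowData](
        GenericRowData.of(Int.box(id), StringData.fromString("some-value")))
    }
}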
If I register my AsyncLookupFunction as a UDF and try to call it with this
example snippet:

WITH tbl AS (SELECT 123 AS id, '456' AS pmntid, 'IDR' AS cur)
SELECT * FROM tbl JOIN LATERAL TABLE(cashlessInfo(id, pmntid, cur))

I get the following error:

No match found for function signature cashlessInfo(, , )

I also tried creating a DynamicTableSourceFactory and a table that uses it,
similar to the full-stack example at
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sourcessinks/#full-stack-example
but I am not sure what schema to use there.
Any help/advice on which direction to investigate further?
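For context, the stripped-down direction I tried on the source side (no
factory or schema wiring yet; names are placeholders):

import org.apache.flink.table.connector.source.{DynamicTableSource, LookupTableSource}
import org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider

class CashlessLookupSource extends LookupTableSource {

  // This is where the AsyncLookupFunctionProvider from the link above comes in.
  override def getLookupRuntimeProvider(
      context: LookupTableSource.LookupContext): LookupTableSource.LookupRuntimeProvider =
    AsyncLookupFunctionProvider.of(new CashlessInfoLookupFunction)

  override def copy(): DynamicTableSource = new CashlessLookupSource
  override def asSummaryString(): String = "cashless-lookup"
}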



ProtoBuf DataStream to Table conversion issues

2023-08-27 Thread Clemens Valiente
There's friction when using Scala/Java protobuf classes and trying to
convert a DataStream[ProtobufObject] into a Flink Table.
Scenario:
Input is a DataStream[ProtobufObject] from a Kafka topic that we read and
deserialise into protobuf objects (Scala case classes, or alternatively
Java classes) using scalapb: https://scalapb.github.io/

Goal: Given a topic name and a protobuf classname, we would like to
automatically generate a Flink Table for it.

Problems:

   - The Java protobuf classes are not POJOs and are therefore not
   recognised. They show up as a single RAW column when converted via
   streamTableEnv.fromDataStream().
   - Scala protobuf is better; the only issue is repeated fields. They are
   represented as Seq in Scala, which does not map to a Flink table type and
   shows up as RAW again; only java.util.List types show up as proper
   arrays (see the sketch after this list).
   - scalapb allows customising the collection type but the standard ones
   have the same issue:
   https://scalapb.github.io/docs/customizations/#custom-collection-types
   - I tried to implement a new collection type that satisfies both the
   collection type requirements of scalapb and those of java.util.List, but
   ultimately failed because of signature clashes.
   - the Flink Table API has protobuf support:
   https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/protobuf/
   but it requires translating the entire protobuf structure to SQL types
   manually, which is not feasible.
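
A minimal repro of the Seq-vs-List behaviour from the list above, as I
understand it (the case classes stand in for generated scalapb classes):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

case class WithSeq(tags: Seq[String])             // scalapb-style repeated field
case class WithList(tags: java.util.List[String]) // the variant Flink understands

object SeqVsListRepro extends App {
  val env  = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = StreamTableEnvironment.create(env)

  // 'tags' shows up as RAW here ...
  tEnv.fromDataStream(env.fromElements(WithSeq(Seq("a")))).printSchema()
  // ... but as a proper array here
  tEnv.fromDataStream(env.fromElements(WithList(java.util.Arrays.asList("a")))).printSchema()
}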

Questions:
- Are there plans to support Scala Seq as a Flink SQL ARRAY type? Would it
be straightforward for me to implement a custom TypeInformation(?) to help
the Flink Table API convert it correctly?

- Why is the Java protobuf class not recognised as a POJO? Is it possible
to add support for it?

- Why does a Flink `CREATE TABLE` from Protobuf require the entire table
column structure to be defined in SQL again? Shouldn't fields be inferred
automatically from the provided protobuf class?
- Are there other ways of solving this challenge that maybe someone has
already successfully used?

So far my workaround is a custom .map() step that converts the pb object
into something readable by the Flink Table API. But that has to be written
manually for each individual topic and pb class, which does not scale.
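
Roughly what that workaround looks like (a sketch; Event and its fields
stand in for one concrete generated class):

import scala.collection.JavaConverters._

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.scala._
import org.apache.flink.types.Row

// Placeholder standing in for one concrete generated scalapb class.
case class Event(id: Int, name: String, tags: Seq[String])

def toRows(events: DataStream[Event]): DataStream[Row] =
  events.map { e: Event =>
    // flatten the pb fields into a Row with explicit, Table-API-friendly types
    Row.of(Int.box(e.id), e.name, e.tags.asJava)
  }(Types.ROW_NAMED(
    Array("id", "name", "tags"),
    Types.INT, Types.STRING, Types.LIST(Types.STRING)))

// streamTableEnv.fromDataStream(toRows(events)) then yields proper columns.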

I would be very glad for any insights on any of the questions above; I have
been banging my head against this repeatedly over the past year(s) :(

Thanks a lot

Clemens




Re: EOFException when deserializing from RocksDB

2023-02-07 Thread Clemens Valiente
If I store the Java protobuf objects in RocksDB instead of the Scala
objects, I get this stack trace:

2023-02-07 09:17:04,246 WARN  org.apache.flink.runtime.taskmanager.Task
   [] - KeyedProcess -> (Map -> Sink: signalSink, Map -> Flat
Map -> Sink: FeatureSink, Sink: logsink) (2/2)#0
(fa4aae8fb7d2a7a94eafb36fe5470851_6760a9723a5626620871f040128bad1b_1_0)
switched from RUNNING to FAILED with failure cause:
org.apache.flink.util.FlinkRuntimeException: Error while adding data to
RocksDB
at
org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:109)
at
com.grab.grabdefence.acorn.app.functions.stream.CollectFeatureProcessFunction$.processElement(CollectFeatureProcessFunction.scala:69)
at
com.grab.grabdefence.acorn.app.functions.stream.CollectFeatureProcessFunction$.processElement(CollectProcessFunction.scala:18)
at
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalStateException: The Kryo Output still contains
data from a previous serialize call. It has to be flushed or cleared at the
end of the serialize call.
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:358)
at
org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
at
org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:180)
at
org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:168)
at
org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:107)
... 16 more

I do not touch the Kryo serializer apart from the single
registerTypeWithKryoSerializer call, and I only call state.value() and
update() once each in the processElement() method.

I thought these state stores were abstracted away safely enough that, as a
user, I wouldn't have to worry about the exact
flush/serialization/deserialization logic, but this application breaks even
though I am only using what I think is quite innocent code.

On Fri, Feb 3, 2023 at 4:52 PM Clemens Valiente wrote:

>
> Hi, I have been struggling with this particular Exception for days and
> thought I'd ask for help here.
>
> I am using a KeyedProcessFunction with a state defined as
>
>   private lazy val state: ValueState[Feature] = {
>     val stateDescriptor = new ValueStateDescriptor[Feature](
>       "CollectFeatureProcessState", createTypeInformation[Feature])
>     getRuntimeContext.getState(stateDescriptor)
>   }
>
>
> which is used in my process function as follows
>
>   override def processElement(
>       value: Feature,
>       ctx: KeyedProcessFunction[String, Feature, Feature]#Context,
>       out: Collector[Feature]): Unit = {
>     val current: Feature = state.value match {
>       case null   => value
>       case exists => combine(value, exists)
>     }
>     if (checkForCompleteness(current)) {
>       out.collect(current)
>       state.clear()
>     } else {
>       state.update(current)
>     }
>   }
>
>
> Feature is a protobuf class that I registered with kryo as follows (using
> chill-protobuf)
>
> env.getConfig.registerTypeWithKryoSerializer(classOf[Feature],
>   classOf[ProtobufSerializer])
>
> I also got exceptions with plain Scala case classes wrapping this
> Feature class, and without the ProtobufSerializer, using the standard
> slow Java serializer.
> The exception occurs within the first minutes/seconds of starting the app
> and looks as follows:
>
> 2023-02-03 08:41:07,577 WARN  org.apache.flin

EOFException when deserializing from RocksDB

2023-02-03 Thread Clemens Valiente
Hi, I have been struggling with this particular Exception for days and
thought I'd ask for help here.

I am using a KeyedProcessFunction with a state defined as

  private lazy val state: ValueState[Feature] = {
    val stateDescriptor = new ValueStateDescriptor[Feature](
      "CollectFeatureProcessState", createTypeInformation[Feature])
    getRuntimeContext.getState(stateDescriptor)
  }


which is used in my process function as follows

  override def processElement(
      value: Feature,
      ctx: KeyedProcessFunction[String, Feature, Feature]#Context,
      out: Collector[Feature]): Unit = {
    val current: Feature = state.value match {
      case null   => value
      case exists => combine(value, exists)
    }
    if (checkForCompleteness(current)) {
      out.collect(current)
      state.clear()
    } else {
      state.update(current)
    }
  }


Feature is a protobuf class that I registered with Kryo as follows (using
chill-protobuf):

env.getConfig.registerTypeWithKryoSerializer(classOf[Feature],
  classOf[ProtobufSerializer])

I also got exceptions with plain Scala case classes wrapping this Feature
class, and without the ProtobufSerializer, using the standard slow Java
serializer.
The exception occurs within the first minutes/seconds of starting the app
and looks as follows:

2023-02-03 08:41:07,577 WARN  org.apache.flink.runtime.taskmanager.Task
   [] - KeyedProcess -> (Map -> Sink: FeatureSignalSink, Map ->
Flat Map -> Sink: FeatureStore, Sink: logsink) (2/2)#0
(1befbd4d8975833fc973fc080ea866e4) switched from RUNNING to FAILED with
failure cause: org.apache.flink.util.FlinkRuntimeException: Error while
retrieving data from RocksDB.
at
org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:91)
at
com.grab.app.functions.stream.CollectFeatureProcessFunction$.processElement(CollectFeatureProcessFunction.scala:55)
at com.grab.app.functions.stream.CollectFeatureProcessFunction
$.processElement(CollectFeatureProcessFunction.scala:17)
at
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.EOFException
at
org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)
at org.apache.flink.types.StringValue.readString(StringValue.java:786)
at
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
at
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)
at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:123)
at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
at
org.apache.flink.api.scala.typeutils.TraversableSerializer.deserialize(TraversableSerializer.scala:128)
at
org.apache.flink.api.scala.typeutils.TraversableSerializer.deserialize(TraversableSerializer.scala:34)
at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:123)
at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
at
org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:89)
... 16 more

The exception is thrown at
com.grab.app.functions.stream.CollectFeatureProcessFunction$.processElement(CollectFeatureProcessFunction.scala:55)
which is this line:

val current: AcornHydraeventFeature = state.value match {


Has someone run into this before, and/or can you point me in the right
direction for further investigation?

Thanks a lot
Clemens


flink-table-api-scala-bridge sources

2022-10-25 Thread Clemens Valiente
Hi everyone

I noticed that when going through the Scala DataStream/Table API bridge in
my IDE I cannot see the source code. I believe this is because the sources
are missing on Maven:
https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-scala-bridge_2.12/1.15.2/

If you have a look at the -sources.jar you will see that it doesn't
actually contain any sources. It would be very helpful to have these
sources published, since they contain the API a lot of users will end up
calling, such as StreamTableEnvironment.toChangelogStream.
Thanks a lot
Clemens




Re: [External] metric collision using datadog and standalone Kubernetes HA mode

2021-10-19 Thread Clemens Valiente
Hi Chesnay,
thanks a lot for the clarification.
We managed to resolve the collision and isolated the problem to the metrics
themselves.

Using the REST API at /jobs/<jobid>/metrics?get=uptime
the response is [{"id":"uptime","value":"-1"}]
despite the job having been running and processing data for 5 days at that
point. All task, taskmanager, and jobmanager related metrics seem fine;
only the job metrics are incorrect. Basically none of these report correct
values:

[{"id":"numberOfFailedCheckpoints"},{"id":"lastCheckpointSize"},{"id":"lastCheckpointExternalPath"},{"id":"totalNumberOfCheckpoints"},{"id":"lastCheckpointRestoreTimestamp"},{"id":"uptime"},{"id":"restartingTime"},{"id":"numberOfInProgressCheckpoints"},{"id":"downtime"},{"id":"numberOfCompletedCheckpoints"},{"id":"lastCheckpointProcessedData"},{"id":"fullRestarts"},{"id":"lastCheckpointDuration"},{"id":"lastCheckpointPersistedData"}]

Looking at the Gauge, the only way it can return -1 is
when isTerminalState() is true, which I don't think can be the case in a
running application.
Do you know where we can check what went wrong?

Best Regards
Clemens


On Thu, Oct 14, 2021 at 8:55 PM Chesnay Schepler  wrote:

> I think you are misunderstanding a few things.
>
> a) when you include a variable in the scope format, then Flink fills that
> in *before* it reaches Datadog. If you set it to "flink.<job_name>", then
> what we send to Datadog is "flink.myAwesomeJob".
> b) the exception you see is not coming from Datadog. It occurs because,
> based on the configured scope formats, metrics from different jobs running
> in the same JobManager resolve to the same name (the standby JobManager is
> irrelevant). Flink rejects these metrics, because if it were to send them
> out you'd get funny results in Datadog, since all jobs would try to report
> the same metric.
>
> In short, you need to include the job id or job name in the
> metrics.scope.jm.job scope formats.
>
> On 13/10/2021 06:39, Clemens Valiente wrote:
>
> Hi,
>
> we are using datadog as our metrics reporter as documented here:
>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/metric_reporters/#datadog
>
> our jobmanager scope is
> metrics.scope.jm: flink.jobmanager
> metrics.scope.jm.job: flink.jobmanager
> since datadog doesn't allow placeholders in metric names, we cannot include
> the <job_id> or <job_name> placeholder in the scope.
>
> This setup worked nicely on our standalone kubernetes application
> deployment without using HA.
> But when we set up HA, we lost checkpointing metrics in datadog, and see
> this warning in the jobmanager log:
>
> 2021-10-01 04:22:09,920 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
> name 'totalNumberOfCheckpoints'. Metric will not be reported.[flink, 
> jobmanager]
> 2021-10-01 04:22:09,920 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
> name 'numberOfInProgressCheckpoints'. Metric will not be reported.[flink, 
> jobmanager]
> 2021-10-01 04:22:09,920 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
> name 'numberOfCompletedCheckpoints'. Metric will not be reported.[flink, 
> jobmanager]
> 2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
> name 'numberOfFailedCheckpoints'. Metric will not be reported.[flink, 
> jobmanager]
> 2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
> name 'lastCheckpointRestoreTimestamp'. Metric will not be reported.[flink, 
> jobmanager]
> 2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
> name 'lastCheckpointSize'. Metric will not be reported.[flink, jobmanager]
> 2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
> name 'lastCheckpointDuration'. Metric will not be reported.[flink, jobmanager]
> 2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
>

[External] metric collision using datadog and standalone Kubernetes HA mode

2021-10-12 Thread Clemens Valiente
Hi,

we are using datadog as our metrics reporter as documented here:
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/metric_reporters/#datadog

our jobmanager scope is
metrics.scope.jm: flink.jobmanager
metrics.scope.jm.job: flink.jobmanager
since datadog doesn't allow placeholders in metric names, we cannot include
the <job_id> or <job_name> placeholder in the scope.

This setup worked nicely on our standalone kubernetes application
deployment without using HA.
But when we set up HA, we lost checkpointing metrics in datadog, and see
this warning in the jobmanager log:

2021-10-01 04:22:09,920 WARN  org.apache.flink.metrics.MetricGroup
[] - Name collision: Group already contains a
Metric with the name 'totalNumberOfCheckpoints'. Metric will not be
reported.[flink, jobmanager]
2021-10-01 04:22:09,920 WARN  org.apache.flink.metrics.MetricGroup
[] - Name collision: Group already contains a
Metric with the name 'numberOfInProgressCheckpoints'. Metric will not
be reported.[flink, jobmanager]
2021-10-01 04:22:09,920 WARN  org.apache.flink.metrics.MetricGroup
[] - Name collision: Group already contains a
Metric with the name 'numberOfCompletedCheckpoints'. Metric will not
be reported.[flink, jobmanager]
2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup
[] - Name collision: Group already contains a
Metric with the name 'numberOfFailedCheckpoints'. Metric will not be
reported.[flink, jobmanager]
2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup
[] - Name collision: Group already contains a
Metric with the name 'lastCheckpointRestoreTimestamp'. Metric will not
be reported.[flink, jobmanager]
2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup
[] - Name collision: Group already contains a
Metric with the name 'lastCheckpointSize'. Metric will not be
reported.[flink, jobmanager]
2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup
[] - Name collision: Group already contains a
Metric with the name 'lastCheckpointDuration'. Metric will not be
reported.[flink, jobmanager]
2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup
[] - Name collision: Group already contains a
Metric with the name 'lastCheckpointProcessedData'. Metric will not be
reported.[flink, jobmanager]
2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup
[] - Name collision: Group already contains a
Metric with the name 'lastCheckpointPersistedData'. Metric will not be
reported.[flink, jobmanager]
2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup
[] - Name collision: Group already contains a
Metric with the name 'lastCheckpointExternalPath'. Metric will not be
reported.[flink, jobmanager]


I assume this is because we now have two jobmanager pods (one active, one
standby) and they both report this metric, so it fails. But we cannot use
the <job_id> scope in the group, otherwise we won't be able to build
Datadog dashboards conveniently.
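
For illustration, a scope format that would avoid the collision (but
produces per-job metric names, which is exactly what breaks our Datadog
dashboards):

metrics.scope.jm.job: flink.jobmanager.<job_id>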

My questions:
- Did anyone else encounter this problem?
- How could we solve this to get checkpointing metrics again in HA mode,
without needing the <job_id> placeholder?

Thanks a lot
Clemens




[External] naming table stages

2021-07-27 Thread Clemens Valiente
Is it possible to rename execution stages from the Table API? Right now the
entire select transformation appears in plain text in the task name, so the
log entries from the ExecutionGraph are over 10,000 characters long and the
log files are incredibly difficult to read.
For example, a simple selected field shows up as:
Calc(select=[(((_UTF-16LE'code = ' POSITION (FLAG(BOTH) TRIM _UTF-16LE' '
TRIM FLAG(BOTH) TRIM _UTF-16LE' ' TRIM extraInfo.loginRequestID) =
_UTF-16LE'') OR ((FLAG(BOTH) TRIM _UTF-16LE' ' TRIM
extraInfo.loginRequestID) = _UTF-16LE'None')) CASE null:VARCHAR(2147483647)
CHARACTER SET "UTF-16LE" CASE extraInfo.loginRequestID))) = 1) CASE
null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE" CASE FLAG(BOTH) TRIM
_UTF-16LE' ' TRIM extraInfo.loginRequestID) = _UTF-16LE'') OR ((FLAG(BOTH)
TRIM _UTF-16LE' ' TRIM extraInfo.loginRequestID) = _UTF-16LE'None')) CASE
null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE" CASE
extraInfo.loginRequestID)) AS loginRequestId
and we have about a dozen of those, all printed out for every single log
line.
Is there any way of shortening this without having to suppress these log
lines completely?

Best Regards
Clemens




Re: [External] NullPointerException on accumulator after Checkpointing

2021-07-27 Thread Clemens Valiente
Hi Timo,
thanks for the help here, wrapping the MapView in a case class indeed
solved the problem.
It was not immediately apparent from the documentation that using a MapView
as top level accumulator would cause an issue. it seemed a straightforward
intuitive way to use it :)
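
For reference, a minimal sketch of the shape that worked for us
(specialised to String keys for brevity; the counting logic is
illustrative, not our production code):

import java.lang.{Integer => JInt}

import org.apache.flink.table.api.dataview.MapView
import org.apache.flink.table.functions.AggregateFunction

// The accumulator is now a POJO-style wrapper around the MapView,
// instead of the MapView being the accumulator itself.
class CountDistinctAcc {
  var counts: MapView[String, JInt] = new MapView[String, JInt]()
}

class CountDistinctAggFunction extends AggregateFunction[JInt, CountDistinctAcc] {
  override def createAccumulator(): CountDistinctAcc = new CountDistinctAcc

  def accumulate(acc: CountDistinctAcc, key: String): Unit = {
    val seen = acc.counts.get(key)
    acc.counts.put(key, if (seen == null) 1 else seen + 1)
  }

  override def getValue(acc: CountDistinctAcc): JInt = {
    // number of keys in the view == number of distinct values seen
    var n = 0
    val it = acc.counts.iterator
    while (it.hasNext) { it.next(); n += 1 }
    n
  }
}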

Cheers
Clemens

On Wed, Jul 14, 2021 at 10:19 PM Timo Walther  wrote:

> Hi Clemens,
>
> first of all, can you try to use the MapView within an accumulator POJO
> class? This might solve your exception. I'm not sure if we support the
> views as top-level accumulators.
>
> In any case this seems to be a bug. I will open an issue once I get your
> feedback. We might simply throw an exception for top-level usage then.
>
> Regards,
> Timo
>
>
>
> On 14.07.21 06:33, Clemens Valiente wrote:
> > Hi,
> >
> > we created a new AggregateFunction with Accumulator as Mapview as follows
> >
> > class CountDistinctAggFunction[T]
> >     extends AggregateFunction[lang.Integer, MapView[T, lang.Integer]] {
> >
> >   override def createAccumulator(): MapView[T, lang.Integer] = {
> >     new MapView[T, lang.Integer]()
> >   }
> > ...
> >
> > We had NullPointerExceptions happening on
> >
> > getValue(accumulator: MapView[T, lang.Integer]): lang.Integer
> >
> > and
> >
> > def accumulate(accumulator: MapView[T, lang.Integer], key: T): Unit
> = {
> >
> > so I added null checks there.
> >
> > Unfortunately the NPEs are still happening, right after triggering
> > checkpointing
> >
> > 2021-07-14 04:01:22,340 INFO
> >   org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> > Triggering checkpoint 1 (type=CHECKPOINT) @ 1626235282013 for job
> > 0cbe21cce72742ec8e5
> > e6786aa6b44ca.
> > 2021-07-14 04:02:52,249 INFO
> >   org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
> > OverAggregate(partitionBy=[entityID], orderBy=[eventTime ASC],
> > window=[ RANG BETWEEN
> > 360 PRECEDING AND CURRENT ROW],
> >
>  select=[] 
> app$functions$table$CountDistinctAggFunction$47dfbce463746500de0b303cff5c947b
> > AS w0$o0) (5/8) (39c23e4703862c39e513dcb5fd629fb4) switched from
> >   RUNNING to FAILED on 10.195.174.180:6122-ba5b74 (dataPort=45665).
> > java.lang.NullPointerException: null
> >  at
> >
>  
> org.apache.flink.table.data.conversion.RawObjectConverter.toExternal(RawObjectConverter.java:49)
> > ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >  at BoundedOverAggregateHelper$946.setAccumulators(Unknown
> > Source) ~[?:?]
> >  at
> >
>  
> org.apache.flink.table.runtime.operators.over.RowTimeRangeBoundedPrecedingFunction.onTimer(RowTimeRangeBoundedPrecedingFunction.java:224)
> > ~[feature_hydra-assembly-master-
> > 25903391.jar:master-25903391]
> >  at
> >
>  
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91)
> > ~[feature_hydra-assembly-master-25903391.jar:master-2590339
> > 1]
> >  at
> >
>  
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70)
> > ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >  at
> >
>  
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)
> > ~[feature_hydra-assembly-master-25903391.jar:master-
> > 25903391]
> >  at
> >
>  
> org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194)
> > ~[feature_hydra-assembly-master-25903391
> > .jar:master-25903391]
> >  at
> >
>  
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626)
> > ~[feature_hydra-assembly-master-25903391.jar:master-2590
> > 3391]
> >  at
> >
>  
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:197)
> > ~[feature_hydra-assembly-master-25903391.ja
> > r:master-25903391]
> >  at
> >
>  
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)
> > ~[feature_hydra-asse
> > mbly-master-25903391.jar:master-25903391]
> >

[External] NullPointerException on accumulator after Checkpointing

2021-07-13 Thread Clemens Valiente
Hi,

we created a new AggregateFunction with a MapView as the accumulator, as
follows:

class CountDistinctAggFunction[T]
    extends AggregateFunction[lang.Integer, MapView[T, lang.Integer]] {

  override def createAccumulator(): MapView[T, lang.Integer] = {
    new MapView[T, lang.Integer]()
  }
...

We had NullPointerExceptions happening on

getValue(accumulator: MapView[T, lang.Integer]): lang.Integer

and

def accumulate(accumulator: MapView[T, lang.Integer], key: T): Unit = {

so I added null checks there.
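
Roughly what those guards look like (illustrative; the counting logic is a
sketch, not the original method bodies):

override def getValue(accumulator: MapView[T, java.lang.Integer]): java.lang.Integer = {
  if (accumulator == null) return 0 // guard added after the first NPEs
  var n = 0
  val it = accumulator.iterator
  while (it.hasNext) { it.next(); n += 1 }
  n
}

def accumulate(accumulator: MapView[T, java.lang.Integer], key: T): Unit = {
  if (accumulator == null || key == null) return // guard added after the first NPEs
  val seen = accumulator.get(key)
  accumulator.put(key, if (seen == null) 1 else seen + 1)
}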

Unfortunately the NPEs are still happening, right after triggering
checkpointing

2021-07-14 04:01:22,340 INFO
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
Triggering checkpoint 1 (type=CHECKPOINT) @ 1626235282013 for job
0cbe21cce72742ec8e5
e6786aa6b44ca.
2021-07-14 04:02:52,249 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
OverAggregate(partitionBy=[entityID], orderBy=[eventTime ASC], window=[
RANG BETWEEN
360 PRECEDING AND CURRENT ROW],
select=[] 
app$functions$table$CountDistinctAggFunction$47dfbce463746500de0b303cff5c947b
AS w0$o0) (5/8) (39c23e4703862c39e513dcb5fd629fb4) switched from
 RUNNING to FAILED on 10.195.174.180:6122-ba5b74 (dataPort=45665).
java.lang.NullPointerException: null
at
org.apache.flink.table.data.conversion.RawObjectConverter.toExternal(RawObjectConverter.java:49)
~[feature_hydra-assembly-master-25903391.jar:master-25903391]
at BoundedOverAggregateHelper$946.setAccumulators(Unknown Source)
~[?:?]
at
org.apache.flink.table.runtime.operators.over.RowTimeRangeBoundedPrecedingFunction.onTimer(RowTimeRangeBoundedPrecedingFunction.java:224)
~[feature_hydra-assembly-master-
25903391.jar:master-25903391]
at
org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91)
~[feature_hydra-assembly-master-25903391.jar:master-2590339
1]
at
org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70)
~[feature_hydra-assembly-master-25903391.jar:master-25903391]
at
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)
~[feature_hydra-assembly-master-25903391.jar:master-
25903391]
at
org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194)
~[feature_hydra-assembly-master-25903391
.jar:master-25903391]
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626)
~[feature_hydra-assembly-master-25903391.jar:master-2590
3391]
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:197)
~[feature_hydra-assembly-master-25903391.ja
r:master-25903391]
at
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)
~[feature_hydra-asse
mbly-master-25903391.jar:master-25903391]
at
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
~[feature_hydra-assembly-master-25903391.jar:master-259
03391]
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206)
~[feature_hydra-assembly-master-25903391.jar:master-25903391]
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
~[feature_hydra-assembly-master-25903391.jar:master-25903391]
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
~[feature_hydra-assembly-master-25903391.jar:master-25903391]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
~[feature_hydra-assembly-master-25903391.jar:master-25903391]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
~[feature_hydra-assembly-master-25903391.jar:master-25903391]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
~[feature_hydra-assembly-master-25903391.jar:master-25903391]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
~[feature_hydra-assembly-master-25903391.jar:master-25903391]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
~[feature_hydra-assembly-master-25903391.jar:master-25903391]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
~[feature_hydra-assembly-master-25903391.jar:master-25903391]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_282]

Reading the stack trace, it looks like the accumulator is once again null
here.

I have no idea how the accumulator ends up null; I don't think this should
be happening. I didn't find