Failure to execute streaming SQL query

2020-11-05 Thread Satyam Shekhar
Hello,

I have a table T0 with the following schema -

root
  |-- amount: BIGINT
  |-- timestamp: TIMESTAMP(3)
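For reference, a minimal sketch of how T0 might be registered so that
`timestamp` is the rowtime attribute (the connector, topic and watermark
below are assumptions, not part of the original setup):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env) // Blink planner is the default in 1.11

// Hypothetical DDL: only the two columns above come from the original schema.
tableEnv.executeSql(
  """
    |CREATE TABLE T0 (
    |  amount BIGINT,
    |  `timestamp` TIMESTAMP(3),
    |  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND
    |) WITH (
    |  'connector' = 'kafka',
    |  'topic' = 't0',
    |  'properties.bootstrap.servers' = 'localhost:9092',
    |  'format' = 'json'
    |)
    |""".stripMargin)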

The following two queries fail execution on the above table when executed
in streaming mode using the Blink planner.

WITH A AS (
  SELECT COUNT(*) AS ct, TUMBLE_END(`timestamp`, INTERVAL '1' MINUTE) AS tm
  FROM T0 GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTE))
SELECT L.ct, R.ct, L.tm, R.tm FROM A AS L LEFT JOIN A AS R ON L.tm = R.tm

WITH A AS (
  SELECT COUNT(*) AS ct, TUMBLE_ROWTIME(`timestamp`, INTERVAL '1' MINUTE) AS tm
  FROM T0 GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTE))
SELECT L.ct, R.ct, L.tm, R.tm FROM A AS L LEFT JOIN A AS R ON L.tm = R.tm

The two queries are very similar and differ only in their use of the
TUMBLE_END and TUMBLE_ROWTIME operators. Both queries use the timestamp
column as their rowtime attribute. Casting the "tm" column to TIMESTAMP makes
both queries work:

WITH A AS (
  SELECT COUNT(*) AS ct,
         CAST(TUMBLE_END(`timestamp`, INTERVAL '1' MINUTE) AS TIMESTAMP(3)) AS tm
  FROM T0 GROUP BY TUMBLE(`timestamp`, INTERVAL '1' MINUTE))
SELECT L.ct, R.ct, L.tm, R.tm FROM A AS L LEFT JOIN A AS R ON L.tm = R.tm

This workaround, however, loses the rowtime attribute from the output result
set for the second query.

The first query fails with the following exception -

java.lang.RuntimeException: class java.sql.Timestamp cannot be cast to
class java.lang.Long (java.sql.Timestamp is in module java.sql of loader
'platform'; java.lang.Long is in module java.base of loader 'bootstrap')
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at SinkConversion$166.processElement(Unknown Source)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:626)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:603)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:563)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at StreamExecCalc$163.processElement(Unknown Source)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:626)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:603)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:563)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
at
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.outputNullPadding(StreamingJoinOperator.java:314)
at
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:206)
at
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement1(StreamingJoinOperator.java:115)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord1(StreamTwoInputProcessor.java:132)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$0(StreamTwoInputProcessor.java:99)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:364)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:179)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lan

Re: I have some interesting result with my test code

2020-11-05 Thread Jark Wu
Great to hear it works!

`setStartFromGroupOffsets` [1] will start reading partitions from the
consumer group's (the group.id setting in the consumer properties) committed
offsets in the Kafka brokers. If offsets cannot be found for a partition,
the 'auto.offset.reset' setting from the properties will be used, and the
default value of the 'auto.offset.reset' property is 'latest' [2].

I think that's why `setStartFromGroupOffsets` doesn't consume all the events.

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
[2]: https://kafka.apache.org/documentation/#auto.offset.reset
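For anyone hitting the same issue later, a minimal sketch of the two ways to
make a fresh consumer group read from the beginning; everything except the
two commented settings is assumed boilerplate (topic and group.id are taken
from the code further down):

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "awesome_order")
// Option 1: keep setStartFromGroupOffsets(), but fall back to the earliest
// offset whenever the group has no committed offsets yet.
properties.setProperty("auto.offset.reset", "earliest")

val consumer = new FlinkKafkaConsumer[String]("order", new SimpleStringSchema(), properties)
// Option 2 (alternative): ignore committed offsets entirely and always
// read the topic from the beginning.
consumer.setStartFromEarliest()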

On Fri, 6 Nov 2020 at 07:04, Kevin Kwon  wrote:

> Hi Jark, setStartFromEarliest actually worked. It's strange since my test
> is stateless (complete teardown of all docker containers) and the consumer
> creates the topic once it starts consuming it. I was assuming that
> setStartFromGroupOffsets would let the consumer consume from the beginning
> anyway. I'll share the code if I have any further problems, since I can't
> just copy-paste code created inside my company.
>
> Thanks though! I appreciate your help
>
> On Thu, Nov 5, 2020 at 4:55 AM Jark Wu  wrote:
>
>> Hi Kevin,
>>
>> Could you share the code of how you register the FlinkKafkaConsumer as a
>> table?
>>
>> Regarding your initialization of FlinkKafkaConsumer, I would recommend
>> setStartFromEarliest() to guarantee it consumes all the records in the
>> partitions.
>>
>> Regarding the flush(), it seems it is in the foreach loop? So it is not 
>> flushing
>> after publishing ALL events?
>> I'm not experienced with the flush() API, could this method block and the
>> following random events can't be published to Kafka?
>>
>> Best,
>> Jark
>>
>> On Wed, 4 Nov 2020 at 04:04, Robert Metzger  wrote:
>>
>>> Hi Kevin,
>>> thanks a lot for posting this problem.
>>> I'm adding Jark to the thread, he or another committer working on Flink
>>> SQL can maybe provide some insights.
>>>
>>> On Tue, Nov 3, 2020 at 4:58 PM Kevin Kwon  wrote:
>>>
 Looks like the event time that I've specified in the consumer is not
 being respected. Does the timestamp assigner actually work in Kafka
 consumers?

   .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
 override def extractTimestamp(order: Order, recordTimestamp: 
 Long): Long = {
   order.getTimestamp
 }
   })


 On Tue, Nov 3, 2020 at 12:01 AM Kevin Kwon  wrote:

> Hi guys, I've been recently experimenting with end-to-end testing
> environment with Kafka and Flink (1.11)
>
> I've setup an infrastructure with Docker Compose composed of single
> Kafka broker / Flink (1.11) / MinIO for checkpoint saves
>
> Here's the test scenario
>
> 1. Send 1000 messages with manual timestamp assigned to each event
> increased by 100 milliseconds per loop (first message and last message has
> a difference of 100 seconds). There are 3 partitions for the topic I'm
> writing to. Below code is the test message producer using Confluent's
> Python SDK
>
> order_producer = get_order_producer()
> current_timestamp = int(round(time() * 1000))
> for i in range(0, 1000):
> order_producer.produce(
> topic="order",
> key={"key": i % 100},
> value={
> "id": 1000,
> "customerId": i % 10,
> "timestamp": current_timestamp + i * 100
> }
> )
> order_producer.flush()
>
>
> 2. Flink performs an SQL query on this stream and publishes it back to
> Kafka topic that has 3 partitions. Below is the SQL code
>
> | SELECT
> |   o.id,
> |   COUNT(*),
> |   TUMBLE_END(o.ts, INTERVAL '5' SECONDS)
> | FROM
> |   order o
> | GROUP BY
> |   o.id,
> |   TUMBLE(o.ts, INTERVAL '5' SECONDS)
>
> So I expect the sum of all the counts of the result to be equal to
> 1000 but it seems that a lot of messages are missing (797 as below). I
> can't seem to figure out why though. I'm using event time for the
> environment
>
> [image: Screenshot 2020-11-02 at 23.35.23.png]
>
> *Below is the configuration code*
> Here's the code for the consumer settings for Kafka
>
> private def initOrderConsumer(): FlinkKafkaConsumer[Order] = {
>   val properties = new Properties()
>   properties.setProperty("bootstrap.servers", kafkaBrokers)
>   properties.setProperty("group.id", "awesome_order")
>
>   val kafkaConsumer = new FlinkKafkaConsumer[Order](
> "order",
> ConfluentRegistryAvroDeserializationSchema.forSpecific[Order](
>   classOf[Order],
>   kafkaSchemaRegistry
> ),
> properties
>   )
>   kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
>   kafkaConsumer.setStartFromGroupOffsets()
>  

Rules of Thumb for Setting Parallelism

2020-11-05 Thread Rex Fenley
Hello,

I'm running a job on AWS EMR with the Table API that does a long series of
Joins, GroupBys, and Aggregates, and I'd like to know how to best tune
parallelism.

In my case, I have 8 EMR core nodes set up, each with 4 vCores and 8 GiB of
memory. There's a job we have to run that has ~30 table operators. Given
this, how should I calculate what to set the system's parallelism to?

I also plan on running a second job on the same system, but just with 6
operators. Will this change the calculation for parallelism at all?

Thanks!




Re: union stream vs multiple operators

2020-11-05 Thread Chesnay Schepler

I don't think the first option has any benefit.

On 11/5/2020 1:19 AM, Alexey Trenikhun wrote:

Hello,
I have two Kafka topics ("A" and "B") which provide structurally similar 
data but with different load patterns, for example hundreds of records 
per second in the first topic versus 10 records per second in the second. 
Events are processed using the same algorithm and output to a common 
sink. Currently my pipeline is something like:


Source A -> T -\
                -> Sink
Source B -> T -/

Instead of this pipeline I can union the two streams and send them to a 
common KeyedProcessFunction T:


Source A -\
           (union) -> T -> Sink
Source B -/

What are pros and cons of these approaches?

Thanks,
Alexey
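For illustration, a minimal runnable sketch of the union variant in the
DataStream API; the event type, sources and sink below are stand-ins, not
the real job:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

case class Event(key: String, value: Long)

// The common processing logic T, applied once to the unioned stream.
class T extends KeyedProcessFunction[String, Event, Event] {
  override def processElement(e: Event,
                              ctx: KeyedProcessFunction[String, Event, Event]#Context,
                              out: Collector[Event]): Unit = out.collect(e)
}

object UnionVariant {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Stand-ins for the two Kafka sources; in the real job these would be
    // FlinkKafkaConsumers for topics "A" and "B".
    val sourceA: DataStream[Event] = env.fromElements(Event("a", 1L))
    val sourceB: DataStream[Event] = env.fromElements(Event("b", 2L))

    sourceA
      .union(sourceB)
      .keyBy(_.key)
      .process(new T)
      .print() // stand-in for the common sink

    env.execute("union-variant")
  }
}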





Re: Is possible that make two operators always locate in same taskmanager?

2020-11-05 Thread Chesnay Schepler

It would be good if you could elaborate a bit more on your use-case.
Are you using batch or streaming? What kind of "message" are we talking 
about? Why are you thinking of using a static variable, instead of just 
treating this message as part of the data(set/stream)?


On 11/5/2020 12:55 PM, Si-li Liu wrote:
Currently I use Flink 1.9.1. The actual thing I want to do is send 
some messages from downstream operators to upstream operators, which I 
am considering doing via a static variable.


But that requires making sure that these two operators always run in the 
same TaskManager process. Can I use CoLocationGroup to solve this 
problem? Or can anyone give me an example demonstrating the usage 
of CoLocationGroup?


Thanks!
--
Best regards

Sili Liu





Re: Flink TLS in K8s

2020-11-05 Thread Chesnay Schepler
It is unlikely that this is a port issue, and I would currently suspect 
that something in your SSL setup is not correct.


@Nico: do you have a suggestion on how to debug this?

On 11/5/2020 4:23 PM, Patrick Eifler wrote:

Hi,

I did set up a flink session cluster on K8s.

Now I added the ssl configuration as shown in the documentation:

# Flink TLS
security.ssl.internal.enabled: true
security.ssl.internal.keystore: 
/config/internal-keystore/internal.keystore.jks
security.ssl.internal.truststore: 
/config/internal-keystore/internal.keystore.jks

security.ssl.internal.keystore-password: {{ .Values.keystore.password }}
security.ssl.internal.truststore-password: {{ .Values.keystore.password }}
security.ssl.internal.key-password: {{ .Values.keystore.password }}

Now I get the problem that the task manager cannot connect to the job 
manager or the resource manager:


could not resolve ResourceManager address 
akka.ssl.tcp://flink@flink-sc-jobmanager:6123/user/rpc/resourcemanager_*, 
retrying in 1 ms: Could not connect to rpc endpoint under address 
akka.ssl.tcp://flink@flink-sc-jobmanager:6123/user/rpc/resourcemanager_*.


Do I need to change the job manager port to make this work?

Any suggestions would be highly appreciated.

Thanks.

Patrick





Re: Long blocking call in UserFunction triggers HA leader lost?

2020-11-05 Thread Chesnay Schepler
I'd go with the network congestion theory for the time being; then the 
only remedy is throttling the download of said list, or somehow reducing 
its size significantly.

What the task thread is doing doesn't matter with regard to HA; it may 
cause checkpoints to time out, but should have no other effects (unless 
it magically consumes all CPU resources of the system).
If you block in any function, then you're just blocking the task thread; 
nothing else.
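For illustration, one possible shape of the "load the list in a background 
thread" option mentioned below: keep serving the previous list while a 
single-threaded executor refreshes a shared reference, so the task thread 
never blocks for minutes. This is only a sketch (the loader, types and 
refresh interval are assumptions), not an official pattern:

import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class EnrichWithList extends RichMapFunction[String, String] {

  // Hypothetical loader; in the real job this would download the huge list.
  private def loadList(): Set[String] = Set.empty

  @transient private var current: AtomicReference[Set[String]] = _
  @transient private var refresher: ScheduledExecutorService = _

  override def open(parameters: Configuration): Unit = {
    current = new AtomicReference(loadList())
    refresher = Executors.newSingleThreadScheduledExecutor()
    // Refresh in the background; map() keeps using the previous list meanwhile.
    refresher.scheduleAtFixedRate(() => current.set(loadList()), 10, 10, TimeUnit.MINUTES)
  }

  override def map(value: String): String =
    if (current.get().contains(value)) value + ",known" else value + ",unknown"

  override def close(): Unit = {
    if (refresher != null) refresher.shutdownNow()
  }
}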


On 11/5/2020 10:40 PM, Theo Diefenthal wrote:

Hi there,

I have a stream where I reload a huge list from time to time. I know 
there are various Side-Input patterns, but none of them seem to be 
perfect so I stuck with an easy approach: I use a Guava Cache and if 
it expires and a new element comes in, processing of the element is 
blocked up until the new list is loaded.


That approach has been running in production for a while now and it works 
fine, as the cache has a mechanism to reload the list only on a real change. 
Now today, the list changed from a few hundred MB to multiple GB at a 
time when the network in general was already a bit congested. One 
TaskManager needed roughly 4 minutes to load the list, but after 
30 seconds it reported that it had lost the connection to ZooKeeper and thus 
had no more information about the leading JobManager, leading to a crash 
loop. That crash & restart loop continued for 30 minutes until the 
list was rolled back and was then successfully loaded again.


Now my question:
* If processing of an element blocks, I understand that it's also not 
possible to perform checkpoints at that time, but I didn't expect 
ZooKeeper, heartbeats or other threads of the TaskManager to time out. 
Was that just a coincidence of the network being congested, or is it 
something in the design of Flink that a long blocking call can lead to 
crashes (other than X checkpoints timing out followed by a configured 
forced crash)? Which threads can be blocked in 
Flink during a map in a MapFunction?
* For this approach with a kind of cached reload, should I switch to 
async I/O or just put the loading of the list in a background thread? In my 
case, it's not really important that processing is blocked up until 
the list is loaded. And in the case of async I/O: 99.999% of the events 
would return directly and would thus not be async; it's always just a 
single one triggering the reload of the list, so it doesn't seem to be 
perfectly suited here?


I'm running on Flink 1.11 and here's the relevant excerpt from the log:

2020-11-05T09:41:40.933865+01:00 [WARN] Client session timed out, have 
not heard from server in 33897ms for sessionid 0x374de97ba0afac9
2020-11-05T09:41:40.936488+01:00 [INFO] Client session timed out, have 
not heard from server in 33897ms for sessionid 0x374de97ba0afac9, 
closing socket connection and attempting reconnect

2020-11-05T09:41:41.042032+01:00 [INFO] State change: SUSPENDED
2020-11-05T09:41:41.168802+01:00 [WARN] Connection to ZooKeeper 
suspended. Can no longer retrieve the leader from ZooKeeper.
2020-11-05T09:41:41.169276+01:00 [WARN] Connection to ZooKeeper 
suspended. Can no longer retrieve the leader from ZooKeeper.
2020-11-05T09:41:41.169514+01:00 [INFO] Close ResourceManager 
connection e4d1f9acca4ea3c5a793877467218452.
2020-11-05T09:41:41.169514+01:00 [INFO] JobManager for job 
0dcfb212136daefbcbfe480c6a260261 with leader id 
8440610cd5de998bd6c65f3717de42b8 lost leadership.
2020-11-05T09:41:41.185104+01:00 [INFO] Close JobManager connection 
for job 0dcfb212136daefbcbfe480c6a260261.
2020-11-05T09:41:41.185354+01:00 [INFO] Attempting to fail task 
externally  (c596aafa324b152911cb53ab4e6d1cc2).
2020-11-05T09:41:41.187980+01:00 [WARN] ... 
(c596aafa324b152911cb53ab4e6d1cc2) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: JobManager responsible for 
0dcfb212136daefbcbfe480c6a260261 lost the leadership.
    at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.disconnectJobManagerConnection(TaskExecutor.java:1415)
    at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.access$1300(TaskExecutor.java:173)
    at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:1852)

    at java.util.Optional.ifPresent(Optional.java:159)
    at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:1851)
    at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
    at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
    at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)

    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFun

Long blocking call in UserFunction triggers HA leader lost?

2020-11-05 Thread Theo Diefenthal
Hi there, 

I have a stream where I reload a huge list from time to time. I know there are 
various Side-Input patterns, but none of them seem to be perfect so I stuck 
with an easy approach: I use a Guava Cache and if it expires and a new element 
comes in, processing of the element is blocked up until the new list is loaded. 

That approach has been running in production for a while now and it works fine, as the 
cache has a mechanism to reload the list only on a real change. Now today, the 
list changed from a few hundred MB to multiple GB at a time when the network 
in general was already a bit congested. One TaskManager needed roughly 
4 minutes to load the list, but after 30 seconds it reported that it had lost the 
connection to ZooKeeper and thus had no more information about the leading 
JobManager, leading to a crash loop. That crash & restart loop continued for 
30 minutes until the list was rolled back and was then successfully loaded again. 

Now my question: 
* If processing of an element blocks, I understand that it's also not possible 
to perform checkpoints at that time, but I didn't expect ZooKeeper, heartbeats 
or other threads of the TaskManager to time out. Was that just a coincidence of 
the network being congested, or is it something in the design of Flink that a 
long blocking call can lead to crashes (other than X checkpoints timing out 
followed by a configured forced crash)? Which threads can be blocked in Flink 
during a map in a MapFunction? 
* For this approach with a kind of cached reload, should I switch to async I/O 
or just put the loading of the list in a background thread? In my case, it's not 
really important that processing is blocked up until the list is loaded. And in 
the case of async I/O: 99.999% of the events would return directly and would thus 
not be async; it's always just a single one triggering the reload of the list, so 
it doesn't seem to be perfectly suited here? 

I'm running on Flink 1.11 and here's the relevant excerpt from the log: 

2020-11-05T09:41:40.933865+01:00 [WARN] Client session timed out, have not 
heard from server in 33897ms for sessionid 0x374de97ba0afac9 
2020-11-05T09:41:40.936488+01:00 [INFO] Client session timed out, have not 
heard from server in 33897ms for sessionid 0x374de97ba0afac9, closing socket 
connection and attempting reconnect 
2020-11-05T09:41:41.042032+01:00 [INFO] State change: SUSPENDED 
2020-11-05T09:41:41.168802+01:00 [WARN] Connection to ZooKeeper suspended. Can 
no longer retrieve the leader from ZooKeeper. 
2020-11-05T09:41:41.169276+01:00 [WARN] Connection to ZooKeeper suspended. Can 
no longer retrieve the leader from ZooKeeper. 
2020-11-05T09:41:41.169514+01:00 [INFO] Close ResourceManager connection 
e4d1f9acca4ea3c5a793877467218452. 
2020-11-05T09:41:41.169514+01:00 [INFO] JobManager for job 
0dcfb212136daefbcbfe480c6a260261 with leader id 
8440610cd5de998bd6c65f3717de42b8 lost leadership. 
2020-11-05T09:41:41.185104+01:00 [INFO] Close JobManager connection for job 
0dcfb212136daefbcbfe480c6a260261. 
2020-11-05T09:41:41.185354+01:00 [INFO] Attempting to fail task externally  
(c596aafa324b152911cb53ab4e6d1cc2). 
2020-11-05T09:41:41.187980+01:00 [WARN] ... (c596aafa324b152911cb53ab4e6d1cc2) 
switched from RUNNING to FAILED. 
org.apache.flink.util.FlinkException: JobManager responsible for 
0dcfb212136daefbcbfe480c6a260261 lost the leadership. 
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.disconnectJobManagerConnection(TaskExecutor.java:1415)
 
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.access$1300(TaskExecutor.java:173)
 
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:1852)
 
at java.util.Optional.ifPresent(Optional.java:159) 
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:1851)
 
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
 
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
 
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) 
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) 
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
at akka.dispatch.Mailbox.run(Mailbox.sc

Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2020-11-05 Thread Flavio Pompermaier
Hi everybody,
I was trying to use the JobListener's onJobExecuted() in my job on Flink
1.11.0, but I can't work out whether the job succeeded or not.
The Javadoc of JobListener.onJobExecuted() [1] says
"Callback on job execution finished, successfully or unsuccessfully",
but I can't find any simple way to infer whether the job has finished
successfully or not.
Do I need to perform another remote call from the client to get the job
details using the job id?
I'm quite surprised that the execution result (FINISHED / CANCELED /
FAILED) is not available in the JobExecutionResult.
Another strange thing is that
jobExecutionResult.getJobExecutionResult() returns itself... is that
correct?

Thanks in advance,
Flavio

[1]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/execution/JobListener.html
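For reference, this is roughly what I am experimenting with; I am assuming
(but am not sure) that a non-null Throwable in the two-argument callback
means the execution failed:

import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.core.execution.{JobClient, JobListener}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.registerJobListener(new JobListener {
  override def onJobSubmitted(jobClient: JobClient, t: Throwable): Unit = {
    if (t != null) println(s"Submission failed: ${t.getMessage}")
  }

  override def onJobExecuted(result: JobExecutionResult, t: Throwable): Unit = {
    // My assumption: exactly one of the two parameters is null, so a non-null
    // Throwable means the execution was unsuccessful.
    if (t == null) println(s"Job ${result.getJobID} finished successfully")
    else println(s"Job failed: ${t.getMessage}")
  }
})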

On Fri, Oct 9, 2020 at 1:09 PM Matthias  wrote:

> Reviving this thread again after I came across FLINK-12214 [1] since there
> are use cases which might benefit from this feature. Was there some
> conclusion on public APIs in the meantime? Should we proceed with the
> discussion here?
>
> Best,
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-12214
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 041809


Re: Multi-stream SQL-like processing

2020-11-05 Thread Krzysztof Zarzycki
Yes,
Kafka Connect supports the topics.regex option for sink connectors. The
connector automatically discovers new topics that fit the regex pattern.
It's similar with source connectors, which discover tables in a SQL
database and save them to Kafka topics.
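For reference, a rough sketch of the StatementSet packaging Jark describes
further down, assuming Flink 1.11's Table API; the source DDL and table
names are placeholders rather than working definitions:

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tEnv = TableEnvironment.create(settings)

// One source table scanning the database, as in Jark's example:
// tEnv.executeSql("CREATE TABLE db (...) WITH (...)")

// Package the generated INSERT INTOs into a single job so they share the source.
val stmtSet = tEnv.createStatementSet()
stmtSet.addInsertSql(
  "INSERT INTO kafka_table1 SELECT parse_data(table_name, data) FROM db WHERE table_name = 'table1'")
stmtSet.addInsertSql(
  "INSERT INTO kafka_table2 SELECT parse_data(table_name, data) FROM db WHERE table_name = 'table2'")
stmtSet.execute()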


On Thu, 5 Nov 2020 at 04:16, Jark Wu wrote:

> Yes. The dynamism might be a problem.
> Does Kafka Connect support discovering new tables and synchronizing them
> dynamically?
>
> Best,
> Jark
>
> On Thu, 5 Nov 2020 at 04:39, Krzysztof Zarzycki 
> wrote:
>
>> Hi Jark, thanks for joining the discussion!
>> I understand your point of view that a SQL environment is probably not the
>> best fit for what I was looking to achieve.
>> The idea of a configuration tool sounds almost perfect :) Almost,
>> because:
>> Without the "StatementSet" that you mentioned at the end I would be
>> worried about the resource consumption (job & task manager objects, buffers,
>> connections) of having one topology per table. That would be a significant
>> loss compared to a Kafka Connect-style architecture.
>> With StatementSet I understand this is not the case, but there is another
>> issue: we lose the dynamism. That is, the job won't be able to discover new
>> tables. We would need to always restart the whole (reconfigured)
>> StatementSet job. (Anyway, this approach sounds good enough to try out
>> in my current assignment.)
>> The other issue I see is that I still need to define the DSL for the
>> configuration (something like the config of Kafka Connect). SQL will not be
>> it; it will probably just be a way to implement the tool.
>>
>> I would appreciate your comments, Jark.
>> Also if anyone would like to add other ideas, feel welcome!
>>
>> Best,
>> Krzysztof
>>
>> On Wed, 4 Nov 2020 at 09:37, Jark Wu wrote:
>>
>>> Hi Krzysztof,
>>>
>>> This is a very interesting idea.
>>>
>>> I think SQL is not a suitable tool for this use case, because SQL is a
>>> structured query language
>>>  where the table schema is fixed and never changes during job running.
>>>
>>> However, I think it can be a configuration tool project on top of Flink
>>> SQL.
>>> The configuration tool can dynamically generate all the queries
>>> according to the config
>>>  and submit them in one job.
>>>
>>> For example, if the configuration says "synchronize from mysql address
>>> '' to kafka broker ''",
>>> then the generated Flink SQL would like:
>>>
>>> CREATE TABLE db (
>>>   `database_name` STRING,
>>>   `table_name` STRING,
>>>   `data` BYTES  // encodes all the columns value, can be a better
>>> structure for performance
>>> ) WITH (
>>>   connector = ...   // a new connector scan all tables from the mysql
>>> address
>>>   url = 'jdbc:mysql://localhost:3306/flink-test'
>>> );
>>>
>>> // the configuration tool will generate multiple INSERT INTO according
>>> to how many tables in the DB
>>> INSERT INTO kafka_table1
>>> SELECT parse_data(table_name, data)   // the parse_data UDF will infer
>>> schema from database
>>> FROM db WHERE table = 'table1'// or schema registry and
>>> deserialize the data into columns with different types.
>>>
>>> INSERT INTO kafka_table2
>>> SELECT parse_data(table_name, data)
>>> FROM db WHERE table = 'table2'
>>>
>>> ...
>>>
>>> The configuration tool can use `StatementSet` to package all the INSERT
>>> INTO queries together and submit them in one job.
>>> With the `StatementSet`, the job will share the common source task, so
>>> the tables in MySQL are only read once.
>>>
>>> Best,
>>> Jark
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Tue, 3 Nov 2020 at 14:00, Krzysztof Zarzycki 
>>> wrote:
>>>
 Hi community, I would like to confront one idea with you.

 I was thinking that Flink SQL could be a Flink's answer for Kafka
 Connect (more powerful, with advantages like being decoupled from Kafka).
 Flink SQL would be the configuration language for Flink "connectors",
 sounds great!.
 But one thing does not allow me to implement this idea: There is no
 possibility to run SQL-based processing over multiple similar inputs and
 produce multiple similar outputs (counted in tens or hundreds).
 As a problem example that I need to solve, consider that I have a
 hundred of Kafka topics, with similar data in each. And I would like to
 sink them to a SQL database. With Kafka connect, I can use a single
 connector with JDBC sink, that properly configured will dump each topic to
 a separate table properly keeping the schema (based on what is in the
 schema registry).
 With Flink SQL I would need to run a query per topic/table, I believe.
 Similarly with sourcing data. There is this cool project
 flink-cdc-connectors [1] that leverages Debezium in Flink to apply CDC on
 SQL database, but when used with SQL, it can only pull in one table per
 query.
 These cases can be solved using the datastream API. With it I can code
 pulling in/pushing out multiple table streams. But then "the configuration"
 is a

Re: Upsert UDFs

2020-11-05 Thread Rex Fenley
Also, just to be clear, our ES connector looks like this:

CREATE TABLE sink_es_groups (
id BIGINT,
//.. a bunch of scalar fields
array_of_ids ARRAY,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = '${env:ELASTICSEARCH_HOSTS}',
'index' = '${env:GROUPS_ES_INDEX}',
'format' = 'json',
'sink.bulk-flush.max-actions' = '512',
'sink.bulk-flush.max-size' = '1mb',
'sink.bulk-flush.interval' = '5000',
'sink.bulk-flush.backoff.delay' = '1000',
'sink.bulk-flush.backoff.max-retries' = '4',
'sink.bulk-flush.backoff.strategy' = 'CONSTANT'
)


On Thu, Nov 5, 2020 at 11:52 AM Rex Fenley  wrote:

> Hello,
>
> I'm using the Table API to do a bunch of stateful transformations on CDC
> Debezium rows and then insert final documents into Elasticsearch via the ES
> connector.
>
> I've noticed that Elasticsearch is constantly deleting and then inserting
> documents as they update. Ideally, there would be no delete operation for a
> row update, only for a delete. I'm using the Elasticsearch 7 SQL connector,
> which I'm assuming uses `Elasticsearch7UpsertTableSink` under the hood,
> which implies upserts are actually what it's capable of.
>
> Therefore, I think it's possibly my table plan that's causing row upserts
> to turn into deletes + inserts. My plan is essentially a series of Joins
> and GroupBys + UDF Aggregates (aggregating arrays of data). I think,
> possibly the UDF Aggs following the Joins + GroupBys are causing the
> upserts to split into delete + inserts somehow. If this is correct, is it
> possible to make UDFs that preserve Upserts? Or am I totally off-base with
> my assumptions?
>
> Thanks!
>





Upsert UDFs

2020-11-05 Thread Rex Fenley
Hello,

I'm using the Table API to do a bunch of stateful transformations on CDC
Debezium rows and then insert final documents into Elasticsearch via the ES
connector.

I've noticed that Elasticsearch is constantly deleting and then inserting
documents as they update. Ideally, there would be no delete operation for a
row update, only for a delete. I'm using the Elasticsearch 7 SQL connector,
which I'm assuming uses `Elasticsearch7UpsertTableSink` under the hood,
which implies upserts are actually what it's capable of.

Therefore, I think it's possibly my table plan that's causing row upserts
to turn into deletes + inserts. My plan is essentially a series of Joins
and GroupBys + UDF Aggregates (aggregating arrays of data). I think,
possibly the UDF Aggs following the Joins + GroupBys are causing the
upserts to split into delete + inserts somehow. If this is correct, is it
possible to make UDFs that preserve Upserts? Or am I totally off-base with
my assumptions?

Thanks!



[Stateful Functions] Checkpoints and AtLeastOnce Guarantee

2020-11-05 Thread Jan Brusch


Hi Igal,

thanks for these pointers!

I currently deploy a Flink jar via docker copy. But this is a spike 
setup anyway. I will now discard it and switch directly to working in 
Kubernetes.


So, just so I understand this right, the recommended production setup 
would be:


* Build a docker image containing the job and custom flink-conf.yaml 
based on this: 
https://github.com/apache/flink-statefun/tree/master/tools/docker


* Deploy the job image to Kubernetes via Helm into a standalone StateFun 
job cluster: https://github.com/apache/flink-statefun/tree/master/tools/k8s


* Repeat the above steps for each StateFun Job separately


Two questions left:

1) Does the recommendation of one cluster per job in Kubernetes setups 
also hold for "regular" Flink jobs?


2) Do you (Ververica) offer developer training with a special focus on 
Stateful Functions? I would probably feel a bit safer moving into 
production with that as a background... :-)



Thanks again for your quick and comprehensive replies!

Best regards and a nice evening!

Jan

On 05.11.20 17:55, Igal Shilman wrote:

How do you deploy the job currently?
Are you using the data stream integration / or as a Flink Jar [1]

(also please note that the directories might be created, but without a 
checkpoint interval set they will stay empty)


Regarding your two questions:

It is true that you can theoretically share the same cluster to 
submit additional jobs besides StateFun.
However, StateFun requires a specific set of configurations that might not 
apply to your other jobs.
Considering your end-goal of eventually using kubernetes, the 
recommended way is actually using a cluster per job, and StateFun 
docker images

are a convenient way to package your modules.

[1] 
https://ci.apache.org/projects/flink/flink-statefun-docs-master/deployment-and-operations/packaging.html#flink-jar 




On Thu, Nov 5, 2020 at 5:29 PM Jan Brusch wrote:


Hi Igal,

thanks for your quick and detailed reply! For me, this is the
really great defining feature of Stateful Functions: Separating
StreamProcessing "Infrastructure" from Business Logic Code,
possibly maintained by a different team.

Regarding your points: I did add the checkpoint interval to the
flink-conf to no avail. state.checkpoints.dir was already set and
all the necessary subfolders get created on job startup. They just
stay empty...

Thanks for the pointer to the helm charts! Just what I was looking
for!

A question regarding StateFun docker images: I would actually
prefer using them but my fear is that they would take away my
options to:

1) deploy a new release of my StateFun job without killing the
cluster, because...

2) ... I would like to schedule regular flink jobs or additional
StateFun jobs on the same cluster alongside my original job.

Could you give a quick opinion if these fears are even true and if
so, what would be a recommended setup to satisfy these use cases?


Best regards

Jan


On 05.11.20 17:02, Igal Shilman wrote:

Hi Jan,

The architecture outlined by you, sounds good and we've run
successfully mixed architectures like this.
Let me try to address your questions:

1)
To enable checkpointing you need to set the relevant values in
your flink-conf.yaml file.
execution.checkpointing.interval:  (see [1])
state.checkpoints.dir:  (see [2])

You can take a look here for an example [3]. The easiest way to
incorporate the changes would be to add your custom
flink-conf.yaml into your docker image (here is an example [4]).
When you will be using kubernetes, you can mount a config map as
a flink-conf.yaml, check out the helm charts here: [5]

2)
When the remote function is unavailable, StateFun would buffer
the messages addressed to it, up to the specified
timeout (default would be 1 minute, you can set it here [6])
before the job is considered to be failed and it would be restarted.
It seems like in your example you are waiting for 10 seconds, so
the messages should be delivered.
Do you set function.spec.timeout or .withMaxRequestDuration() to
something else?


Good luck!
Igal.

p.s,
Consider using StateFun docker images[7], see any of the examples
in the statefun repository.


[1]

https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#execution-checkpointing-interval


[2]

https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#state-checkpoints-dir



[3]

https://github.com/apache/flink-statefu

Re: Filter By Value in List

2020-11-05 Thread Rex Fenley
Thanks Timo,

Checking if an element is in an Array does seem like a very useful function
to have. Is there any plan to add it?

Thanks

On Thu, Nov 5, 2020 at 7:26 AM Timo Walther  wrote:

> Hi Rex,
>
> as far as I know, the IN operator only works on tables or a list of
> literals where the latter one is just a shortcut for multiple OR
> operations. I would just go with a UDF for this case. In SQL you could
> do an UNNEST to convert the array into a table and then use the IN
> operator. But I'm not sure if this is a better solution.
>
> Regards,
> Timo
>
>
>
> On 04.11.20 01:13, Rex Fenley wrote:
> > None of the following appear to work either. Flink 1.11.2, Scala 2.12.
> >
> > table.filter("apple".in(List("apple")))
> > [info]   org.apache.flink.table.api.ValidationException: IN operator on
> > incompatible types: String and ObjectArrayTypeInfo.
> >
> > table.filter("apple".in(java.util.Arrays.asList("apple")))
> > [info]   org.apache.flink.table.api.ValidationException: IN operator on
> > incompatible types: String and ObjectArrayTypeInfo.
> >
> > table.filter(
> > "apple".in(newju.ArrayList[String](java.util.Arrays.asList("apple")))
> > )
> > [info]   org.apache.flink.table.api.ValidationException: IN operator on
> > incompatible types: String and ObjectArrayTypeInfo.
> >
> >
> > On Tue, Nov 3, 2020 at 2:32 PM Rex Fenley  > > wrote:
> >
> > Using a custom serializer to make sure I'm using a List does
> > not help.
> >
> > [info]   org.apache.flink.table.api.ValidationException: IN operator
> > on incompatible types: String and List.
> >
> > On Tue, Nov 3, 2020 at 12:44 PM Rex Fenley  > > wrote:
> >
> > For clarification, I'm using Pojo and operating on a column of
> > this type
> > publicjava.util.List fruits
> >
> > adding the following annotation does not help
> > @DataTypeHint("ARRAY")
> >
> > On Mon, Nov 2, 2020 at 7:02 AM Aljoscha Krettek
> > mailto:aljos...@apache.org>> wrote:
> >
> > I believe this is happening because the type system does not
> > recognize
> > that list of Strings as anything special but treats it as a
> > black-box type.
> >
> > @Timo: Would this work with the new type system?
> >
> > Best,
> > Aljoscha
> >
> > On 02.11.20 06:47, Rex Fenley wrote:
> >  > Hello,
> >  >
> >  > I'm trying to filter the rows of a table by whether or
> > not a value exists
> >  > in an array column of a table.
> >  > Simple example:
> >  > table.where("apple".in($"fruits"))
> >  >
> >  > In this example, each row has a "fruits" Array
> > column that could
> >  > have 1 or many fruit strings which may or may not be
> "apple".
> >  >
> >  > However, I keep receiving the following error when I do
> > something similar
> >  > to the example above:
> >  > "IN operator on incompatible types: String and
> > GenericType"
> >  >
> >  > Is there any way to accomplish this?
> >  >
> >  > Thanks!
> >  >
> >
> >
> >
> >
>
>




RE: a couple of memory questions

2020-11-05 Thread Colletta, Edward
Thank you for the response. We do see the heap actually shrink after 
starting new jobs.


From: Matthias Pohl 
Sent: Thursday, November 5, 2020 8:20 AM
To: Colletta, Edward 
Cc: user@flink.apache.org
Subject: Re: a couple of memory questions


Hello Edward,
please find my answers within your message below:

On Wed, Nov 4, 2020 at 1:35 PM Colletta, Edward 
wrote:
Using Flink 1.9.2 with FsStateBackend, Session cluster.


  1.  Does heap state get cleaned up when a job is cancelled?

We have jobs that we run on a daily basis.  We start each morning and cancel 
each evening.  We noticed that the process size does not seem to shrink.  We 
are looking at the resident size of the process with ps and also the USED 
column for Heap on the taskmanager page of the flink dashboard.
There is no explicit cleanup happening on the Flink side. The heap should be 
cleaned up when GC kicks in.

  2.  How can I examine the usage of Flink Managed Memory?

 The configuration documentation seems to indicate this is used for batch jobs, 
and we are only using the Streaming API.   I reduced 
taskmanager.memory.fraction to 0.3, but I think this is still reserving too 
much memory to an area we will not be using.
Unfortunately, I don't know of any way to monitor the managed memory for Flink 
1.9.2 as is. We're going to introduce new metrics for managed memory [1], 
network memory [2] and metaspace [3] in the upcoming release of Flink 1.12.0. 
This should make it easier to monitor these memory pools.

I hope that helps a bit.
Best,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-14406
[2] https://issues.apache.org/jira/browse/FLINK-14422
[3] https://issues.apache.org/jira/browse/FLINK-19617


Re: [Stateful Functions] Checkpoints and AtLeastOnce Guarantee

2020-11-05 Thread Igal Shilman
How do you deploy the job currently?
Are you using the data stream integration / or as a Flink Jar [1]

(also please note, that the directories might be created but without
checkpoint interval set, they will be empty)

Regarding your two questions:

That is true that you can theoretically share the same cluster to submit
additional jobs besides StateFun.
statefun requires a specific set of configurations, that might not apply
for your other jobs.
Considering your end-goal of eventually using kubernetes, the recommended
way is actually using a cluster per job, and StateFun docker images
are a convenient way to package your modules.

[1]
https://ci.apache.org/projects/flink/flink-statefun-docs-master/deployment-and-operations/packaging.html#flink-jar


On Thu, Nov 5, 2020 at 5:29 PM Jan Brusch  wrote:

> Hi Igal,
>
> thanks for your quick and detailed reply! For me, this is the really great
> defining feature of Stateful Functions: Separating StreamProcessing
> "Infrastructure" from Business Logic Code, possibly maintained by a
> different team.
>
> Regarding your points: I did add the checkpoint interval to the flink-conf
> to no avail. state.checkpoints.dir was already set and all the necessary
> subfolders get created on job startup. They just stay empty...
>
> Thanks for the pointer to the helm charts! Just what I was looking for!
>
> A question regarding StateFun docker images: I would actually prefer using
> them but my fear is that they would take away my options to:
>
> 1) deploy a new release of my StateFun job without killing the cluster,
> because...
>
> 2) ... I would like to schedule regular flink jobs or additional StateFun
> jobs on the same cluster alongside my original job.
>
> Could you give a quick opinion if these fears are even true and if so,
> what would be a recommended setup to satisfy these use cases?
>
>
> Best regards
>
> Jan
>
>
> On 05.11.20 17:02, Igal Shilman wrote:
>
> Hi Jan,
>
> The architecture outlined by you, sounds good and we've run successfully
> mixed architectures like this.
> Let me try to address your questions:
>
> 1)
> To enable checkpointing you need to set the relevant values in your
> flink-conf.yaml file.
> execution.checkpointing.interval:  (see [1])
> state.checkpoints.dir:  (see [2])
>
> You can take a look here for an example [3]. The easiest way to
> incorporate the changes would be to add your custom flink-conf.yaml into
> your docker image (here is an example [4]).
> When you will be using kubernetes, you can mount a config map as a
> flink-conf.yaml, check out the helm charts here: [5]
>
> 2)
> When the remote function is unavailable, StateFun would buffer the
> messages addressed to it, up to the specified
> timeout (default would be 1 minute, you can set it here [6]) before the
> job is considered to be failed and it would be restarted.
> It seems like in your example you are waiting for 10 seconds, so the
> messages should be delivered.
> Do you set function.spec.timeout or .withMaxRequestDuration() to something
> else?
>
>
> Good luck!
> Igal.
>
> p.s,
> Consider using StateFun docker images[7], see any of the examples in the
> statefun repository.
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#execution-checkpointing-interval
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#state-checkpoints-dir
> [3]
> https://github.com/apache/flink-statefun/blob/master/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml
> [4]
> https://github.com/apache/flink-statefun/blob/master/statefun-e2e-tests/statefun-sanity-e2e/src/test/resources/Dockerfile#L20
> [5] https://github.com/apache/flink-statefun/tree/master/tools/k8s
> [6] look for function.spec.timeout at
> https://ci.apache.org/projects/flink/flink-statefun-docs-master/sdk/index.html
> [7]
> https://ci.apache.org/projects/flink/flink-statefun-docs-master/deployment-and-operations/packaging.html#images
>
> On Thu, Nov 5, 2020 at 3:35 PM Jan Brusch 
> wrote:
>
>> Hi,
>>
>> I'm currently trying to set up a Flink Stateful Functions Job with the
>> following architecture:
>>
>> * Kinesis Ingress (embedded)
>>
>> * Stateful Function (embedded) that calls to and takes responses from an
>> external business logic function (python worker similar to the one in
>> the python greeter example)
>>
>> * Kinesis Egress (embedded)
>>
>>
>> For the time being I am working with a local docker-compose cluster, but
>> the goal would be to move this to kubernetes for production. The stream
>> processing itself is working fine, but I can't solve two problems with
>> respect to Fault Tolerance:
>>
>> 1) The app is not writing checkpoints or savepoints at all (rocksDB,
>> local filesystem). A checkpoint dir is created on startup but stays
>> empty the whole time. When stopping the job, a savepoint dir is created
>> but the stop ultimately fails with a
>> java.util.concurrent.TimeoutException and the job continues to run.
>>
>> 2) When I

Re: [Stateful Functions] Checkpoints and AtLeastOnce Guarantee

2020-11-05 Thread Jan Brusch

Hi Igal,

thanks for your quick and detailed reply! For me, this is the really 
great defining feature of Stateful Functions: Separating 
StreamProcessing "Infrastructure" from Business Logic Code, possibly 
maintained by a different team.


Regarding your points: I did add the checkpoint interval to the 
flink-conf to no avail. state.checkpoints.dir was already set and all the 
necessary subfolders get created on job startup. They just stay empty...


Thanks for the pointer to the helm charts! Just what I was looking for!

A question regarding StateFun docker images: I would actually prefer 
using them but my fear is that they would take away my options to:


1) deploy a new release of my StateFun job without killing the cluster, 
because...


2) ... I would like to schedule regular flink jobs or additional 
StateFun jobs on the same cluster alongside my original job.


Could you give a quick opinion if these fears are even true and if so, 
what would be a recommended setup to satisfy these use cases?



Best regards

Jan


On 05.11.20 17:02, Igal Shilman wrote:

Hi Jan,

The architecture outlined by you, sounds good and we've run 
successfully mixed architectures like this.

Let me try to address your questions:

1)
To enable checkpointing you need to set the relevant values in your 
flink-conf.yaml file.

execution.checkpointing.interval:  (see [1])
state.checkpoints.dir:  (see [2])

You can take a look here for an example [3]. The easiest way to 
incorporate the changes would be to add your custom flink-conf.yaml 
into your docker image (here is an example [4]).
When you will be using kubernetes, you can mount a config map as a 
flink-conf.yaml, check out the helm charts here: [5]


2)
When the remote function is unavailable, StateFun would buffer the 
messages addressed to it, up to the specified
timeout (default would be 1 minute, you can set it here [6]) before 
the job is considered to be failed and it would be restarted.
It seems like in your example you are waiting for 10 seconds, so the 
messages should be delivered.
Do you set function.spec.timeout or .withMaxRequestDuration() to 
something else?



Good luck!
Igal.

p.s,
Consider using StateFun docker images[7], see any of the examples in 
the statefun repository.



[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#execution-checkpointing-interval 

[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#state-checkpoints-dir 
 

[3] 
https://github.com/apache/flink-statefun/blob/master/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml 

[4] 
https://github.com/apache/flink-statefun/blob/master/statefun-e2e-tests/statefun-sanity-e2e/src/test/resources/Dockerfile#L20 

[5] https://github.com/apache/flink-statefun/tree/master/tools/k8s 

[6] look for function.spec.timeout at 
https://ci.apache.org/projects/flink/flink-statefun-docs-master/sdk/index.html 

[7] 
https://ci.apache.org/projects/flink/flink-statefun-docs-master/deployment-and-operations/packaging.html#images 



On Thu, Nov 5, 2020 at 3:35 PM Jan Brusch wrote:


Hi,

I'm currently trying to set up a Flink Stateful Functions Job with
the
following architecture:

* Kinesis Ingress (embedded)

* Stateful Function (embedded) that calls to and takes responses
from an
external business logic function (python worker similar to the one in
the python greeter example)

* Kinesis Egress (embedded)


For the time being I am working with a local docker-compose
cluster, but
the goal would be to move this to kubernetes for production. The
stream
processing itself is working fine, but I can't solve two problems
with
respect to Fault Tolerance:

1) The app is not writing checkpoints or savepoints at all (rocksDB,
local filesystem). A checkpoint dir is created on startup but stays
empty the whole time. When stopping the job, a savepoint dir is
created
but the stop ultimately fails with a
java.util.concurrent.TimeoutException and the job continues to run.

2) When I try and simulate failure in the external Function
("docker-compose stop python-worker && sleep 10 && docker-compose
star

Re: [Stateful Functions] Checkpoints and AtLeastOnce Guarantee

2020-11-05 Thread Igal Shilman
Hi Jan,

The architecture you outlined sounds good, and we've successfully run
mixed architectures like this.
Let me try to address your questions:

1)
To enable checkpointing you need to set the relevant values in your
flink-conf.yaml file.
execution.checkpointing.interval:  (see [1])
state.checkpoints.dir:  (see [2])

You can take a look here for an example [3]. The easiest way to incorporate
the changes would be to add your custom flink-conf.yaml into your docker
image (here is an example [4]).
When you will be using kubernetes, you can mount a config map as a
flink-conf.yaml, check out the helm charts here: [5]

2)
When the remote function is unavailable, StateFun would buffer the messages
addressed to it, up to the specified
timeout (the default is 1 minute; you can set it here [6]) before the job
is considered failed and is restarted.
It seems like in your example you are waiting for 10 seconds, so the
messages should be delivered.
Do you set function.spec.timeout or .withMaxRequestDuration() to something
else?


Good luck!
Igal.

p.s,
Consider using StateFun docker images[7], see any of the examples in the
statefun repository.


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#execution-checkpointing-interval
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#state-checkpoints-dir
[3]
https://github.com/apache/flink-statefun/blob/master/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml
[4]
https://github.com/apache/flink-statefun/blob/master/statefun-e2e-tests/statefun-sanity-e2e/src/test/resources/Dockerfile#L20
[5] https://github.com/apache/flink-statefun/tree/master/tools/k8s
[6] look for function.spec.timeout at
https://ci.apache.org/projects/flink/flink-statefun-docs-master/sdk/index.html
[7]
https://ci.apache.org/projects/flink/flink-statefun-docs-master/deployment-and-operations/packaging.html#images

On Thu, Nov 5, 2020 at 3:35 PM Jan Brusch  wrote:

> Hi,
>
> I'm currently trying to set up a Flink Stateful Functions Job with the
> following architecture:
>
> * Kinesis Ingress (embedded)
>
> * Stateful Function (embedded) that calls to and takes responses from an
> external business logic function (python worker similar to the one in
> the python greeter example)
>
> * Kinesis Egress (embedded)
>
>
> For the time being I am working with a local docker-compose cluster, but
> the goal would be to move this to kubernetes for production. The stream
> processing itself is working fine, but I can't solve two problems with
> respect to Fault Tolerance:
>
> 1) The app is not writing checkpoints or savepoints at all (rocksDB,
> local filesystem). A checkpoint dir is created on startup but stays
> empty the whole time. When stopping the job, a savepoint dir is created
> but the stop ultimately fails with a
> java.util.concurrent.TimeoutException and the job continues to run.
>
> 2) When I try and simulate failure in the external Function
> ("docker-compose stop python-worker && sleep 10 && docker-compose start
> python-worker"), I lose all messages in between restarts. Although, the
> documentation states that "For both state and messaging, Stateful
> Functions is able to provide the exactly-once guarantees users expect
> from a modern data processing framework".
>
> See the relevant parts of my configs below.
>
> Any input or help would be greatly appreciated.
>
>
> Best regards
>
> Jan
>
>
> --
>
> flink-conf.yaml
>
> ---
>
> jobmanager.rpc.address: jobmanager
> jobmanager.rpc.port: 6123
> jobmanager.memory.process.size: 1600m
> taskmanager.memory.process.size: 1728m
> taskmanager.numberOfTaskSlots: 1
> parallelism.default: 1
> state.backend: rocksdb
> state.backend.rocksdb.timer-service.factory: ROCKSDB
> state.checkpoints.dir: file:///checkpoint-dir
> state.savepoints.dir: file:///checkpoint-dir
> jobmanager.execution.failover-strategy: region
> blob.server.port: 6124
> query.server.port: 6125
> classloader.parent-first-patterns.additional:
> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>
>
> 
>
> docker-compose.yaml
>
> ---
>
>jobmanager:
>  image: flink:1.11.2-scala_2.12-java8
>  expose:
>- "6123"
>  ports:
>- "8082:8081"
>  volumes:
>- ./streamProcessor/checkpoint-dir:/checkpoint-dir
>-
> ./streamProcessor/conf/flink-conf.yaml:/opt/flink/conf/flink-conf.yaml:ro
>  command: jobmanager
>  environment:
>- JOB_MANAGER_RPC_ADDRESS=jobmanager
>- "FLINK_PROPERTIES=classloader.parent-first-patterns.additional:
> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf"
>taskmanager:
>  image: flink:1.11.2-scala_2.12-java8
>  expose:
>- "6121"
>- "6122"
>  depends_on:
>- jobmanager
>  command: taskmanager
>  links:
>- "jobmanager:jobmanager"
>  environment:
>- JOB_MANAGER_RPC_ADDRESS=jobmanager
>- "FLINK_PROPERTIES

Re: Filter By Value in List

2020-11-05 Thread Timo Walther

Hi Rex,

as far as I know, the IN operator only works on tables or a list of 
literals where the latter one is just a shortcut for multiple OR 
operations. I would just go with a UDF for this case. In SQL you could 
do an UNNEST to convert the array into a table and then use the IN 
operator. But I'm not sure if this is a better solution.
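
For illustration, here is a minimal sketch of the UDF route for the case where the
column is only recognized as a generic/RAW List (the function and class names are
hypothetical; it assumes Flink 1.11's new function stack with @DataTypeHint and
InputGroup.ANY):

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical UDF: returns true if `needle` is contained in a list-typed column
// that the type system only sees as a black-box (RAW) type.
public class ListContainsStr extends ScalarFunction {

    public boolean eval(
            @DataTypeHint(inputGroup = InputGroup.ANY) Object haystack,
            String needle) {
        if (!(haystack instanceof java.util.List)) {
            return false;
        }
        for (Object element : (java.util.List<?>) haystack) {
            if (needle.equals(element)) {
                return true;
            }
        }
        return false;
    }
}

// Registration and usage in SQL, e.g.:
// tableEnv.createTemporarySystemFunction("LIST_CONTAINS_STR", ListContainsStr.class);
// tableEnv.sqlQuery("SELECT * FROM T WHERE LIST_CONTAINS_STR(fruits, 'apple')");

If the column is a proper ARRAY<STRING> instead, a plain String[] eval parameter
without the hint should work as well.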


Regards,
Timo



On 04.11.20 01:13, Rex Fenley wrote:

None of the following appear to work either. Flink 1.11.2, Scala 2.12.

table.filter("apple".in(List("apple")))
[info]   org.apache.flink.table.api.ValidationException: IN operator on 
incompatible types: String and ObjectArrayTypeInfo.


table.filter("apple".in(java.util.Arrays.asList("apple")))
[info]   org.apache.flink.table.api.ValidationException: IN operator on 
incompatible types: String and ObjectArrayTypeInfo.


table.filter(
"apple".in(new ju.ArrayList[String](java.util.Arrays.asList("apple")))
)
[info]   org.apache.flink.table.api.ValidationException: IN operator on 
incompatible types: String and ObjectArrayTypeInfo.



On Tue, Nov 3, 2020 at 2:32 PM Rex Fenley wrote:


Using a custom serializer to make sure I'm using a List does
not help.

[info]   org.apache.flink.table.api.ValidationException: IN operator
on incompatible types: String and List.

On Tue, Nov 3, 2020 at 12:44 PM Rex Fenley <r...@remind101.com> wrote:

For clarification, I'm using a POJO and operating on a column of
this type:
public java.util.List<String> fruits

adding the following annotation does not help
@DataTypeHint("ARRAY<STRING>")

On Mon, Nov 2, 2020 at 7:02 AM Aljoscha Krettek <aljos...@apache.org> wrote:

I believe this is happening because the type system does not
recognize
that list of Strings as anything special but treats it as a
black-box type.

@Timo: Would this work with the new type system?

Best,
Aljoscha

On 02.11.20 06:47, Rex Fenley wrote:
 > Hello,
 >
 > I'm trying to filter the rows of a table by whether or
not a value exists
 > in an array column of a table.
 > Simple example:
 > table.where("apple".in($"fruits"))
 >
 > In this example, each row has a "fruits" Array
column that could
 > have 1 or many fruit strings which may or may not be "apple".
 >
 > However, I keep receiving the following error when I do
something similar
 > to the example above:
 > "IN operator on incompatible types: String and
GenericType"
 >
 > Is there any way to accomplish this?
 >
 > Thanks!
 >










Flink TLS in K8s

2020-11-05 Thread Patrick Eifler
Hi,

I did set up a flink session cluster on K8s. 

Now I added the ssl configuration as shown in the documentation:

# Flink TLS
security.ssl.internal.enabled: true
security.ssl.internal.keystore: /config/internal-keystore/internal.keystore.jks
security.ssl.internal.truststore: /config/internal-keystore/internal.keystore.jks
security.ssl.internal.keystore-password: {{ .Values.keystore.password }}
security.ssl.internal.truststore-password: {{ .Values.keystore.password }}
security.ssl.internal.key-password: {{ .Values.keystore.password }}

Now I have the problem that the task manager cannot connect to the job manager
or the resource manager:

could not resolve ResourceManager address 
akka.ssl.tcp://flink@flink-sc-jobmanager:6123/user/rpc/resourcemanager_*, 
retrying in 1 ms: Could not connect to rpc endpoint under address 
akka.ssl.tcp://flink@flink-sc-jobmanager:6123/user/rpc/resourcemanager_*.

Do I need to change the job manager port to make this work?

Any suggestions would be highly appreciated.

Thanks.

Patrick

Re: Flink job percentage

2020-11-05 Thread Chesnay Schepler
No, because that would break the API and any log-parsing infrastructure 
relying on it.


On 11/5/2020 2:56 PM, Flavio Pompermaier wrote:
Just another question: should I open a JIRA to rename 
ExecutionState.CANCELING to CANCELLING (indeed the enum's Javadoc 
reports CANCELLING)?


On Thu, Nov 5, 2020 at 11:31 AM Chesnay Schepler wrote:


The "mismatch" is due to you mixing job and vertex states.

These are the states a job can be in (based on
org.apache.flink.api.common.JobStatus):

[ "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED",
"FINISHED", "RESTARTING", "SUSPENDED", "RECONCILING" ]

These are the states a vertex can be in (based on
org.apache.flink.runtime.execution.ExecutionState):

[ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING",
"CANCELED", "FAILED", "RECONCILING" ]

Naturally, for your code you only want to check for the latter.

The documentation is hence correct. FYI, we directly access the
corresponding enums to generate this list, so it _cannot_ be out-of-sync.



On 11/5/2020 11:16 AM, Flavio Pompermaier wrote:

What do you think about this very rough heuristic (obviously it
makes sense only for batch jobs)?
It's far from perfect but at least it gives an idea of something
going on..
PS: I found some mismatch from the states documented in [1] and
the ones I found in the ExecutionState enum..
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid

    Map<ExecutionState, Integer> statusCount = jobDetails.getJobVerticesPerState();
    int uncompleted = statusCount.getOrDefault(ExecutionState.CREATED, 0) + //
        statusCount.getOrDefault(ExecutionState.RUNNING, 0) + //
        statusCount.getOrDefault(ExecutionState.CANCELING, 0) + //
        statusCount.getOrDefault(ExecutionState.DEPLOYING, 0) + //
        // statusCount.getOrDefault(ExecutionState.FAILING, 0) + // not found in Flink 1.11.0
        // statusCount.getOrDefault(ExecutionState.SUSPENDED, 0) + // not found in Flink 1.11.0
        statusCount.getOrDefault(ExecutionState.RECONCILING, 0) + //
        // statusCount.getOrDefault(ExecutionState.RESTARTING, 0) + // not found in Flink 1.11.0
        statusCount.getOrDefault(ExecutionState.RUNNING, 0) + //
        statusCount.getOrDefault(ExecutionState.SCHEDULED, 0);
    int completed = statusCount.getOrDefault(ExecutionState.FINISHED, 0) + //
        statusCount.getOrDefault(ExecutionState.FAILED, 0) + //
        statusCount.getOrDefault(ExecutionState.CANCELED, 0);
    final Integer completionPercentage = Math.floorDiv(completed, completed + uncompleted);

Thanks in advance,
Flavio

On Thu, Aug 13, 2020 at 4:17 PM Arvid Heise <ar...@ververica.com> wrote:

Hi Flavio,

This is a daunting task to implement properly. There is an
easy fix in related workflow systems though. Assuming that
it's a rerunning task, then you simply store the run times of
the last run, use some kind of low-pass filter (=decaying
average) and compare the current runtime with the expected
runtime. Even if Flink would have some estimation, it's
probably not more accurate than this.

Best,

Arvid

On Tue, Aug 11, 2020 at 10:26 AM Robert Metzger <rmetz...@apache.org> wrote:

Hi Flavio,

I'm not aware of such a heuristic being implemented
anywhere. You need to come up with something yourself.

On Fri, Aug 7, 2020 at 12:55 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:

Hi to all,
one of our customers asked us to see a percentage of
completion of a Flink Batch job. Is there any already
implemented heuristic I can use to compute it? Will
this be possible also when DataSet api will migrate
to DataStream..?

Thanks in advance,
Flavio










[Stateful Functions] Checkpoints and AtLeastOnce Guarantee

2020-11-05 Thread Jan Brusch

Hi,

I'm currently trying to set up a Flink Stateful Functions Job with the 
following architecture:


* Kinesis Ingress (embedded)

* Stateful Function (embedded) that calls to and takes responses from an 
external business logic function (python worker similar to the one in 
the python greeter example)


* Kinesis Egress (embedded)


For the time being I am working with a local docker-compose cluster, but 
the goal would be to move this to kubernetes for production. The stream 
processing itself is working fine, but I can't solve two problems with 
respect to Fault Tolerance:


1) The app is not writing checkpoints or savepoints at all (rocksDB, 
local filesystem). A checkpoint dir is created on startup but stays 
empty the whole time. When stopping the job, a savepoint dir is created 
but the stop ultimately fails with a 
java.util.concurrent.TimeoutException and the job continues to run.


2) When I try and simulate failure in the external Function 
("docker-compose stop python-worker && sleep 10 && docker-compose start 
python-worker"), I lose all messages in between restarts. Although, the 
documentation states that "For both state and messaging, Stateful 
Functions is able to provide the exactly-once guarantees users expect 
from a modern data processing framework".


See the relevant parts of my configs below.

Any input or help would be greatly appreciated.


Best regards

Jan


--

flink-conf.yaml

---

jobmanager.rpc.address: jobmanager
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
state.backend: rocksdb
state.backend.rocksdb.timer-service.factory: ROCKSDB
state.checkpoints.dir: file:///checkpoint-dir
state.savepoints.dir: file:///checkpoint-dir
jobmanager.execution.failover-strategy: region
blob.server.port: 6124
query.server.port: 6125
classloader.parent-first-patterns.additional: 
org.apache.flink.statefun;org.apache.kafka;com.google.protobuf





docker-compose.yaml

---

  jobmanager:
    image: flink:1.11.2-scala_2.12-java8
    expose:
  - "6123"
    ports:
  - "8082:8081"
    volumes:
  - ./streamProcessor/checkpoint-dir:/checkpoint-dir
  - 
./streamProcessor/conf/flink-conf.yaml:/opt/flink/conf/flink-conf.yaml:ro

    command: jobmanager
    environment:
  - JOB_MANAGER_RPC_ADDRESS=jobmanager
  - "FLINK_PROPERTIES=classloader.parent-first-patterns.additional: 
org.apache.flink.statefun;org.apache.kafka;com.google.protobuf"

  taskmanager:
    image: flink:1.11.2-scala_2.12-java8
    expose:
  - "6121"
  - "6122"
    depends_on:
  - jobmanager
    command: taskmanager
    links:
  - "jobmanager:jobmanager"
    environment:
  - JOB_MANAGER_RPC_ADDRESS=jobmanager
  - "FLINK_PROPERTIES=classloader.parent-first-patterns.additional: 
org.apache.flink.statefun;org.apache.kafka;com.google.protobuf"




Re: Best way to test Table API and SQL

2020-11-05 Thread Timo Walther

Hi,

everything prefixed with `org.apache.flink.table.planner` is the Blink
planner, so you should be able to use those testing classes. The Blink
planner is also the default one since 1.11. In general, I would
recommend looking a bit into the testing package. There are many
different testing examples.


Regards,
Timo


On 31.10.20 00:34, Rex Fenley wrote:

Hello,

Thank you for these examples, they look great. However, I can't seem to import
`org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, StringSink}`

is it because I'm using the Blink planner and not the regular one?

Thanks

On Fri, Oct 9, 2020 at 7:55 AM Timo Walther wrote:


Hi Rex,

let me copy paste my answer from a similar thread 2 months ago:

Hi,

this might be helpful as well:


https://lists.apache.org/thread.html/rfe3b45a10fc58cf19d2f71c6841515eb7175ba731d5055b06f236b3f%40%3Cuser.flink.apache.org%3E

First of all, it is important to know if you are interested in
end-to-end tests (incl. connectors) or excluding connectors. If you
just
like to test your operators, you can use a lot of the testing
infrastructure of Flink.

If you are NOT using event-time, you can simply use
`TableEnvironment.fromValues()` and `Table.execute().collect()`. This
test uses it, for example [1] (it is one of the newer test generations).
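
As a rough illustration of that approach, here is a minimal, connector-free sketch
(assuming Flink 1.11 with the Blink planner; the class and view names are made up):

import static org.apache.flink.table.api.Expressions.row;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class FromValuesSketch {

    public static void main(String[] args) throws Exception {
        // Blink planner in streaming mode (the default planner since 1.11).
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Stubbed input: no external source or sink is needed.
        Table input = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("amount", DataTypes.INT())),
                row("apple", 2),
                row("banana", 3));
        tEnv.createTemporaryView("T", input);

        // Append-only query under test; collect() pulls the rows back for assertions.
        try (CloseableIterator<Row> it = tEnv
                .sqlQuery("SELECT name, amount * 2 AS doubled FROM T WHERE name = 'apple'")
                .execute()
                .collect()) {
            it.forEachRemaining(System.out::println);
        }
    }
}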

Otherwise you can use or implement your own testing connectors, like in

org.apache.flink.table.planner.runtime.stream.sql.FunctionITCase#testStructuredScalarFunction

[2].

I hope this helps.

Regards,
Timo

[1]

https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/expressions/MathFunctionsITCase.java

[2]

https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java#L700


Let me know if you need more information.

Regards,
Timo

On 09.10.20 07:39, Rex Fenley wrote:
 > Hello
 >
 > I'd like to write a unit test for my Flink Job. It consists
mostly of
 > the Table API and SQL using a StreamExecutionEnvironment with the
blink
 > planner, from source to sink.
 > What's the best approach for testing Table API/SQL?
 >
 > I read
 >

https://flink.apache.org/news/2020/02/07/a-guide-for-unit-testing-in-apache-flink.html

 > however that seems to cover more for specialized functions with
 > DataStreams vs entire Table API constructs. What I think I'd like
is to
 > be able to have some stubbed input sources and mocked out sinks
which I
 > use to test against my Tables.
 >
 > Does this seem reasonable?
 >
 > I did find TestStreamEnvironment and maybe that would be useful
at least
 > for running the tests locally it seems?
 >

https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/util/TestStreamEnvironment.html
 >
 > Any help appreciated. Thanks!
 >







Re: Flink job percentage

2020-11-05 Thread Flavio Pompermaier
Just another question: should I open a JIRA to rename
ExecutionState.CANCELING to CANCELLING (indeed the enum's Javadoc reports
CANCELLING)?

On Thu, Nov 5, 2020 at 11:31 AM Chesnay Schepler  wrote:

> The "mismatch" is due to you mixing job and vertex states.
>
> These are the states a job can be in (based on
> org.apache.flink.api.common.JobStatus):
>
> [ "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED",
> "FINISHED", "RESTARTING", "SUSPENDED", "RECONCILING" ]
>
> These are the states a vertex can be in (based on
> org.apache.flink.runtime.execution.ExecutionState):
>
> [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING",
> "CANCELED", "FAILED", "RECONCILING" ]
>
> Naturally, for your code you only want to check for the latter.
>
> The documentation is hence correct. FYI, we directly access the
> corresponding enums to generate this list, so it _cannot_ be out-of-sync.
>
> On 11/5/2020 11:16 AM, Flavio Pompermaier wrote:
>
> What do you think about this very rough heuristic (obviously it makes
> sense only for batch jobs)?
> It's far from perfect but at least it gives an idea of something going on..
> PS: I found some mismatch from the states documented in [1] and the ones I
> found in the ExecutionState enum..
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid
>
> Map<ExecutionState, Integer> statusCount = jobDetails.getJobVerticesPerState();
> int uncompleted = statusCount.getOrDefault(ExecutionState.CREATED, 0)
> + //
> statusCount.getOrDefault(ExecutionState.RUNNING, 0) + ///
> statusCount.getOrDefault(ExecutionState.CANCELING, 0) + //
> statusCount.getOrDefault(ExecutionState.DEPLOYING, 0) + //
> // statusCount.getOrDefault(ExecutionState.FAILING,0)+ // not
> found in Flink 1.11.0
> // statusCount.getOrDefault(ExecutionState.SUSPENDED,0)+ /// not
> found in Flink 1.11.0
> statusCount.getOrDefault(ExecutionState.RECONCILING, 0) + //
> // statusCount.getOrDefault(ExecutionState.RESTARTING,0) + /// not
> found in Flink 1.11.0
> statusCount.getOrDefault(ExecutionState.RUNNING, 0) + //
> statusCount.getOrDefault(ExecutionState.SCHEDULED, 0);
> int completed = statusCount.getOrDefault(ExecutionState.FINISHED, 0) +
> //
> statusCount.getOrDefault(ExecutionState.FAILED, 0) + //
> statusCount.getOrDefault(ExecutionState.CANCELED, 0);
> final Integer completionPercentage = Math.floorDiv(completed,
> completed + uncompleted);
>
> Thanks in advance,
> Flavio
>
> On Thu, Aug 13, 2020 at 4:17 PM Arvid Heise  wrote:
>
>> Hi Flavio,
>>
>> This is a daunting task to implement properly. There is an easy fix in
>> related workflow systems though. Assuming that it's a rerunning task, then
>> you simply store the run times of the last run, use some kind of low-pass
>> filter (=decaying average) and compare the current runtime with the
>> expected runtime. Even if Flink would have some estimation, it's probably
>> not more accurate than this.
>>
>> Best,
>>
>> Arvid
>>
>> On Tue, Aug 11, 2020 at 10:26 AM Robert Metzger 
>> wrote:
>>
>>> Hi Flavio,
>>>
>>> I'm not aware of such a heuristic being implemented anywhere. You need
>>> to come up with something yourself.
>>>
>>> On Fri, Aug 7, 2020 at 12:55 PM Flavio Pompermaier 
>>> wrote:
>>>
 Hi to all,
 one of our customers asked us to see a percentage of completion of a
 Flink Batch job. Is there any already implemented heuristic I can use to
 compute it? Will this be possible also when DataSet api will migrate to
 DataStream..?

 Thanks in advance,
 Flavio

>>>
>>
>>
>>
>
>


Re: a couple of memory questions

2020-11-05 Thread Matthias Pohl
Hello Edward,
please find my answers within your message below:

On Wed, Nov 4, 2020 at 1:35 PM Colletta, Edward 
wrote:

> Using Flink 1.9.2 with FsStateBackend, Session cluster.
>
>
>
>1. Does heap state get cleaned up when a job is cancelled?
>
> We have jobs that we run on a daily basis.  We start each morning and
> cancel each evening.  We noticed that the process size does not seem to
> shrink.  We are looking at the resident size of the process with ps and
> also the USED column for Heap on the taskmanager page of the flink
> dashboard.
>
There is no explicit cleanup happening on the Flink side. The heap should
be cleaned up when GC kicks in.

>
>1. How can I examine the usage of Flink Managed Memory?
>
>  The configuration documentation seems to indicate this is used for batch
> jobs, and we are only using the Streaming API.   I reduced 
> taskmanager.memory.fraction
> to 0.3, but I think this is still reserving too much memory to an area we
> will not be using.
>
Unfortunately, I don't know of any way to monitor the managed memory for
Flink 1.9.2 as is. We're going to introduce new metrics for managed memory
[1], network memory [2] and metaspace [3] in the upcoming release of Flink
1.12.0. This should make it easier to monitor these memory pools.

I hope that helps a bit.
Best,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-14406
[2] https://issues.apache.org/jira/browse/FLINK-14422
[3] https://issues.apache.org/jira/browse/FLINK-19617


Re:Re: JM upload files to blob server is slow

2020-11-05 Thread forideal
Hi Arvid Heise,
 Thank you for your reply.
 Yes, my connection to the JM is bad!!!


Best wishes,
forideal




At 2020-11-04 15:32:38, "Arvid Heise"  wrote:

A jar upload shouldn't take minutes. There are two possibilities that likely
co-occurred:
- your jar is much bigger than needed. Did you make sure that you don't put 
Flink into the fatjar? That's counterproductive on many levels. Please check 
the jar size.

- your connection to the JM is bad. Where is your Flink cluster running? If 
connection is superslow, it's often better to build the jar on the respective 
server and submit on command line or through REST.



On Wed, Nov 4, 2020 at 7:15 AM forideal  wrote:

Hello my friend:
This code runs very slowly. What are the possible causes?
 code:
CompletableFuture<JobGraph> jarUploadFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
    final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort);
    try {
        ClientUtils.extractAndUploadJobGraphFiles(jobGraph, () -> new BlobClient(address, configuration));
    } catch (FlinkException e) {
        throw new CompletionException(e);
    }
    return jobGraph;
});
Code link: https://github.com/apache/flink/blob/release-1.10.0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java#L93


Through jstack, we can also find that the thread 
Flink-DispatcherRestEndpoint-thread-1 has been running for 5 minutes.
"Flink-DispatcherRestEndpoint-thread-1" #82 daemon prio=5 os_prio=0 
tid=0x7f590c03c800 nid=0x179 runnable [0x7f5a34165000]
   java.lang.Thread.State: RUNNABLE
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at 
org.apache.flink.runtime.blob.BlobOutputStream.write(BlobOutputStream.java:88)
at org.apache.flink.util.IOUtils.copyBytes(IOUtils.java:65)
at org.apache.flink.runtime.blob.BlobClient.putInputStream(BlobClient.java:368)
at org.apache.flink.runtime.blob.BlobClient.uploadFile(BlobClient.java:428)
at 
org.apache.flink.runtime.client.ClientUtils.uploadUserJars(ClientUtils.java:102)
at 
org.apache.flink.runtime.client.ClientUtils.uploadAndSetUserJars(ClientUtils.java:95)
at 
org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:77)
at 
org.apache.flink.runtime.client.ClientUtils.extractAndUploadJobGraphFiles(ClientUtils.java:57)
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(RedJarRunHandler.java:92)
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler$$Lambda$770/941040494.apply(Unknown
 Source)
at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1105)
at 
java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1070)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


Thank you very much for your reply, forideal




 




Re: Error parsing annotations in flink-table-planner_blink_2.12-1.11.2

2020-11-05 Thread Yuval Itzchakov
Yes, Calcite uses apiguardian. To answer your question Aljoscha, no, I do
not use it directly.

It's a dependency of the shaded Calcite version inside the blink JAR.

On Thu, Nov 5, 2020 at 11:02 AM Timo Walther  wrote:

> Hi Yuval,
>
> this error is indeed weird.
>
> @Aljoscha: I think Calcite uses apiguardian.
>
> When I saw the initial error, it looked like there are different Apache
> Calcite versions in the classpath. I'm wondering if this is a pure SBT
> issue because I'm sure that other users would have reported this error
> earlier.
>
> Regards,
> Timo
>
>
> On 02.11.20 16:00, Aljoscha Krettek wrote:
> > But you're not using apiguardian yourself or have it as a dependency
> > before this, right?
> >
> > Best,
> > Aljoscha
> >
> > On 02.11.20 14:59, Yuval Itzchakov wrote:
> >> Yes, I'm using SBT.
> >>
> >> I managed to resolve this by adding:
> >>
> >> "org.apiguardian" % "apiguardian-api" % "1.1.0"
> >>
> >> To the dependency list. Perhaps this dependency needs to be shaded as
> well
> >> in flink-core?
> >>
> >> My SBT looks roughly like this:
> >>
> >>lazy val flinkVersion = "1.11.2"
> >>libraryDependencies ++= Seq(
> >>  "org.apache.flink"%% "flink-table-planner-blink"
> >>   % flinkVersion,
> >>  "org.apache.flink"%% "flink-table-runtime-blink"
> >>   % flinkVersion,
> >>  "org.apache.flink"%% "flink-table-api-scala-bridge"
> >> % flinkVersion,
> >>  "org.apache.flink" % "flink-s3-fs-hadoop"
> >> % flinkVersion,
> >>  "org.apache.flink"%% "flink-container"
> >>   % flinkVersion,
> >>  "org.apache.flink"%% "flink-connector-kafka"
> >>   % flinkVersion,
> >>  "org.apache.flink" % "flink-connector-base"
> >> % flinkVersion,
> >>  "org.apache.flink" % "flink-table-common"
> >> % flinkVersion,
> >>  "org.apache.flink"%% "flink-cep"
> >>   % flinkVersion,
> >>  "org.apache.flink"%% "flink-scala"
> >>   % flinkVersion % "provided",
> >>  "org.apache.flink"%% "flink-streaming-scala"
> >>   % flinkVersion % "provided",
> >>  "org.apache.flink" % "flink-json"
> >> % flinkVersion % "provided",
> >>  "org.apache.flink" % "flink-avro"
> >> % flinkVersion % "provided",
> >>  "org.apache.flink"%% "flink-parquet"
> >>   % flinkVersion % "provided",
> >>  "org.apache.flink"%% "flink-runtime-web"
> >>   % flinkVersion % "provided",
> >>  "org.apache.flink"%% "flink-runtime"
> >>   % flinkVersion % "test" classifier "tests",
> >>  "org.apache.flink"%% "flink-streaming-java"
> >> % flinkVersion % "test" classifier "tests",
> >>  "org.apache.flink"%% "flink-test-utils"
> >> % flinkVersion % "test",
> >>)
> >>
> >> On Mon, Nov 2, 2020 at 3:21 PM Aljoscha Krettek 
> >> wrote:
> >>
> >>> @Timo and/or @Jark, have you seen this problem before?
> >>>
> >>> @Yuval, I'm assuming you're using sbt as a build system, is that
> >>> correct? Could you maybe also post a snippet of your build file that
> >>> shows the dependency setup or maybe the whole file(s).
> >>>
> >>> Best,
> >>> Aljoscha
> >>>
> >>> On 01.11.20 13:34, Yuval Itzchakov wrote:
>  Hi,
> 
>  While trying to compile an application with a dependency on
>  flink-table-planner_blink_2.12-1.11.2, I receive the following error
>  message during compilation:
> 
>  scalac: While parsing annotations in
> /Library/Caches/Coursier/v1/https/
> 
> >>>
> repo1.maven.org/maven2/org/apache/flink/flink-table-planner-blink_2.12/1.11.2/flink-table-planner-blink_2.12-1.11.2.jar(org/apache/calcite/sql/SqlKind.class)
> >>>
> >>> ,
>  could not find EXPERIMENTAL in enum .
>  This is likely due to an implementation restriction: an annotation
> >>> argument
>  cannot refer to a member of the annotated class (scala/bug#7014).
> 
>  Has anyone encountered this issue?
> 
> >>>
> >>>
> >>
> >
>
>

-- 
Best Regards,
Yuval Itzchakov.


Is it possible to make two operators always run in the same taskmanager?

2020-11-05 Thread Si-li Liu
Currently I use Flink 1.9.1. What I actually want to do is send some
messages from downstream operators to upstream operators, which I am
considering doing via a static variable.

But that requires that these two operators always run in the same
taskmanager process. Can I use CoLocationGroup to solve this problem? Or
can anyone give me an example to demonstrate the usage of CoLocationGroup?

Thanks!
-- 
Best regards

Sili Liu


Re: Flink job percentage

2020-11-05 Thread Flavio Pompermaier
Ok, I understood. Unfortunately the documentation is not able to extract the
Map type of status-count (that is, Map<ExecutionState, Integer>) and I
thought that the job status and execution status were equivalent.
And what about the heuristic...? Could it make sense?

On Thu, Nov 5, 2020 at 11:33 AM Chesnay Schepler  wrote:

> Admittedly, it can be out-of-sync if someone forgets to regenerate the
> documentation, but they cannot be mixed up.
>
> On 11/5/2020 11:31 AM, Chesnay Schepler wrote:
>
> The "mismatch" is due to you mixing job and vertex states.
>
> These are the states a job can be in (based on
> org.apache.flink.api.common.JobStatus):
>
> [ "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED",
> "FINISHED", "RESTARTING", "SUSPENDED", "RECONCILING" ]
>
> These are the states a vertex can be in (based on
> org.apache.flink.runtime.execution.ExecutionState):
>
> [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING",
> "CANCELED", "FAILED", "RECONCILING" ]
>
> Naturally, for your code you only want to check for the latter.
>
> The documentation is hence correct. FYI, we directly access the
> corresponding enums to generate this list, so it _cannot_ be out-of-sync.
>
> On 11/5/2020 11:16 AM, Flavio Pompermaier wrote:
>
> What do you think about this very rough heuristic (obviously it makes
> sense only for batch jobs)?
> It's far from perfect but at least it gives an idea of something going on..
> PS: I found some mismatch from the states documented in [1] and the ones I
> found in the ExecutionState enum..
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid
>
> Map<ExecutionState, Integer> statusCount = jobDetails.getJobVerticesPerState();
> int uncompleted = statusCount.getOrDefault(ExecutionState.CREATED, 0)
> + //
> statusCount.getOrDefault(ExecutionState.RUNNING, 0) + ///
> statusCount.getOrDefault(ExecutionState.CANCELING, 0) + //
> statusCount.getOrDefault(ExecutionState.DEPLOYING, 0) + //
> // statusCount.getOrDefault(ExecutionState.FAILING,0)+ // not
> found in Flink 1.11.0
> // statusCount.getOrDefault(ExecutionState.SUSPENDED,0)+ /// not
> found in Flink 1.11.0
> statusCount.getOrDefault(ExecutionState.RECONCILING, 0) + //
> // statusCount.getOrDefault(ExecutionState.RESTARTING,0) + /// not
> found in Flink 1.11.0
> statusCount.getOrDefault(ExecutionState.RUNNING, 0) + //
> statusCount.getOrDefault(ExecutionState.SCHEDULED, 0);
> int completed = statusCount.getOrDefault(ExecutionState.FINISHED, 0) +
> //
> statusCount.getOrDefault(ExecutionState.FAILED, 0) + //
> statusCount.getOrDefault(ExecutionState.CANCELED, 0);
> final Integer completionPercentage = Math.floorDiv(completed,
> completed + uncompleted);
>
> Thanks in advance,
> Flavio
>
> On Thu, Aug 13, 2020 at 4:17 PM Arvid Heise  wrote:
>
>> Hi Flavio,
>>
>> This is a daunting task to implement properly. There is an easy fix in
>> related workflow systems though. Assuming that it's a rerunning task, then
>> you simply store the run times of the last run, use some kind of low-pass
>> filter (=decaying average) and compare the current runtime with the
>> expected runtime. Even if Flink would have some estimation, it's probably
>> not more accurate than this.
>>
>> Best,
>>
>> Arvid
>>
>> On Tue, Aug 11, 2020 at 10:26 AM Robert Metzger 
>> wrote:
>>
>>> Hi Flavio,
>>>
>>> I'm not aware of such a heuristic being implemented anywhere. You need
>>> to come up with something yourself.
>>>
>>> On Fri, Aug 7, 2020 at 12:55 PM Flavio Pompermaier 
>>> wrote:
>>>
 Hi to all,
 one of our customers asked us to see a percentage of completion of a
 Flink Batch job. Is there any already implemented heuristic I can use to
 compute it? Will this be possible also when DataSet api will migrate to
 DataStream..?

 Thanks in advance,
 Flavio

>>>
>>
>>
>>
>
>
>


Re: Flink job percentage

2020-11-05 Thread Chesnay Schepler
Admittedly, it can be out-of-sync if someone forgets to regenerate the 
documentation, but they cannot be mixed up.


On 11/5/2020 11:31 AM, Chesnay Schepler wrote:

The "mismatch" is due to you mixing job and vertex states.

These are the states a job can be in (based on
org.apache.flink.api.common.JobStatus):

[ "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED",
"FINISHED", "RESTARTING", "SUSPENDED", "RECONCILING" ]

These are the states a vertex can be in (based on
org.apache.flink.runtime.execution.ExecutionState):

[ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING",
"CANCELED", "FAILED", "RECONCILING" ]

Naturally, for your code you only want to check for the latter.

The documentation is hence correct. FYI, we directly access the
corresponding enums to generate this list, so it _cannot_ be out-of-sync.




On 11/5/2020 11:16 AM, Flavio Pompermaier wrote:
What do you think about this very rough heuristic (obviously it
makes sense only for batch jobs)?
It's far from perfect but at least it gives an idea of something
going on..
PS: I found some mismatch from the states documented in [1] and the
ones I found in the ExecutionState enum..
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid

    Map<ExecutionState, Integer> statusCount = jobDetails.getJobVerticesPerState();
    int uncompleted = statusCount.getOrDefault(ExecutionState.CREATED, 0) + //
        statusCount.getOrDefault(ExecutionState.RUNNING, 0) + //
        statusCount.getOrDefault(ExecutionState.CANCELING, 0) + //
        statusCount.getOrDefault(ExecutionState.DEPLOYING, 0) + //
        // statusCount.getOrDefault(ExecutionState.FAILING, 0) + // not found in Flink 1.11.0
        // statusCount.getOrDefault(ExecutionState.SUSPENDED, 0) + // not found in Flink 1.11.0
        statusCount.getOrDefault(ExecutionState.RECONCILING, 0) + //
        // statusCount.getOrDefault(ExecutionState.RESTARTING, 0) + // not found in Flink 1.11.0
        statusCount.getOrDefault(ExecutionState.RUNNING, 0) + //
        statusCount.getOrDefault(ExecutionState.SCHEDULED, 0);
    int completed = statusCount.getOrDefault(ExecutionState.FINISHED, 0) + //
        statusCount.getOrDefault(ExecutionState.FAILED, 0) + //
        statusCount.getOrDefault(ExecutionState.CANCELED, 0);
    final Integer completionPercentage = Math.floorDiv(completed, completed + uncompleted);


Thanks in advance,
Flavio

On Thu, Aug 13, 2020 at 4:17 PM Arvid Heise wrote:


Hi Flavio,

This is a daunting task to implement properly. There is an easy
fix in related workflow systems though. Assuming that it's a
rerunning task, then you simply store the run times of the last
run, use some kind of low-pass filter (=decaying average) and
compare the current runtime with the expected runtime. Even if
Flink would have some estimation, it's probably not more accurate
than this.

Best,

Arvid

On Tue, Aug 11, 2020 at 10:26 AM Robert Metzger <rmetz...@apache.org> wrote:

Hi Flavio,

I'm not aware of such a heuristic being implemented anywhere.
You need to come up with something yourself.

On Fri, Aug 7, 2020 at 12:55 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:

Hi to all,
one of our customers asked us to see a percentage of
completion of a Flink Batch job. Is there any already
implemented heuristic I can use to compute it? Will this
be possible also when DataSet api will migrate to
DataStream..?

Thanks in advance,
Flavio










Re: Flink job percentage

2020-11-05 Thread Chesnay Schepler

The "mismatch" is due to you mixing job and vertex states.

These are the states a job can be in (based on
org.apache.flink.api.common.JobStatus):

[ "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED",
"FINISHED", "RESTARTING", "SUSPENDED", "RECONCILING" ]

These are the states a vertex can be in (based on
org.apache.flink.runtime.execution.ExecutionState):

[ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING",
"CANCELED", "FAILED", "RECONCILING" ]

Naturally, for your code you only want to check for the latter.

The documentation is hence correct. FYI, we directly access the
corresponding enums to generate this list, so it _cannot_ be out-of-sync.
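
For illustration, a minimal sketch that derives the two buckets from the vertex states
themselves instead of hard-coding them (it assumes the Map<ExecutionState, Integer>
from the snippet quoted below, and relies on ExecutionState#isTerminal, which covers
FINISHED, CANCELED and FAILED):

import java.util.Map;

import org.apache.flink.runtime.execution.ExecutionState;

public final class JobProgress {

    // statusCount is assumed to be jobDetails.getJobVerticesPerState() from the quoted snippet.
    static int completionPercentage(Map<ExecutionState, Integer> statusCount) {
        int completed = 0;
        int total = 0;
        for (Map.Entry<ExecutionState, Integer> e : statusCount.entrySet()) {
            total += e.getValue();
            if (e.getKey().isTerminal()) { // FINISHED, CANCELED, FAILED
                completed += e.getValue();
            }
        }
        return total == 0 ? 0 : (100 * completed) / total;
    }
}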




On 11/5/2020 11:16 AM, Flavio Pompermaier wrote:
What do you think about this very rough heuristic (obviously it
makes sense only for batch jobs)?
It's far from perfect but at least it gives an idea of something going
on..
PS: I found some mismatch from the states documented in [1] and the
ones I found in the ExecutionState enum..
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid

    Map<ExecutionState, Integer> statusCount = jobDetails.getJobVerticesPerState();
    int uncompleted = statusCount.getOrDefault(ExecutionState.CREATED, 0) + //
        statusCount.getOrDefault(ExecutionState.RUNNING, 0) + //
        statusCount.getOrDefault(ExecutionState.CANCELING, 0) + //
        statusCount.getOrDefault(ExecutionState.DEPLOYING, 0) + //
        // statusCount.getOrDefault(ExecutionState.FAILING, 0) + // not found in Flink 1.11.0
        // statusCount.getOrDefault(ExecutionState.SUSPENDED, 0) + // not found in Flink 1.11.0
        statusCount.getOrDefault(ExecutionState.RECONCILING, 0) + //
        // statusCount.getOrDefault(ExecutionState.RESTARTING, 0) + // not found in Flink 1.11.0
        statusCount.getOrDefault(ExecutionState.RUNNING, 0) + //
        statusCount.getOrDefault(ExecutionState.SCHEDULED, 0);
    int completed = statusCount.getOrDefault(ExecutionState.FINISHED, 0) + //
        statusCount.getOrDefault(ExecutionState.FAILED, 0) + //
        statusCount.getOrDefault(ExecutionState.CANCELED, 0);
    final Integer completionPercentage = Math.floorDiv(completed, completed + uncompleted);


Thanks in advance,
Flavio

On Thu, Aug 13, 2020 at 4:17 PM Arvid Heise wrote:


Hi Flavio,

This is a daunting task to implement properly. There is an easy
fix in related workflow systems though. Assuming that it's a
rerunning task, then you simply store the run times of the last
run, use some kind of low-pass filter (=decaying average) and
compare the current runtime with the expected runtime. Even if
Flink would have some estimation, it's probably not more accurate
than this.

Best,

Arvid

On Tue, Aug 11, 2020 at 10:26 AM Robert Metzger <rmetz...@apache.org> wrote:

Hi Flavio,

I'm not aware of such a heuristic being implemented anywhere.
You need to come up with something yourself.

On Fri, Aug 7, 2020 at 12:55 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:

Hi to all,
one of our customers asked us to see a percentage of
completion of a Flink Batch job. Is there any already
implemented heuristic I can use to compute it? Will this
be possible also when DataSet api will migrate to
DataStream..?

Thanks in advance,
Flavio








Re: Flink job percentage

2020-11-05 Thread Flavio Pompermaier
What do you think about this very rough heuristic (obviously it makes
sense only for batch jobs)?
It's far from perfect but at least it gives an idea of something going on..
PS: I found some mismatch from the states documented in [1] and the ones I
found in the ExecutionState enum..
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid

Map<ExecutionState, Integer> statusCount = jobDetails.getJobVerticesPerState();
int uncompleted = statusCount.getOrDefault(ExecutionState.CREATED, 0) + //
    statusCount.getOrDefault(ExecutionState.RUNNING, 0) + //
    statusCount.getOrDefault(ExecutionState.CANCELING, 0) + //
    statusCount.getOrDefault(ExecutionState.DEPLOYING, 0) + //
    // statusCount.getOrDefault(ExecutionState.FAILING, 0) + // not found in Flink 1.11.0
    // statusCount.getOrDefault(ExecutionState.SUSPENDED, 0) + // not found in Flink 1.11.0
    statusCount.getOrDefault(ExecutionState.RECONCILING, 0) + //
    // statusCount.getOrDefault(ExecutionState.RESTARTING, 0) + // not found in Flink 1.11.0
    statusCount.getOrDefault(ExecutionState.RUNNING, 0) + //
    statusCount.getOrDefault(ExecutionState.SCHEDULED, 0);
int completed = statusCount.getOrDefault(ExecutionState.FINISHED, 0) + //
    statusCount.getOrDefault(ExecutionState.FAILED, 0) + //
    statusCount.getOrDefault(ExecutionState.CANCELED, 0);
final Integer completionPercentage = Math.floorDiv(completed, completed + uncompleted);
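
One caveat on the snippet above: Math.floorDiv(completed, completed + uncompleted)
is plain integer division, so it can only ever yield 0 or 1, and RUNNING is summed
twice in `uncompleted`. A minimal sketch of the intended percentage (0-100),
reusing the same counts:

int total = completed + uncompleted;
// Guard against an empty vertex map and scale before the integer division.
final int completionPercentage = total == 0 ? 0 : (100 * completed) / total;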

Thanks in advance,
Flavio

On Thu, Aug 13, 2020 at 4:17 PM Arvid Heise  wrote:

> Hi Flavio,
>
> This is a daunting task to implement properly. There is an easy fix in
> related workflow systems though. Assuming that it's a rerunning task, then
> you simply store the run times of the last run, use some kind of low-pass
> filter (=decaying average) and compare the current runtime with the
> expected runtime. Even if Flink would have some estimation, it's probably
> not more accurate than this.
>
> Best,
>
> Arvid
>
> On Tue, Aug 11, 2020 at 10:26 AM Robert Metzger 
> wrote:
>
>> Hi Flavio,
>>
>> I'm not aware of such a heuristic being implemented anywhere. You need to
>> come up with something yourself.
>>
>> On Fri, Aug 7, 2020 at 12:55 PM Flavio Pompermaier 
>> wrote:
>>
>>> Hi to all,
>>> one of our customers asked us to see a percentage of completion of a
>>> Flink Batch job. Is there any already implemented heuristic I can use to
>>> compute it? Will this be possible also when DataSet api will migrate to
>>> DataStream..?
>>>
>>> Thanks in advance,
>>> Flavio
>>>
>>
>
>


Re: LEGACY('STRUCTURED_TYPE' to pojo

2020-11-05 Thread Timo Walther
It was planned for 1.12 but didn't make it. 1.13 should fully implement 
FLIP-136. I just created issues to monitor the progress:


https://issues.apache.org/jira/browse/FLINK-19976

Regards,
Timo

On 04.11.20 18:43, Rex Fenley wrote:

Thank you for the info!

Is there a timetable for when the next version with this change might 
release?


On Wed, Nov 4, 2020 at 2:44 AM Timo Walther wrote:


Hi Rex,

sorry for the late reply. POJOs will have much better support in the
upcoming Flink versions because they have been fully integrated with
the
new table type system mentioned in FLIP-37 [1] (e.g. support for
immutable POJOs and nested DataTypeHints etc).

For queries, scalar, and table functions you can already use the full
POJOs within the table ecosystem.

However, the only missing piece is the new translation of POJOs from
Table API to DataStream API. This will be fixed in FLIP-136 [2]. Until
then I would recommend to either use `Row` as the output of the table
API or try to use a scalar function before that maps to the desired
data
structure.
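
For illustration, a minimal sketch of that Row-based workaround (the ProductItem POJO
and the toProducts helper are hypothetical; it assumes the Java bridge
StreamTableEnvironment and the schema from the error below, i.e. the nested product
ROW is the second field):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RowToPojoSketch {

    // Hypothetical POJO matching the nested `product` column (name STRING, id BIGINT).
    public static class ProductItem {
        public String name;
        public long id;
    }

    public static DataStream<ProductItem> toProducts(StreamTableEnvironment tableEnv, Table table) {
        DataStream<Tuple2<Boolean, Row>> retracted = tableEnv.toRetractStream(table, Row.class);
        return retracted
                .filter(t -> t.f0) // keep accumulate (true) messages only
                .map(t -> {
                    Row product = (Row) t.f1.getField(1); // nested ROW<name, id> at position 1
                    ProductItem item = new ProductItem();
                    item.name = (String) product.getField(0);
                    item.id = (Long) product.getField(1);
                    return item;
                })
                .returns(ProductItem.class); // lambdas need an explicit return type here
    }
}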

I hope this helps a bit.

Regards,
Timo

[1]

https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System
[2]

https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API

On 02.11.20 21:44, Rex Fenley wrote:
 > My jobs normally use the Blink planner; I noticed with this test that
 > may not be the case.
 >
 > On Mon, Nov 2, 2020 at 12:38 PM Rex Fenley <r...@remind101.com> wrote:
 >
 >     Flink 1.11.2 with Scala 2.12
 >
 >     Error:
 >     [info] JobScalaTest:
 >     [info] - dummy *** FAILED ***
 >     [info]   org.apache.flink.table.api.ValidationException:
Field types
 >     of query result and registered TableSink  do not match.
 >     [info] Query schema: [user: BIGINT, product: ROW<`name`
 >     VARCHAR(2147483647), `id` BIGINT>, amount: INT]
 >     [info] Sink schema: [user: BIGINT, product:
 >     LEGACY('STRUCTURED_TYPE', 'ANY >   
  rO0ABXNyAB1Kb2JTY2FsYVRlc3QkJGFub24kOSQkYW5vbiQxMODHL_EXRquJAgAAeHIANm9yZy5hcGFjaGUuZmxpbmsuYXBpLnNjYWxhLnR5cGV1dGlscy5DYXNlQ2xhc3NUeXBlSW5mb46d1-iqONqQAgAMTAARUEFUVEVSTl9JTlRfRklFTER0ABlMamF2YS91dGlsL3JlZ2V4L1BhdHRlcm47TAAVUEFUVEVSTl9ORVNURURfRklFTERTcQB-AAJMAB5QQVRURVJOX05FU1RFRF9GSUVMRFNfV0lMRENBUkRxAH4AAkwAC1JFR0VYX0ZJRUxEdAASTGphdmEvbGFuZy9TdHJpbmc7TAAPUkVHRVhfSU5UX0ZJRUxEcQB-AANMABNSRUdFWF9ORVNURURfRklFTERTcQB-AANMABxSRUdFWF9ORVNURURfRklFTERTX1dJTERDQVJEcQB-AANMAA9SRUdFWF9TVFJfRklFTERxAH4AA0wABWNsYXp6dAARTGphdmEvbGFuZy9DbGFzcztMAApmaWVsZE5hbWVzdAAWTHNjYWxhL2NvbGxlY3Rpb24vU2VxO0wACmZpZWxkVHlwZXNxAH4ABVsAEnR5cGVQYXJhbVR5cGVJbmZvc3QAN1tMb3JnL2FwYWNoZS9mbGluay9hcGkvY29tbW9uL3R5cGVpbmZvL1R5cGVJbmZvcm1hdGlvbjt4cgA1b3JnLmFwYWNoZS5mbGluay5hcGkuamF2YS50eXBldXRpbHMuVHVwbGVUeXBlSW5mb0Jhc2UAAQIAAkkAC3RvdGFsRmllbGRzWwAFdHlwZXNxAH4ABnhyADNvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLkNvbXBvc2l0ZVR5cGUAAQIAAUwACXR5cGVDbGFzc3EAfgAEeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb26UjchIurN66wIAAHhwdnIAC1Byb2R1Y3RJdGVtBwFtETzcflcCAAJKAAJpZEwABG5hbWVxAH4AA3hwAnVyADdbTG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb247uKBspBqaFLYCAAB4cAJzcgAyb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGVpbmZvLkJhc2ljVHlwZUluZm_6BPCCpWndBgIABEwABWNsYXp6cQB-AARMAA9jb21wYXJhdG9yQ2xhc3NxAH4ABFsAF3Bvc3NpYmxlQ2FzdFRhcmdldFR5cGVzdAASW0xqYXZhL2xhbmcvQ2xhc3M7TAAKc2VyaWFsaXplcnQANkxvcmcvYXBhY2hlL2ZsaW5rL2FwaS9jb21tb24vdHlwZXV0aWxzL1R5cGVTZXJpYWxpemVyO3hxAH4ACXZyABBqYXZhLmxhbmcuU3RyaW5noPCkOHo7s0ICAAB4cHZyADtvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuU3RyaW5nQ29tcGFyYXRvcgABAgAAeHIAPm9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5CYXNpY1R5cGVDb21wYXJhdG9yAAECAAJaABNhc2NlbmRpbmdDb21wYXJpc29uWwALY29tcGFyYXRvcnN0ADdbTG9yZy9hcGFjaGUvZmxpbmsvYXBpL2NvbW1vbi90eXBldXRpbHMvVHlwZUNvbXBhcmF0b3I7eHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuVHlwZUNvbXBhcmF0b3IAAQIAAHhwdXIAEltMamF2YS5sYW5nLkNsYXNzO6sW167LzVqZAgAAeHAAc3IAO29yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5TdHJpbmdTZXJpYWxpemVyAAECAAB4cgBCb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlR5cGVTZXJpYWxpemVyU2luZ2xldG9ueamHqscud0UCAAB4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5UeXBlU2VyaWFsaXplcgABAgAAeHBzcgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGVpbmZvLkludGVnZXJUeXBlSW5mb5AFxEVpQEqVAgAAeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5OdW1lcmljVHlwZUluZm-tmMZzMAKMFgIAAHhxAH4AD3ZyAA5qYXZhLmxhbmcuTG9uZzuL5JDMjyPfAgABSgAFdmFsdWV4cgAQamF2YS5sYW5nLk51bWJlcoaslR0LlOCLAgAAeHB2cgA5b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLkxvbmdDb21wYXJhdG9yAAECAAB4cQB-ABZ1cQB-A

Re: Error parsing annotations in flink-table-planner_blink_2.12-1.11.2

2020-11-05 Thread Timo Walther

Hi Yuval,

this error is indeed weird.

@Aljoscha: I think Calcite uses apiguardian.

When I saw the initial error, it looked like there are different Apache 
Calcite versions in the classpath. I'm wondering if this is a pure SBT 
issue because I'm sure that other users would have reported this error 
earlier.


Regards,
Timo


On 02.11.20 16:00, Aljoscha Krettek wrote:
But you're not using apiguardian yourself or have it as a dependency 
before this, right?


Best,
Aljoscha

On 02.11.20 14:59, Yuval Itzchakov wrote:

Yes, I'm using SBT.

I managed to resolve this by adding:

"org.apiguardian" % "apiguardian-api" % "1.1.0"

To the dependency list. Perhaps this dependency needs to be shaded as well
in flink-core?

My SBT looks roughly like this:

   lazy val flinkVersion = "1.11.2"
   libraryDependencies ++= Seq(
 "org.apache.flink"    %% "flink-table-planner-blink"
  % flinkVersion,
 "org.apache.flink"    %% "flink-table-runtime-blink"
  % flinkVersion,
 "org.apache.flink"    %% "flink-table-api-scala-bridge"
% flinkVersion,
 "org.apache.flink" % "flink-s3-fs-hadoop"
% flinkVersion,
 "org.apache.flink"    %% "flink-container"
  % flinkVersion,
 "org.apache.flink"    %% "flink-connector-kafka"
  % flinkVersion,
 "org.apache.flink" % "flink-connector-base"
% flinkVersion,
 "org.apache.flink" % "flink-table-common"
% flinkVersion,
 "org.apache.flink"    %% "flink-cep"
  % flinkVersion,
 "org.apache.flink"    %% "flink-scala"
  % flinkVersion % "provided",
 "org.apache.flink"    %% "flink-streaming-scala"
  % flinkVersion % "provided",
 "org.apache.flink" % "flink-json"
% flinkVersion % "provided",
 "org.apache.flink" % "flink-avro"
% flinkVersion % "provided",
 "org.apache.flink"    %% "flink-parquet"
  % flinkVersion % "provided",
 "org.apache.flink"    %% "flink-runtime-web"
  % flinkVersion % "provided",
 "org.apache.flink"    %% "flink-runtime"
  % flinkVersion % "test" classifier "tests",
 "org.apache.flink"    %% "flink-streaming-java"
% flinkVersion % "test" classifier "tests",
 "org.apache.flink"    %% "flink-test-utils"
% flinkVersion % "test",
   )

On Mon, Nov 2, 2020 at 3:21 PM Aljoscha Krettek  
wrote:



@Timo and/or @Jark, have you seen this problem before?

@Yuval, I'm assuming you're using sbt as a build system, is that
correct? Could you maybe also post a snippet of your build file that
shows the dependency setup or maybe the whole file(s).

Best,
Aljoscha

On 01.11.20 13:34, Yuval Itzchakov wrote:

Hi,

While trying to compile an application with a dependency on
flink-table-planner_blink_2.12-1.11.2, I receive the following error
message during compilation:

scalac: While parsing annotations in /Library/Caches/Coursier/v1/https/

repo1.maven.org/maven2/org/apache/flink/flink-table-planner-blink_2.12/1.11.2/flink-table-planner-blink_2.12-1.11.2.jar(org/apache/calcite/sql/SqlKind.class) 


,

could not find EXPERIMENTAL in enum .
This is likely due to an implementation restriction: an annotation

argument

cannot refer to a member of the annotated class (scala/bug#7014).

Has anyone encountered this issue?












Re: A question about flink sql retract stream

2020-11-05 Thread Jark Wu
Yes.

There is also a Flink Forward session [1] (starting at 14:00) that talks about
the internals of the underlying changelog mechanism with a visual example.

Best,
Jark

[1]:
https://www.youtube.com/watch?v=KDD8e4GE12w&list=PLDX4T_cnKjD0ngnBSU-bYGfgVv17MiwA7&index=48&t=820s
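
For reference, a minimal sketch of enabling the mini-batch optimization discussed
further down in the thread (assuming the Blink planner; `tEnv` and the concrete
values are just examples):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableEnvironment;

public final class MiniBatchSettings {

    // `tEnv` is assumed to be the job's (Blink planner) TableEnvironment.
    static void enableMiniBatch(TableEnvironment tEnv) {
        Configuration conf = tEnv.getConfig().getConfiguration();
        conf.setString("table.exec.mini-batch.enabled", "true");      // buffer input records
        conf.setString("table.exec.mini-batch.allow-latency", "5 s"); // flush at most every 5 s
        conf.setString("table.exec.mini-batch.size", "5000");         // or after 5000 buffered rows
    }
}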

On Thu, 5 Nov 2020 at 15:48, Henry Dai  wrote:

> Hi Jark,
>
> Thanks for your reply, it helps me a lot!
>
> I have tested the mini-batch optimization, the result shows it reduces a
> lot of records produced when a Flink Table is converted to a Retracted
> DataStream.
>
> It seems I had a wrong understanding of Flink's "Dynamic Table" concept
> in the past: if a record R1 comes into the SQL computation, before it's
> processed the Result Table's data view is V1, and after R1 is processed
> the Result Table's data view turns to V2. I used to believe the ChangeLog is
> simply Diff(V2, V1).
>
> Actually, there are a lot of intermediate changes during processing R1.
>
> Thanks!
>
>
>
> On Thu, Nov 5, 2020 at 11:36 AM, Jark Wu wrote:
>
>> Thanks Henry for the detailed example,
>>
>> I will explain why so many records at time 5.
>> That is because the retraction mechanism is per-record triggered in Flink
>> SQL, so there is record amplification in your case.
>> At time 5, the LAST_VALUE aggregation for stream a will first emit -(1,
>> 12345, 0) and then +(1, 12345, 0).
>> When the -(1, 12345, 0) arrives at the join operator, it will join the
>> previous 3 records in stream b, and then send 3 retraction messages.
>> When the 3 retraction messages arrive at the sum aggregation, it
>> produces (F 33)(T 21)(F 21)(T 10)(F 10).
>> In contrast, when the +(1, 12345, 0) arrives the join operator, it sends
>> 3 joined accumulation messages to sum aggregation, and produces (T 12)(F
>> 12)(T 23)(F 23)(T 33) .
>>
>> In Flink SQL, the mini-batch [1] optimization can reduce
>> this amplification, because it is triggered on a mini-batch of records.
>>
>> Best,
>> Jark
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
>>
>>
>> On Wed, 4 Nov 2020 at 23:01, Henry Dai  wrote:
>>
>>>
>>> Dear flink developers&users
>>>
>>> I have a question about Flink SQL; it gives me a lot of
>>> trouble. Thank you very much for any help.
>>>
>>> Lets's assume we have two data stream, `order` and `order_detail`,
>>> they are from mysql binlog.
>>>
>>> Table `order` schema:
>>> id        int primary key
>>> order_id  int
>>> status    int
>>>
>>> Table `order_detail` schema:
>>> id   int primary key
>>> order_id int
>>> quantity  int
>>>
>>> order : order_detail = 1:N, they are joined by `order_id`
>>>
>>> Say we have the following data sequence, and we compute sum(quantity)
>>> group by order.order_id after they are joined:
>>>
>>> time | order (id, order_id, status) | order_detail (id, order_id, quantity) | result
>>> 1    | (1, 12345, 0)                |                                       |
>>> 2    |                              | (1, 12345, 10)                        | (T 10)
>>> 3    |                              | (2, 12345, 11)                        | (F 10)(T 21)
>>> 4    |                              | (3, 12345, 12)                        | (F 21)(T 33)
>>> 5    | (1, 12345, 1)                |                                       | (F 33)(T 21)(F 21)(T 10)(F 10)(T 12)(F 12)(T 23)(F 23)(T 33)
>>>
>>> Code:
>>> tableEnv.registerTableSource("a", new Order());
>>> tableEnv.registerTableSource("b", new OrderDetail());
>>> Table tbl1 = tableEnv.sqlQuery("SELECT id, LAST_VALUE(order_id) AS
>>> order_id, LAST_VALUE(status) AS status FROM a GROUP BY id");
>>> tableEnv.registerTable("ax", tbl1);
>>> Table tbl2 = tableEnv.sqlQuery("SELECT id, LAST_VALUE(order_id) AS
>>> order_id, LAST_VALUE(quantity) AS quantity FROM b GROUP BY id");
>>> tableEnv.registerTable("bx", tbl2);
>>> Table table = tableEnv.sqlQuery("SELECT ax.order_id,
>>> SUM(bx.quantity) FROM ax  JOIN bx ON ax.order_id = bx.order_id GROUP BY
>>> ax.order_id");
>>> DataStream<Tuple2<Boolean, Row>> stream = tableEnv.toRetractStream(table, Row.class);
>>> stream.print();
>>>
>>> Result:
>>> (true,12345,10)
>>> (false,12345,10)
>>> (true,12345,21)
>>> (false,12345,21)
>>> (true,12345,33)
>>> (false,12345,33)
>>> (true,12345,21)
>>> (false,12345,21)
>>> (true,12345,10)
>>> (false,12345,10)
>>> (true,12345,12)
>>> (false,12345,12)
>>> (true,12345,23)
>>> (false,12345,23)
>>> (true,12345,33)
>>>
>>>
>>> I can't understand why Flink emits so many records at time 5.
>>>
>>> In production, we consume a binlog stream from Kafka, convert the stream
>>> to a Flink table, and after the SQL computation convert the result table
>>> back to a Flink stream where we only preserve the TRUE messages in the
>>> retract stream and emit them to downstream Kafka.
>>>
>>> Do we have some met