Re: [External] naming table stages

2021-07-28 Thread JING ZHANG
Hi Yuval,

Thanks for pointing that out. Yes, metric names would also be affected.
The need sounds reasonable; we could create a JIRA ticket with a detailed
description of the requirements.

cc @godfrey @Kurt to provide more of a user perspective; they may be interested
in the feature.

Best,
JING ZHANG

Yuval Itzchakov  wrote on Thu, Jul 29, 2021 at 12:49 AM:

> Hi Jing,
>
> An additional challenge with the current Table API / SQL approach to
> operator naming is that it makes it very hard to export metrics, e.g. to
> track watermarks with Prometheus, when operator names are not assignable by
> the user.
>
> On Wed, Jul 28, 2021, 13:11 JING ZHANG  wrote:
>
>> Hi Clemens,
>>
>> This feature is currently not supported.
>> Your need sounds reasonable; you could create a JIRA ticket for it.
>>
>> Currently the operator name contains a lot of detailed information, which
>> has both advantages and disadvantages.
>> When we need to troubleshoot problems, the detailed information helps us
>> understand what an operator is doing.
>> But overly long names cause problems for the logging system and for the
>> visual display in the frontend.
>> Maybe it's better to shorten the operator name if it is too long, and
>> provide a way to query the complete name of a given operator, which would
>> still help us troubleshoot problems.
>>
>> Best,
>> JING ZHANG
>>
>> Clemens Valiente  wrote on Wed, Jul 28, 2021 at 12:16 PM:
>>
>>> Is it possible to rename execution stages from the Table API? Right now
>>> the entire select transformation appears in plaintext in the task name, so
>>> the log entries from ExecutionGraph are over 10,000 characters long and the
>>> log files are incredibly difficult to read.
>>> For example, a simple selected field shows up as
>>> Calc(select=[(((_UTF-16LE'code = ' POSITION (FLAG(BOTH) TRIM _UTF-16LE'
>>> ' TRIM FLAG(BOTH) TRIM _UTF-16LE' ' TRIM extraInfo.loginRequestID) =
>>> _UTF-16LE'') OR ((FLAG(BOTH) TRIM _UTF-16LE' ' TRIM
>>> extraInfo.loginRequestID) = _UTF-16LE'None')) CASE null:VARCHAR(2147483647)
>>> CHARACTER SET "UTF-16LE" CASE extraInfo.loginRequestID))) = 1) CASE
>>> null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE" CASE FLAG(BOTH) TRIM
>>> _UTF-16LE' ' TRIM extraInfo.loginRequestID) = _UTF-16LE'') OR ((FLAG(BOTH)
>>> TRIM _UTF-16LE' ' TRIM extraInfo.loginRequestID) = _UTF-16LE'None')) CASE
>>> null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE" CASE
>>> extraInfo.loginRequestID)) AS loginRequestId
>>> We have about a dozen of those, and they're all printed out for
>>> every single log line.
>>> Is there any way to shorten this without having to suppress these log
>>> lines completely?
>>>
>>> Best Regards
>>> Clemens
>>>
>>>
>>


Re: Table Aggregate - Flink SQL

2021-07-28 Thread JING ZHANG
Hi Pranav,

Table Aggregate and Window Table Aggregate are currently only supported in the
Table API; they are not supported in SQL yet.
I think the biggest challenge is how to support these two features (Table
Aggregate and Window Table Aggregate) in standard SQL grammar, since in ANSI
SQL an aggregate returns a single value for a set of rows with the same group
key.
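
For reference, here is a rough Scala sketch of the Table API route, following the Top2 example from the documentation (table and column names are placeholders):

import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.table.api._
import org.apache.flink.table.functions.TableAggregateFunction
import org.apache.flink.util.Collector

// Accumulator keeping the two largest values seen so far for one group key.
case class Top2Accumulator(var first: Integer, var second: Integer)

// A table aggregate emits a table (here: up to two rows per group), which is
// why it does not fit the ANSI SQL notion of an aggregate returning one value.
class Top2 extends TableAggregateFunction[JTuple2[Integer, Integer], Top2Accumulator] {

  override def createAccumulator(): Top2Accumulator =
    Top2Accumulator(Integer.MIN_VALUE, Integer.MIN_VALUE)

  def accumulate(acc: Top2Accumulator, value: Integer): Unit = {
    if (value > acc.first) {
      acc.second = acc.first
      acc.first = value
    } else if (value > acc.second) {
      acc.second = value
    }
  }

  def emitValue(acc: Top2Accumulator, out: Collector[JTuple2[Integer, Integer]]): Unit = {
    // emit (value, rank) rows
    if (acc.first != Integer.MIN_VALUE) out.collect(JTuple2.of(acc.first, Integer.valueOf(1)))
    if (acc.second != Integer.MIN_VALUE) out.collect(JTuple2.of(acc.second, Integer.valueOf(2)))
  }
}

// Usage is only reachable through the Table API, not SQL:
// val top2 = new Top2
// tableEnv
//   .from("MyTable")
//   .groupBy($"myKey")
//   .flatAggregate(top2($"myValue").as("v", "rank"))
//   .select($"myKey", $"v", $"rank")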

Best,
JING ZHANG

Pranav Patil  wrote on Thu, Jul 29, 2021 at 1:27 AM:

> I want to create a Python UDF for a table aggregate function. The
> documentation explains this and how to use its results by calling the
> flatAggregate function. However, I would rather not use the Table API; I
> would like to call the table aggregate function from Flink SQL. I'm using
> Flink 1.13. My function returns the Row type.
>
> I'm not able to use LATERAL TABLE to expand as with a normal table
> function:
> SELECT T.value FROM table, LATERAL TABLE (tableAggr(table.data)) as T
>
> I'm also not able to say, for example
> SELECT T.value FROM table, tableAggr(table.data) as T
>
> Do I need to write a DataTypeHint? Or how do I expand the table into its
> rows and access them?
>


Re: TaskManager crash after cancelling a job

2021-07-28 Thread Yangze Guo
In your case, the entry point is the `cleanUpInvoke` function called
by `StreamTask#invoke`.

@ro...@apache.org Could you take another look at this?

Best,
Yangze Guo

On Thu, Jul 29, 2021 at 2:29 AM Ivan Yang  wrote:
>
> Hi Yangze,
>
> I deployed 1.13.1 and the same problem exists. It seems that the cancel logic
> has changed since 1.11.0 (the version we had been running for almost a year).
> In 1.11.0, during cancellation, we saw some subtasks stay in the cancelling
> state for some time, but eventually the job would be cancelled and no task
> managers were lost, so we could restart the job right away. In the new 1.13.x
> version, the task managers where those stuck subtasks were running get killed,
> and it then takes another 4-5 minutes for the task managers to rejoin. Can you
> point me to the code that manages the job cancellation routine? I want to
> understand the logic there.
>
> Thanks,
> Ivan
>
> > On Jul 26, 2021, at 7:22 PM, Yangze Guo  wrote:
> >
> > Hi, Ivan
> >
> > My gut feeling is that it is related to FLINK-22535. Could @Yun Gao
> > take another look? If that is the case, you can upgrade to 1.13.1.
> >
> > Best,
> > Yangze Guo
> >
> > On Tue, Jul 27, 2021 at 9:41 AM Ivan Yang  wrote:
> >>
> >> Dear Flink experts,
> >>
> >> We recently ran into an issue during a job cancellation after upgrading to
> >> 1.13. After we issue a cancel (from the Flink console or flink cancel
> >> {jobid}), a few subtasks get stuck in the cancelling state. Once it gets to
> >> that situation, the behavior is consistent: those "cancelling" tasks will
> >> never become canceled. After 3 minutes, the job stops and, as a result, a
> >> number of task managers are lost. It takes about another 5 minutes for the
> >> lost task managers to rejoin the JobManager. Then we can restart the job
> >> from the previous checkpoint. We found the following stack trace from the
> >> hanging (cancelling) TaskManager.
> >> ==
> >>sun.misc.Unsafe.park(Native Method) 
> >> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) 
> >> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> >>  java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) 
> >> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> >>  java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947) 
> >> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:705)
> >>  
> >> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cleanUpInvoke(SourceStreamTask.java:186)
> >>  
> >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:637)
> >>  org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776) 
> >> org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) 
> >> java.lang.Thread.run(Thread.java:748)
> >> ===
> >>
> >> Here is some background information about our job and setup.
> >> 1) The job is relatively large; we have 500+ parallelism and 2000+
> >> subtasks. It mainly reads from a Kinesis stream, performs some
> >> transformations, and fans out to multiple output S3 buckets. It's a
> >> stateless ETL job.
> >> 2) The same code and setup running in smaller environments don't seem to
> >> have this cancel failure problem.
> >> 3) We had been using Flink 1.11.0 for the same job and never saw this
> >> cancel failure and TaskManager kill problem.
> >> 4) When upgrading to 1.13, we also added Kubernetes HA (ZooKeeper-less).
> >> Previously we did not use HA.
> >>
> >> Cancelling and restarting from the previous checkpoint is our regular
> >> procedure for daily operation. With this 10-minute TM restart cycle, it
> >> basically slows down our throughput. I am trying to understand what leads
> >> to this situation, hoping that some configuration change will smooth things
> >> out. Any suggestion to shorten the wait would also help; it appears there
> >> are timeouts on the TaskManager and JobManager that can be adjusted to
> >> speed it up. But we really want to avoid getting stuck in cancellation if
> >> we can.
> >>
> >> Thank you, hoping to get some insight here.
> >>
> >> Ivan
>


Re: TaskManager crash after cancelling a job

2021-07-28 Thread Ivan Yang
Hi Yangze,

I deployed 1.13.1 and the same problem exists. It seems that the cancel logic
has changed since 1.11.0 (the version we had been running for almost a year).
In 1.11.0, during cancellation, we saw some subtasks stay in the cancelling
state for some time, but eventually the job would be cancelled and no task
managers were lost, so we could restart the job right away. In the new 1.13.x
version, the task managers where those stuck subtasks were running get killed,
and it then takes another 4-5 minutes for the task managers to rejoin. Can you
point me to the code that manages the job cancellation routine? I want to
understand the logic there.

Thanks,
Ivan

> On Jul 26, 2021, at 7:22 PM, Yangze Guo  wrote:
> 
> Hi, Ivan
> 
> My gut feeling is that it is related to FLINK-22535. Could @Yun Gao
> take another look? If that is the case, you can upgrade to 1.13.1.
> 
> Best,
> Yangze Guo
> 
> On Tue, Jul 27, 2021 at 9:41 AM Ivan Yang  wrote:
>> 
>> Dear Flink experts,
>> 
>> We recently ran into an issue during a job cancellation after upgrading to
>> 1.13. After we issue a cancel (from the Flink console or flink cancel {jobid}),
>> a few subtasks get stuck in the cancelling state. Once it gets to that
>> situation, the behavior is consistent: those "cancelling" tasks will never
>> become canceled. After 3 minutes, the job stops and, as a result, a number of
>> task managers are lost. It takes about another 5 minutes for the lost task
>> managers to rejoin the JobManager. Then we can restart the job from the
>> previous checkpoint. We found the following stack trace from the hanging
>> (cancelling) TaskManager.
>> ==
>>sun.misc.Unsafe.park(Native Method) 
>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) 
>> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>>  java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) 
>> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>>  java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947) 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:705)
>>  
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cleanUpInvoke(SourceStreamTask.java:186)
>>  
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:637)
>>  org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776) 
>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) 
>> java.lang.Thread.run(Thread.java:748)
>> ===
>> 
>> Here is some background information about our job and setup.
>> 1) The job is relatively large; we have 500+ parallelism and 2000+ subtasks.
>> It mainly reads from a Kinesis stream, performs some transformations, and
>> fans out to multiple output S3 buckets. It's a stateless ETL job.
>> 2) The same code and setup running in smaller environments don't seem to
>> have this cancel failure problem.
>> 3) We had been using Flink 1.11.0 for the same job and never saw this
>> cancel failure and TaskManager kill problem.
>> 4) When upgrading to 1.13, we also added Kubernetes HA (ZooKeeper-less).
>> Previously we did not use HA.
>> 
>> Cancelling and restarting from the previous checkpoint is our regular
>> procedure for daily operation. With this 10-minute TM restart cycle, it
>> basically slows down our throughput. I am trying to understand what leads to
>> this situation, hoping that some configuration change will smooth things out.
>> Any suggestion to shorten the wait would also help; it appears there are
>> timeouts on the TaskManager and JobManager that can be adjusted to speed it
>> up. But we really want to avoid getting stuck in cancellation if we can.
>> 
>> Thank you, hoping to get some insight here.
>> 
>> Ivan



Table Aggregate - Flink SQL

2021-07-28 Thread Pranav Patil
I want to create a Python UDF for a table aggregate function. The
documentation explains this and how to use its results by calling the
flatAggregate function. However, I would rather not use the Table API; I
would like to call the table aggregate function from Flink SQL. I'm using
Flink 1.13. My function returns the Row type.

I'm not able to use LATERAL TABLE to expand as with a normal table function:
SELECT T.value FROM table, LATERAL TABLE (tableAggr(table.data)) as T

I'm also not able to say, for example
SELECT T.value FROM table, tableAggr(table.data) as T

Do I need to write a DataTypeHint? Or how do I expand the table into its
rows and access them?


Re: [External] naming table stages

2021-07-28 Thread Yuval Itzchakov
Hi Jing,

An additional challenge with the current Table API / SQL approach to
operator naming is that it makes it very hard to export metrics, e.g. to
track watermarks with Prometheus, when operator names are not assignable by
the user.
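
For comparison, the DataStream API lets you assign both the operator name and uid explicitly; a small sketch (the source and names are purely illustrative) of what we would like to be able to do on the Table API / SQL side as well:

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

val requestIds: DataStream[String] = env
  .socketTextStream("localhost", 9999)      // placeholder source
  .map(_.trim)
  .name("normalize-login-request-id")       // short, user-chosen name in the UI, logs and metric scopes
  .uid("normalize-login-request-id")        // stable id, useful for savepoint mapping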

On Wed, Jul 28, 2021, 13:11 JING ZHANG  wrote:

> Hi Clemens,
>
> This feature is currently not supported.
> Your need sounds reasonable; you could create a JIRA ticket for it.
>
> Currently the operator name contains a lot of detailed information, which
> has both advantages and disadvantages.
> When we need to troubleshoot problems, the detailed information helps us
> understand what an operator is doing.
> But overly long names cause problems for the logging system and for the
> visual display in the frontend.
> Maybe it's better to shorten the operator name if it is too long, and provide
> a way to query the complete name of a given operator, which would still help
> us troubleshoot problems.
>
> Best,
> JING ZHANG
>
> Clemens Valiente  wrote on Wed, Jul 28, 2021 at 12:16 PM:
>
>> Is it possible to rename execution stages from the Table API? Right now
>> the entire select transformation appears in plaintext in the task name, so
>> the log entries from ExecutionGraph are over 10,000 characters long and the
>> log files are incredibly difficult to read.
>> For example, a simple selected field shows up as
>> Calc(select=[(((_UTF-16LE'code = ' POSITION (FLAG(BOTH) TRIM _UTF-16LE' '
>> TRIM FLAG(BOTH) TRIM _UTF-16LE' ' TRIM extraInfo.loginRequestID) =
>> _UTF-16LE'') OR ((FLAG(BOTH) TRIM _UTF-16LE' ' TRIM
>> extraInfo.loginRequestID) = _UTF-16LE'None')) CASE null:VARCHAR(2147483647)
>> CHARACTER SET "UTF-16LE" CASE extraInfo.loginRequestID))) = 1) CASE
>> null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE" CASE FLAG(BOTH) TRIM
>> _UTF-16LE' ' TRIM extraInfo.loginRequestID) = _UTF-16LE'') OR ((FLAG(BOTH)
>> TRIM _UTF-16LE' ' TRIM extraInfo.loginRequestID) = _UTF-16LE'None')) CASE
>> null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE" CASE
>> extraInfo.loginRequestID)) AS loginRequestId
>> We have about a dozen of those, and they're all printed out for every
>> single log line.
>> Is there any way to shorten this without having to suppress these log
>> lines completely?
>>
>> Best Regards
>> Clemens
>>
>


Checkpoints question

2021-07-28 Thread Kirill Kosenko
Hello


I'm new to Flink. I am playing with Stateful Functions and have a question 
about checkpoints and how they work.


Some configuration details:

state.backend: rocksdb

state.backend.incremental: true

execution.checkpointing.mode: AT_LEAST_ONCE
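
For a plain DataStream job, the equivalent programmatic setup would look roughly like the sketch below (assuming a recent Flink, 1.13+; the checkpoint interval is a placeholder, and with Stateful Functions I set these values in flink-conf.yaml instead):

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// RocksDB state backend with incremental checkpoints enabled
env.setStateBackend(new EmbeddedRocksDBStateBackend(true))

// AT_LEAST_ONCE checkpoints, triggered every 60 seconds (placeholder interval)
env.enableCheckpointing(60 * 1000L, CheckpointingMode.AT_LEAST_ONCE)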


As far as I know:

1. There is a synchronous checkpoint phase. I suppose the memtable is flushed to 
the SST files during this phase and the snapshot is taken right after. It 
appears to be a blocking operation.

2. If I got the idea right, the snapshot is then sent asynchronously to durable 
storage right after.

Please confirm my understanding.


I noticed that when a checkpoint is triggered there is a delay in message 
processing.


Message processing usually takes less than 100 ms when no checkpoint is in 
progress. Unfortunately, while a checkpoint is running, the same message 
processing takes over 2 seconds. This doesn't match our expectations, as we 
need messages to be processed significantly faster (in real time), ideally in 
less than 1 second even during a checkpoint.


From what I noticed, the larger the state, the longer the checkpoint takes, 
even with the incremental approach: if the state is about 100 MB, the 
checkpoint takes less than 1 second, which works for me. However, for a 15 GB 
state the checkpoint takes 3-5 seconds. I just want to be sure that state size 
increases the checkpoint time, despite the fact that I use incremental 
checkpoints.


Please confirm or disprove my understanding.

Is there a way to speed up the checkpoint time or alternatively make the 
checkpoints completely asynchronous?


Thanks



Re: Issue with Flink SQL using RocksDB backend

2021-07-28 Thread Timo Walther

Hi Yuval,

having a locally reproducible case would be great, along with more 
information about the data types used, because this could be a 
serializer issue that messes up the binary format.


Regards,
Timo


On 27.07.21 07:37, Yuval Itzchakov wrote:

Hi Jing,

Yes, FIRST is a UDAF.

I've been trying to reproduce this locally without success so far.

The query itself has more fields and aggregates. Once I can reproduce 
this locally I'll try to narrow down the problematic field and share 
more information.


On Tue, Jul 27, 2021, 05:17 JING ZHANG wrote:


Hi Yuval,
I ran a similar SQL query (without the `FIRST` aggregate function), and
there was nothing wrong.
Is `FIRST` a custom aggregate function? Could you please check whether
there is a bug in `FIRST`, and whether the query runs without `FIRST`?

Best,
JING ZHANG

Yuval Itzchakov <yuva...@gmail.com> wrote on Tue, Jul 27, 2021 at 12:29 AM:

Hi,

*Setup:*

1 JM,
1 TM,
Flink 1.13.1
RocksDBStateBackend.

I have a query with the rough sketch of the following:

SELECT CAST(TUMBLE_START(event_time, INTERVAL '2' MINUTE) AS TIMESTAMP) START_TIME,
       CAST(TUMBLE_END(event_time, INTERVAL '2' MINUTE) AS TIMESTAMP)   END_TIME,
       FOO,
       BAR,
       FIRST(BAZ)
WHERE QWAK = FALSE
GROUP BY TUMBLE(event_time, INTERVAL '2' MINUTE),
         FOO,
         BAR
HAVING COUNT(DISTINCT BUN) >= 10

The query itself is a bit more complicated than that. When
executing it in the cluster, I see the following error:

java.lang.RuntimeException: No copy finished, this should be a bug, The remaining length is: 73728
    at org.apache.flink.table.data.binary.BinarySegmentUtils.copyToView(BinarySegmentUtils.java:236)
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.serialize(StringDataSerializer.java:76)
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.serialize(StringDataSerializer.java:34)
    at org.apache.flink.table.runtime.typeutils.ExternalSerializer.serialize(ExternalSerializer.java:150)
    at org.apache.flink.runtime.state.SerializedCompositeKeyBuilder.buildCompositeKeyNamesSpaceUserKey(SerializedCompositeKeyBuilder.java:152)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeCurrentKeyWithGroupAndNamespacePlusUserKey(AbstractRocksDBState.java:152)
    at org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:123)
    at org.apache.flink.table.runtime.dataview.StateMapView$StateMapViewWithKeysNullable.get(StateMapView.java:159)
    at org.apache.flink.table.runtime.dataview.StateMapView$NamespacedStateMapViewWithKeysNullable.get(StateMapView.java:392)
    at GroupingWindowAggsHandler$217.accumulate(Unknown Source)
    at org.apache.flink.table.runtime.operators.window.WindowOperator.processElement(WindowOperator.java:366)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.base/java.lang.Thread.run(Thread.java:829)

Would appreciate help on how to debug this issue, or hearing from anyone
who has encountered it before.


-- 
Best Regards,

Yuval Itzchakov.





Re: foreach exec sql

2021-07-28 Thread Timo Walther
Btw, you are executing a lot of Flink jobs in parallel with this, because 
the submission of each INSERT via executeSql is async. Maybe the concept of a 
StatementSet, created via TableEnvironment.createStatementSet(), helps; see 
the sketch below.
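
A minimal sketch of that approach (assuming `tableEnv` and the `sql` collection from the snippet below, and that only the INSERT statements go into the set; DDL such as CREATE TABLE still goes through executeSql):

val stmtSet = tableEnv.createStatementSet()

// Collect every INSERT into the set instead of submitting one job per executeSql call.
sql.filter(_.trim.toUpperCase.startsWith("INSERT")).foreach { s =>
  stmtSet.addInsertSql(s)
}

// Submits all INSERTs together as a single Flink job.
stmtSet.execute()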


Regards,
Timo


On 27.07.21 10:56, Caizhi Weng wrote:

Hi!

Try this:
sql.zipWithIndex.foreach { case (sql, idx) =>
   val result = tableEnv.executeSql(sql)
   if (idx == 7) {
     result.print()
   }
}

igyu <i...@21cn.com> wrote on Tue, Jul 27, 2021 at 4:38 PM:


      tableEnv.executeSql(sql(0))
      tableEnv.executeSql(sql(1))
      tableEnv.executeSql(sql(2))
      tableEnv.executeSql(sql(3))
      tableEnv.executeSql(sql(4))
      tableEnv.executeSql(sql(5))
      tableEnv.executeSql(sql(6))
      tableEnv.executeSql(sql(7)).print()

that is OK

but I hope

       sql.foreach(s=>{
         tableEnv.executeSql(s)
       })


igyu





Re: [External] naming table stages

2021-07-28 Thread JING ZHANG
Hi Clemens,

This feature is currently not supported.
Your need sounds reasonable; you could create a JIRA ticket for it.

Currently the operator name contains a lot of detailed information, which has
both advantages and disadvantages.
When we need to troubleshoot problems, the detailed information helps us
understand what an operator is doing.
But overly long names cause problems for the logging system and for the visual
display in the frontend.
Maybe it's better to shorten the operator name if it is too long, and provide a
way to query the complete name of a given operator, which would still help us
troubleshoot problems.

Best,
JING ZHANG

Clemens Valiente  wrote on Wed, Jul 28, 2021 at 12:16 PM:

> Is it possible to rename execution stages from the Table API? Right now
> the entire select transformation appears in plaintext in the task name, so
> the log entries from ExecutionGraph are over 10,000 characters long and the
> log files are incredibly difficult to read.
> For example, a simple selected field shows up as
> Calc(select=[(((_UTF-16LE'code = ' POSITION (FLAG(BOTH) TRIM _UTF-16LE' '
> TRIM FLAG(BOTH) TRIM _UTF-16LE' ' TRIM extraInfo.loginRequestID) =
> _UTF-16LE'') OR ((FLAG(BOTH) TRIM _UTF-16LE' ' TRIM
> extraInfo.loginRequestID) = _UTF-16LE'None')) CASE null:VARCHAR(2147483647)
> CHARACTER SET "UTF-16LE" CASE extraInfo.loginRequestID))) = 1) CASE
> null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE" CASE FLAG(BOTH) TRIM
> _UTF-16LE' ' TRIM extraInfo.loginRequestID) = _UTF-16LE'') OR ((FLAG(BOTH)
> TRIM _UTF-16LE' ' TRIM extraInfo.loginRequestID) = _UTF-16LE'None')) CASE
> null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE" CASE
> extraInfo.loginRequestID)) AS loginRequestId
> We have about a dozen of those, and they're all printed out for every
> single log line.
> Is there any way to shorten this without having to suppress these log
> lines completely?
>
> Best Regards
> Clemens
>
>


Calling a stateful fuction from Flink Job - DataStream Integration

2021-07-28 Thread Deniz Koçak
Hi,

We would like to host a service separate (logically and possibly
physically) from our Flink job, and to call that service from the job. We
have been considering remote functions via HTTP, which seem to be a good
option in the cloud, and we have been looking at both Async I/O and
StateFun for calling the remote HTTP-based endpoint. However, a few
questions are not clear to us.

1) The StateFun documentation says an ingress is mandatory, so I think we
have to define one? Here -
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/concepts/application-building-blocks.html
- it says `This can be anything from a Kafka topic, to a message queue, to
an http request - anything that can get data into the system and trigger
the initial functions to begin computation.` and mentions HTTP as an
ingress. However, I could not find an example of that. Can we use StateFun
without an ingress, or should HTTP be the ingress to our StateFun
application?

2) I believe
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/sdk/flink-datastream.html
is the API we have to use for calling StateFun from a Flink job. However,
related to question 1, how does it communicate with the stateful functions,
just via HTTP? Even in that integration, do we need an ingress, and what
would it be?

3) We are interested in StateFun for fault tolerance, retries, etc. Also,
considering Async I/O for Flink, I wonder if there is any pattern we can
benefit from in terms of fault tolerance for Async I/O. If we can handle
all of this via Async I/O, StateFun would be heavy lifting for us. A rough
sketch of what we have in mind is below.
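
For question 3, this is roughly the Async I/O shape we are evaluating (a sketch only; callRemoteService, the record types and the tuning numbers are placeholders, and as far as I understand retries in Flink 1.13 have to be handled inside asyncInvoke or by failing the task and relying on the restart strategy):

import java.util.concurrent.TimeUnit

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

case class Event(id: String)
case class Enriched(id: String, score: Double)

class RemoteScoringFunction extends AsyncFunction[Event, Enriched] {

  @transient private implicit lazy val ec: ExecutionContext = ExecutionContext.global

  // Placeholder for a non-blocking HTTP client call to the remote service.
  private def callRemoteService(event: Event): Future[Enriched] =
    Future(Enriched(event.id, 0.0))

  override def asyncInvoke(input: Event, resultFuture: ResultFuture[Enriched]): Unit =
    callRemoteService(input).onComplete {
      case Success(r)  => resultFuture.complete(Iterable(r))
      case Failure(ex) => resultFuture.completeExceptionally(ex) // fails the task; the restart strategy retries
    }

  override def timeout(input: Event, resultFuture: ResultFuture[Enriched]): Unit =
    resultFuture.complete(Iterable.empty) // or completeExceptionally(...) to fail and retry
}

// Wiring it up: at most 100 in-flight requests per subtask, 5 s timeout per request.
// val enriched: DataStream[Enriched] =
//   AsyncDataStream.unorderedWait(events, new RemoteScoringFunction, 5000, TimeUnit.MILLISECONDS, 100)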

Thanks,


Re: Session Windows should have a max size

2021-07-28 Thread Caizhi Weng
Hi!

You can use DynamicProcessingTimeSessionWindows with your
own SessionWindowTimeGapExtractor implementation. You can count the number
of records processed in the extractor and return a time gap of almost zero
(but not exactly zero, as it is invalid) if the number of records exceeds
the limit.

For more complicated scenarios, you can also write your own window by
extending the provided session windows.
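
A rough Scala sketch of the first suggestion (the event type, gap and limit are placeholders; note that the count here is kept per parallel subtask and never reset, so a precise per-key limit would need a custom Trigger or window assigner instead):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{DynamicProcessingTimeSessionWindows, SessionWindowTimeGapExtractor}

case class UserEvent(sessionId: String, payload: String)

// Returns the normal session gap until the element count exceeds the limit,
// then collapses the gap to 1 ms (it must stay > 0) so the session closes
// instead of growing without bound.
class CappedGapExtractor(normalGapMs: Long, maxEvents: Long)
    extends SessionWindowTimeGapExtractor[UserEvent] {

  private var seen = 0L

  override def extract(element: UserEvent): Long = {
    seen += 1
    if (seen > maxEvents) 1L else normalGapMs
  }
}

// Usage (the stream, key selector and downstream processing are placeholders):
// events
//   .keyBy(_.sessionId)
//   .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new CappedGapExtractor(30000L, 100000L)))
//   .process(...)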

Prashant Deva  wrote on Wed, Jul 28, 2021 at 4:08 PM:

> It seems there is no way to set a maximum number of events for a session
> window.
> This results in a security vulnerability.
> Example: I am recording all the user interaction events of a browser
> session. A malicious user can then generate hundreds of thousands or even
> millions of events and cause out-of-memory errors on our backend.
>
> I think session windows should have a way to stop the window from growing
> without bound.
>


Session Windows should have a max size

2021-07-28 Thread Prashant Deva
It seems there is no way to set a maximum number of events for a session
window.
This results in a security vulnerability.
Example: I am recording all the user interaction events of a browser
session. A malicious user can then generate hundreds of thousands or even
millions of events and cause out-of-memory errors on our backend.

I think session windows should have a way to stop the window from growing
without bound.


Re: IllegalArgumentException: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s

2021-07-28 Thread Dian Fu
Are you mixing the Python Table API and the Python DataStream API, converting a 
DataStream to a Table in your program? If so, there is an issue, 
https://issues.apache.org/jira/browse/FLINK-23133, which may be related 
(already fixed in 1.12.5).

PS: 1.12.5 is currently under VOTE [1] and is still not available. However, you 
could find the packages in [2].

Regards,
Dian

[1] 
https://lists.apache.org/thread.html/rda18df528066b2bfe46a18ba377aaf3135da0c7d0b1ff52ef1015b5a%40%3Cdev.flink.apache.org%3E
[2] https://dist.apache.org/repos/dist/dev/flink/flink-1.12.5-rc3/python/

> On Jul 28, 2021, at 2:42 PM, Curt Buechter  wrote:
> 
> I have a PyFlink job that starts with the DataStream API and converts to the 
> Table API. In the DataStream portion, there is a MapFunction. I am getting 
> the following error:
> 
> flink run -py sample.py
> 
> java.lang.IllegalArgumentException: The configured managed memory fraction 
> for Python worker process must be within (0, 1], was: %s. It may be because 
> the consumer type "Python" was missing or set to 0 for the config option 
> "taskmanager.memory.managed.consumer-weights".0.0
> at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:237) ~[blob_p-96a992be07a80c2f2104b54a9c2509bd6cf8e8bc-845e415af5e71e7433bf0c5cd5936782:1.12.1]
> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:113) ~[blob_p-96a992be07a80c2f2104b54a9c2509bd6cf8e8bc-845e415af5e71e7433bf0c5cd5936782:1.12.1]
> at org.apache.flink.streaming.api.operators.python.OneInputPythonFunctionOperator.open(OneInputPythonFunctionOperator.java:107) ~[blob_p-96a992be07a80c2f2104b54a9c2509bd6cf8e8bc-845e415af5e71e7433bf0c5cd5936782:1.12.1]
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:426) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.12-1.12.1.jar:1.12.1]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
> 
> I have tried setting the configuration setting mentioned in the error, as 
> well as a few others.
> 
> In flink-conf.yaml:
> taskmanager.memory.managed.consumer-weights: DATAPROC:70,PYTHON:30
> taskmanager.memory.managed.fraction: 0.4
> 
> In python table api:
> t_env.get_config().get_configuration().set_string("taskmanager.memory.process.size", '500m')
> t_env.get_config().get_configuration().set_string("taskmanager.memory.managed.size", '500m')
> t_env.get_config().get_configuration().set_string("taskmanager.memory.managed.fraction", '.4')
> t_env.get_config().get_configuration().set_string("taskmanager.memory.managed.consumer-weights", 'DATAPROC:70,PYTHON:30')
> 
> Flink version 1.12.1
> 
> Any help is appreciated. Thanks
>