Re: how to propagate watermarks across multiple jobs

2021-03-03 Thread Piotr Nowojski
Great :)

Just one more note: FlinkKafkaShuffle currently has a critical bug [1] that
will probably prevent you from using it directly. I hope it will be fixed
in one of the next releases. In the meantime you can use its source code as
inspiration for your own solution.

Best,
Piotrek


[1] https://issues.apache.org/jira/browse/FLINK-21317

On Thu, 4 Mar 2021 at 03:48, yidan zhao wrote:

> Yes, you are right, and thank you. I took a brief look at what
> FlinkKafkaShuffle is doing; it seems to be what I need, and I will give it a try.
>
>>


Re: Scaling Higher than 10k Nodes

2021-03-03 Thread Piotr Nowojski
Hi Joey,

Sorry for not responding to your question sooner. As you can imagine, there
are not many users running Flink at such a scale. As far as I know, Alibaba
is running the largest, or one of the largest, clusters; I'm asking someone
who is familiar with those deployments to take a look at this conversation.
I hope someone will respond here soon :)

Best,
Piotrek

On Mon, 1 Mar 2021 at 14:43, Joey Tran wrote:

> Hi, I was looking at Apache Beam/Flink for some of our data processing
> needs, but when reading about the resource managers
> (YARN/mesos/Kubernetes), it seems like they all top out at around 10k
> nodes. What are recommended solutions for scaling higher than this?
>
> Thanks in advance,
> Joey
>


Re: [Flink-SQL] FLOORing OR CEILing a DATE or TIMESTAMP to WEEK uses Thursdays as week start

2021-03-03 Thread Sebastián Magrí
Thanks a lot for the added context and pointers, Julian and Leonard.

I've fixed it by falling back to plain date arithmetic, as suggested in one of
the Calcite discussions.
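
In case it helps anyone else, here is roughly what that workaround looks like as a
sketch, assuming a StreamTableEnvironment named `tableEnv`, a Monday week start, and
that DAYOFWEEK returns 1 (Sunday) through 7 (Saturday); `week_end` here is simply
`week_start` plus seven days, which differs from CEIL exactly at week boundaries,
and the table/column names are placeholders:

```
// date-arithmetic week flooring instead of FLOOR(ts TO WEEK)
Table weeks = tableEnv.sqlQuery(
    "SELECT ts, " +
    "  TIMESTAMPADD(DAY, -MOD(DAYOFWEEK(CAST(ts AS DATE)) + 5, 7), CAST(ts AS DATE)) AS week_start, " +
    "  TIMESTAMPADD(DAY, 7 - MOD(DAYOFWEEK(CAST(ts AS DATE)) + 5, 7), CAST(ts AS DATE)) AS week_end " +
    "FROM some_table");
```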

The changes proposed by FLIP-126 definitely look good. I'll check its
details further.

Best Regards,

On Thu, 4 Mar 2021 at 04:18, Leonard Xu  wrote:

> Hi, Sebastián Ramírez Magrí
> (Sorry for wrong name in above mail)
>
> Flink follows an older version of Calcite’s behaviour, which leads to the wrong
> result. And Julian is right that Calcite has corrected the FLOOR and CEIL
> functions in CALCITE-3412. Flink has upgraded Calcite to version 1.26,
> which contains the patch, so all we need to do is adapt it in the Flink code.
> I’d like to make this part of FLIP-162 and fix it soon.
>
> Thanks Julian and Timo for the reminder.
>
>
> Best,
> Leonard
>
> On 4 Mar 2021, at 12:14, Leonard Xu wrote:
>
> Hi, Jaffe
>
> Flink follows an older version of Calcite’s behaviour, which leads to the wrong
> result. And Julian is right that Calcite has corrected the FLOOR and CEIL
> functions in CALCITE-3412. Flink has upgraded Calcite to version 1.26,
> which contains the patch, so all we need to do is adapt it in the Flink code.
> I’d like to make this part of FLIP-162 and fix it soon.
>
> Thanks Julian and Timo for the reminder.
>
>
> Best,
> Leonard
>
>
>

-- 
Sebastián Ramírez Magrí


A question about the ChangelogNormalize operator in the stages generated by the Flink SQL upsert-kafka connector

2021-03-03 Thread Qishang
Hi community,
Flink 1.12.1

Our current job ingests canal-json data from Kafka and writes it into a DB. Since the
topic has only one partition, setting a higher parallelism has no effect for a forward-only ETL.

insert into table_a select id,udf(a),b,c from table_b;

I noticed that the plan EXPLAIN produces for the upsert-kafka connector contains a ChangelogNormalize stage that can be repartitioned:
1. Is ChangelogNormalize added by default, or can this be configured somewhere?
2. Can it lift the parallelism limit imposed by the Kafka partition count? Does that only take effect with upsert-kafka, and can it be used in the scenario described above?

```
== Physical Execution Plan ==
Stage 1 : Data Source
content : Source: TableSourceScan(table=[[default_catalog,
default_database, temp_table]], fields=[id...])

Stage 3 : Operator
content : ChangelogNormalize(key=[id])
ship_strategy : HASH

Stage 4 : Operator
content : Calc(select=[...])
ship_strategy : FORWARD

Stage 5 : Data Sink
content : Sink: Sink(table=[default_catalog.default_database.table_a],
fields=[id...])
ship_strategy : FORWARD
```


Re: How do I use async I/O to join a dimension table in Flink SQL?

2021-03-03 Thread HunterXHunter
Define a source table.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink-savepoint question

2021-03-03 Thread Congxian Qiu
For keyed state, the same key must always land in the same key group. If a particular key is a hotspot, you can add a map before the keyBy that appends a suffix to the key, then keyBy on the salted key, and finally aggregate the partial results again after processing (see the sketch below).
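
A minimal sketch of that two-stage pattern in Java, assuming a simple per-key count and a
deterministic salt derived from the record rather than a random number (a random suffix
makes the key non-deterministic, which is exactly what produces the "Key group ... is not
in KeyGroupRange" error on restore); `MyEvent` and its accessors are placeholders:

```
// given: DataStream<MyEvent> events
DataStream<Tuple2<String, Long>> salted = events
        .map(e -> Tuple2.of(e.getKey() + "#" + Math.floorMod(e.getId().hashCode(), 16), 1L))
        .returns(Types.TUPLE(Types.STRING, Types.LONG));

// stage 1: partial counts per salted key, so a hot key is spread over 16 sub-keys
DataStream<Tuple2<String, Long>> partial = salted
        .keyBy(t -> t.f0)
        .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
        .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));

// stage 2: strip the salt, re-key on the real key and combine the partial counts
DataStream<Tuple2<String, Long>> total = partial
        .map(t -> Tuple2.of(t.f0.substring(0, t.f0.lastIndexOf('#')), t.f1))
        .returns(Types.TUPLE(Types.STRING, Types.LONG))
        .keyBy(t -> t.f0)
        .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
        .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));
```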
Best,
Congxian


guomuhua <663021...@qq.com> wrote on Thu, 4 Mar 2021 at 12:49:

> I ran into a similar situation: to spread the data out, I appended a random
> number in the keyBy. What is the correct way to spread the data out?
> nobleyd wrote
> > Are you using a random key?
> > guaishushu1103@ wrote on Wed, 3 Mar 2021 at 18:53:
> > > Checkpoints complete successfully, but savepoints fail with this error:
> > > java.lang.Exception: Could not materialize checkpoint 2404 for operator
> > > KeyedProcess (21/48).
> > >   at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
> > >   at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
> > >   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> > >   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> > >   at java.lang.Thread.run(Thread.java:745)
> > > Caused by: java.util.concurrent.ExecutionException:
> > > java.lang.IllegalArgumentException: Key group 0 is not in
> > > KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
> > >   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> > >   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> > >   at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
> > >   at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
> > >   at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
> > >   ... 3 more
> > > Caused by: java.lang.IllegalArgumentException: Key group 0 is not in
> > > KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
> > >   at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
> > >   at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
> > >   at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:314)
> > >   at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
> > >   at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
> > >   at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
> > >   at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> > >   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > >   at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
> > >   ... 5 more
> > guaishushu1103@
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink-savepoint question

2021-03-03 Thread guomuhua
I ran into a similar situation: to spread the data out, I appended a random
number in the keyBy. What is the correct way to spread the data out?
nobleyd wrote
> Are you using a random key?
> guaishushu1103@ wrote on Wed, 3 Mar 2021 at 18:53:
> > Checkpoints complete successfully, but savepoints fail with this error:
> > java.lang.Exception: Could not materialize checkpoint 2404 for operator
> > KeyedProcess (21/48).
> >   at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
> >   at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
> >   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >   at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.util.concurrent.ExecutionException:
> > java.lang.IllegalArgumentException: Key group 0 is not in
> > KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
> >   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> >   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> >   at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
> >   at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
> >   at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
> >   ... 3 more
> > Caused by: java.lang.IllegalArgumentException: Key group 0 is not in
> > KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
> >   at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
> >   at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
> >   at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:314)
> >   at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
> >   at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
> >   at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
> >   at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> >   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >   at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
> >   ... 5 more
> guaishushu1103@





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink-savepoint question

2021-03-03 Thread guomuhua
I hit the same problem. To spread the data out, I appended a random number to the key in the keyBy. Without the random suffix savepoints work fine; with it, savepoints fail. So if I really do need to spread the data out, how should I handle it?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Processing-time temporal join is not supported yet.

2021-03-03 Thread Leonard Xu
Hi, Eric

> what will be the best workaround to enrich a stream of data from a kafka topic
> with static data based on id?
Currently, as a workaround, you can put your static data into Hive/JDBC/HBase, which
support lookups, and use that in the Table environment.
You can also write a UDF which caches the S3 files and use it to enrich
your stream data (see the sketch below).
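
For illustration, a rough sketch of that UDF-cache approach, assuming the static file is
readable from the task managers; the path, file layout and function name are placeholders,
and it is not a real temporal join: changes to the file are only picked up when the
function is reopened.

```
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;

public class StaticLookup extends ScalarFunction {

    private transient Map<String, String> cache;

    @Override
    public void open(FunctionContext context) throws Exception {
        // load the static data once per task when the function is opened
        cache = new HashMap<>();
        try (BufferedReader reader = new BufferedReader(new FileReader("/tmp/static/data.csv"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] parts = line.split(",", 2);   // assumed layout: id,test
                cache.put(parts[0], parts[1]);
            }
        }
    }

    public String eval(String id) {
        return cache.get(id);   // NULL in SQL if the id is unknown
    }
}

// registration and usage, roughly:
// tableEnv.createTemporarySystemFunction("static_lookup", StaticLookup.class);
// SELECT e.id, static_lookup(e.id) AS test FROM kafkaTable AS e
```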

Best,
Leonard

> 
> 
> Le sam. 27 févr. 2021 à 05:15, Leonard Xu  > a écrit :
> Hi, Eric
> 
> Firstly, FileSystemTableSource does not implement LookupTableSource, which means
> we cannot directly look up a Filesystem table.
> 
> In FLINK-19830, we plan to support processing-time temporal joins against any
> table/view by looking up the data in the join operator's state, scanned from the
> filesystem table. But as the issue describes, the join processing for the left stream
> doesn't wait for the complete snapshot of the temporal table, and this may mislead
> users in a production environment.
> E.g.: your s3 table has 1000 records, but the join operator does not know when
> all records have arrived, so the correlation may be incorrect; thus we
> disabled this feature.
> 
> I think we can implement LookupTableSource for FileSystemTableSource;
> after that, we can directly look up a Filesystem table. The implementation will be
> similar to the Hive table, where we cache all the data of the files and then
> look up the cache. Could you help create a JIRA ticket for this?
> 
> 
> Best,
> Leonard 
> 
> 
>> 在 2021年2月26日,23:41,Matthias Pohl > > 写道:
>> 
>> Hi Eric,
>> it looks like you ran into FLINK-19830 [1]. I'm gonna add Leonard to the 
>> thread. Maybe, he has a workaround for your case.
>> 
>> Best,
>> Matthias
>> 
>> [1] https://issues.apache.org/jira/browse/FLINK-19830 
>> 
>> On Fri, Feb 26, 2021 at 11:40 AM eric hoffmann > > wrote:
>> Hello
>> Working with flink 1.12.1 i read in the doc that Processing-time temporal 
>> join is supported for kv like join but when i try i get a:
>> 
>> Exception in thread "main" org.apache.flink.table.api.TableException: 
>> Processing-time temporal join is not supported yet.
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
>> at 
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>> at 
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlan(StreamExecTemporalJoin.scala:56)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>> at 
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>> at 
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
>> at 
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>> at 
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
>> at 
>> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)
>> 
>> my query:
>> 
>> SELECT e.id, r.test FROM kafkaTable as e JOIN s3Table FOR
>> SYSTEM_TIME AS OF e.proctime AS r ON e.id = r.id
>> 
>> 
>> my s3 table:
>> 
>> CREATE TABLE s3Table(id STRING, test STRING, PRIMARY KEY (id) NOT ENFORCED)
>>   WITH ('connector'='filesystem','path'='s3a://fs/','format'='json')

Re: [Flink-SQL] FLOORing OR CEILing a DATE or TIMESTAMP to WEEK uses Thursdays as week start

2021-03-03 Thread Leonard Xu
Hi, Sebastián Ramírez Magrí 
(Sorry for wrong name in above mail)

Flink follows an older version of Calcite’s behaviour, which leads to the wrong result.
And Julian is right that Calcite has corrected the FLOOR and CEIL functions in
CALCITE-3412. Flink has upgraded Calcite to version 1.26, which contains the
patch, so all we need to do is adapt it in the Flink code. I’d like to make this
part of FLIP-162 and fix it soon.

Thanks Julian and Timo for the reminder.


Best,
Leonard

> On 4 Mar 2021, at 12:14, Leonard Xu wrote:
> 
> Hi, Jaffe
> 
> Flink follows an older version of Calcite’s behaviour, which leads to the wrong
> result. And Julian is right that Calcite has corrected the FLOOR and CEIL
> functions in CALCITE-3412. Flink has upgraded Calcite to version 1.26, which
> contains the patch, so all we need to do is adapt it in the Flink code. I’d like
> to make this part of FLIP-162 and fix it soon.
> 
> Thanks Julian and Timo for the reminder.
> 
> 
> Best,
> Leonard



Re: [Flink-SQL] FLOORing OR CEILing a DATE or TIMESTAMP to WEEK uses Thursdays as week start

2021-03-03 Thread Leonard Xu
Hi, Jaffe

Flink follows an older version of Calcite’s behaviour, which leads to the wrong result.
And Julian is right that Calcite has corrected the FLOOR and CEIL functions in
CALCITE-3412. Flink has upgraded Calcite to version 1.26, which contains the
patch, so all we need to do is adapt it in the Flink code. I’d like to make this
part of FLIP-162 and fix it soon.

Thanks Julian and Timo for the reminder.


Best,
Leonard





> On 3 Mar 2021, at 03:35, Jaffe, Julian wrote:
> 
> Calcite does not follow ISO-8601. Instead, until very recently Calcite weeks 
> started on Thursdays[1].
> 
> (As an aside, Calcite somewhat abuses the WEEK time unit - converting a date 
> to a week returns an integer representing the week of the year the date falls 
> in while FLOORing or CEILing a timestamp to week returns a timestamp. This 
> can cause integration issues with other systems if you're unaware)
> 
> Julian
> 
> 
> [1] https://issues.apache.org/jira/browse/CALCITE-3412
> 
> On 3/2/21, 4:12 AM, "Timo Walther"  wrote:
> 
>Hi Sebastián,
> 
>it might be the case that some time functions are not correct due to the 
>underlying refactoring of data structures. I will loop in Leonard in CC 
>that currently works on improving this situation as part of FLIP-162 [1].
> 
>@Leonard: Is this wrong behavior on your list?
> 
>Regards,
>Timo
> 
> 
>[1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-162%3A+Consistent+Flink+SQL+time+function+behavior
> 
> 
> 
> 
>On 02.03.21 12:26, Sebastián Magrí wrote:
>> While using a simple query such as this
>> 
>> SELECT
>>`ts`,
>>FLOOR(`ts` TO WEEK) as `week_start`,
>>CEIL(`ts` TO WEEK) as `week_end`
>> FROM some_table
>> 
>> I get some weird results like these:
>> 
>> 2021-03-01T00:00|2021-02-25T00:00|2021-03-04T00:00
>> 
>> This is obviously wrong, since March 1st is a Monday, while February 25th is a
>> Thursday, as is March 4th.
>> 
>> I've tried different combinations of timezone configurations and with 
>> both timestamps and dates, with the same results.
>> 
>> Is there anything obviously wrong in that query? Is there any 
>> configuration to keep in mind for the start of week day?
>> 
>> -- 
>> Sebastián Ramírez Magrí
> 
> 



Defining GlobalJobParameters in Flink Unit Testing Harnesses

2021-03-03 Thread Rion Williams
Hi all,

Earlier today I asked a few questions regarding the use of the many
testing constructs available within Flink, and I believe I have things heading
in a good direction at present. I did run into a specific case that either may
not be supported, or just isn't documented well enough for me to determine
what is going wrong.

Basically, I have a KeyedProcessFunction that reads some global-level
configuration via GlobalJobParameters within its open function:

override fun open(configuration: Configuration) {
// Omitted for brevity

val parameters = runtimeContext.executionConfig.globalJobParameters as?
ParameterTool
if (parameters != null) {
processingTimeBuffer = parameters.getLong("processingTimeBuffer",
0L)
}
}

This works just as expected within the actual pipeline itself when set
similarly:

streamEnvironment.config.globalJobParameters = parameters

However, I don't see an effective way to set this against a TestHarness. I've
made several attempts, but I can never seem to populate the
globalJobParameters property within the KeyedProcessFunction itself using a
test harness:

// Attempt 1
magicWindowHarness.operator.runtimeContext.executionConfig.globalJobParameters
= ParameterTool.fromMap(...)

// Attempt 2
magicWindowHarness.executionConfig.globalJobParameters =
ParameterTool.fromMap(...)

// Attempt 3
magicWindowHarness.environment.executionConfig.globalJobParameters =
ParameterTool.fromMap(...)

// Attempt 4
val env = StreamExecutionEnvironment.
env.config.globalJobParameters = ParameterTool.fromMap(...)

Is this supported or am I simply going about it the wrong way? Or even just
perhaps missing a piece of the puzzle?

Thanks much,

Rion


pipeline.auto-watermark-interval vs setAutoWatermarkInterval

2021-03-03 Thread Aeden Jameson
I'm hoping to have my confusion clarified regarding the settings,

1. pipeline.auto-watermark-interval
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/ExecutionConfig.html#setAutoWatermarkInterval-long-

2. setAutoWatermarkInterval
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/ExecutionConfig.html#setAutoWatermarkInterval-long-

I noticed the default value of pipeline.auto-watermark-interval is 0
and according to these docs,
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/create.html#watermark,
it states, "If watermark interval is 0ms, the generated watermarks
will be emitted per-record if it is not null and greater than the last
emitted one." However in the documentation for
setAutoWatermarkInterval the value 0 disables watermark emission.

* Are they intended to be the same setting? If not how are they
different? Is one for FlinkSql and the other DataStream API?

-- 
Thank you,
Aeden


Re: how to propagate watermarks across multiple jobs

2021-03-03 Thread yidan zhao
Yes, you are right and thank you. I take a brief look at what
FlinkKafkaShuffle is doing, it seems what I need and I will have a try.

>


Re: Flink Application Mode on native Kubernetes with OSS as the state backend: occasional errors in the logs

2021-03-03 Thread 王 羽凡
2021-03-04 02:33:25,292 DEBUG org.apache.flink.runtime.rpc.akka.SupervisorActor 
   [] - Starting FencedAkkaRpcActor with name jobmanager_2.
2021/3/4 上午10:33:25 2021-03-04 02:33:25,304 INFO  
org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC 
endpoint for org.apache.flink.runtime.jobmaster.JobMaster at 
akka://flink/user/rpc/jobmanager_2 .
2021/3/4 上午10:33:25 2021-03-04 02:33:25,310 INFO  
org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing 
job TransactionAndAccount ().
2021/3/4 上午10:33:25 2021-03-04 02:33:25,323 INFO  
org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart 
back off time strategy 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, 
backoffTimeMS=1000) for TransactionAndAccount 
().
2021/3/4 上午10:33:25 2021-03-04 02:33:25,380 INFO  
org.apache.flink.runtime.jobmaster.JobMaster [] - Running 
initialization on master for job TransactionAndAccount 
().
2021/3/4 上午10:33:25 2021-03-04 02:33:25,380 INFO  
org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully 
ran initialization on master in 0 ms.
2021/3/4 上午10:33:25 2021-03-04 02:33:25,381 DEBUG 
org.apache.flink.runtime.jobmaster.JobMaster [] - Adding 2 
vertices from job graph TransactionAndAccount 
().
2021/3/4 上午10:33:25 2021-03-04 02:33:25,381 DEBUG 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Attaching 2 
topologically sorted vertices to existing job graph with 0 vertices and 0 
intermediate results.
2021/3/4 上午10:33:25 2021-03-04 02:33:25,389 DEBUG 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Connecting 
ExecutionJobVertex cbc357ccb763df2852fee8c4fc7d55f2 (Source: Custom Source -> 
format to json -> Filter -> process timestamp range -> Timestamps/Watermarks) 
to 0 predecessors.
2021/3/4 上午10:33:25 2021-03-04 02:33:25,389 DEBUG 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Connecting 
ExecutionJobVertex 337adade1e207453ed3502e01d75fd03 
(Window(TumblingEventTimeWindows(8640), EventTimeTrigger, SumAggregator, 
PassThroughWindowFunction) -> Flat Map -> Sink: tidb) to 1 predecessors.
2021/3/4 上午10:33:25 2021-03-04 02:33:25,389 DEBUG 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Connecting 
input 0 of vertex 337adade1e207453ed3502e01d75fd03 
(Window(TumblingEventTimeWindows(8640), EventTimeTrigger, SumAggregator, 
PassThroughWindowFunction) -> Flat Map -> Sink: tidb) to intermediate result 
referenced via predecessor cbc357ccb763df2852fee8c4fc7d55f2 (Source: Custom 
Source -> format to json -> Filter -> process timestamp range -> 
Timestamps/Watermarks).
2021/3/4 上午10:33:25 2021-03-04 02:33:25,395 INFO  
org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 
1 pipelined regions in 2 ms
2021/3/4 上午10:33:25 2021-03-04 02:33:25,396 DEBUG 
org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully 
created execution graph from job graph TransactionAndAccount 
().
2021/3/4 上午10:33:25 2021-03-04 02:33:25,406 INFO  
org.apache.flink.runtime.jobmaster.JobMaster [] - Using 
job/cluster config to configure application-defined state backend: File State 
Backend (checkpoints: 'oss://xx/backend', savepoints: 'null', asynchronous: 
TRUE, fileStateThreshold: 20480)
2021/3/4 上午10:33:25 2021-03-04 02:33:25,406 INFO  
org.apache.flink.runtime.jobmaster.JobMaster [] - Using 
application-defined state backend: File State Backend (checkpoints: 
'oss://xx/backend', savepoints: 'null', asynchronous: TRUE, fileStateThreshold: 
20480)
2021/3/4 上午10:33:25 2021-03-04 02:33:25,419 INFO  
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss  [] - 
[Server]Unable to execute HTTP request: Not Found
2021/3/4 上午10:33:25 [ErrorCode]: NoSuchKey
2021/3/4 上午10:33:25 [RequestId]: 604046F58B49C830320A1A53
2021/3/4 上午10:33:25 [HostId]: null
2021/3/4 上午10:33:25 2021-03-04 02:33:25,432 INFO  
org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss  [] - 
[Server]Unable to execute HTTP request: Not Found
2021/3/4 上午10:33:25 [ErrorCode]: NoSuchKey
2021/3/4 上午10:33:25 [RequestId]: 604046F58B49C830322A1A53
2021/3/4 上午10:33:25 [HostId]: null
2021/3/4 上午10:33:25 2021-03-04 02:33:25,442 INFO  
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - 
Recovering checkpoints from 
KubernetesStateHandleStore{configMapName='demo--jobmanager-leader'}.
2021/3/4 上午10:33:25 2021-03-04 02:33:25,448 INFO  
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Found 
1 checkpoints in 
KubernetesStateHandleStore{configMapName='demo--jobmanager-leader'}.
2021/3/4 上午10:33:25 2021-03-04 02:33:25,449 INFO  

Re: [External] : Re: Need help with JDBC Broken Pipeline Issue after some idle time

2021-03-03 Thread Fuyao Li
Hi Qinghui,

I agree. I am trying to find internal resources, and resources on the internet, to fix the
issue. "Idle Time Limits" might be a reason. But after configuring those parameters and updating the
sqlnet.ora to
WALLET_LOCATION = (SOURCE = (METHOD = file) (METHOD_DATA = (DIRECTORY="… ")))
SSL_SERVER_DN_MATCH=yes
NAMES.DIRECTORY_PATH=(ezconnect,tnsnames)
SQLNET.USE_HTTPS_PROXY=on
DISABLE_OOB=on
SQLNET.RECV_TIMEOUT = 7200
BEQUEATH_DETACH = YES
SQLNET.EXPIRE_TIME = 1
SQLNET.SEND_TIMEOUT = 7200
SQLNET.INBOUND_CONNECT_TIMEOUT = 7200

SQLNET.EXPIRE_TIME is kind of like heartbeat thing to keep the connection alive.

It still doesn’t work after all of these configurations. Pretty weird…

I will post a follow up if I could find the answer… Thanks.

BR,
Fuyao


From: XU Qinghui 
Date: Tuesday, March 2, 2021 at 13:40
To: Fuyao Li 
Cc: user , Timo Walther 
Subject: [External] : Re: Need help with JDBC Broken Pipeline Issue after some 
idle time
It sounds like the jdbc driver's connection is closed somehow, and probably has 
nothing to do with flink itself.
Maybe you could check if there's some settings on the db that could close the 
connection after some inactivity, or otherwise it could be your network drops 
the inactive tcp connection after some time (you can try to use tcp keepalive 
in this case).

BR,


Le mar. 2 mars 2021 à 19:38, Fuyao Li 
mailto:fuyao...@oracle.com>> a écrit :
Sorry for the uncompleted email.

Error log of broken pipeline, the failed SQL will be executed after checkpoint 
automatic recovery. Please share some ideas on this issue. Really appreciate 
it. Thanks!

09:20:02,868 ERROR 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  - JDBC 
executeBatch error, retry times = 3
java.sql.SQLRecoverableException: Closed Connection
at 
oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204)
at 
oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9515)
at 
oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1447)
at 
oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9487)
at 
oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:237)
at 
org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:73)
at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)
at 
org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.base/java.lang.Thread.run(Thread.java:834)
09:20:02,869 WARN  
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  - Writing 
records to JDBC failed.
java.io.IOException: java.sql.SQLRecoverableException: Closed Connection
at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:190)
at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)
at 
org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
at 

Re: 1.12.2 docker image

2021-03-03 Thread Chesnay Schepler

They should be released in a day or two.

On 3/3/2021 11:18 PM, Bohinski, Kevin wrote:


Hi,

Are there plans to provide a docker image for 1.12.2?

Best

kevin





1.12.2 docker image

2021-03-03 Thread Bohinski, Kevin
Hi,

Are there plans to provide a docker image for 1.12.2?

Best
kevin


Re: Flink Zookeeper leader change v 1.9.X

2021-03-03 Thread Chesnay Schepler
1) This could occur due to a number of reasons, like processes crashing, 
network issues between ZK and Flink, or the JobManager being stuck in 
some blocking operation for a long time. You will need to take a look at 
the ZK/Flink logs to narrow things down.


2) For FLINK-14091 the issue was not just a ZK leader change but that 
the zookeeper connection was suspended, i.e, the connection broke down. 
I'd think the best way to replicate that is to shut down ZK for a bit, 
or make it otherwise unreachable. To trigger a plain leader change the 
easiest way would be to kill the leading JobManager.


On 3/3/2021 7:26 AM, Varun Chakravarthy Senthilnathan wrote:


Hi,

We are using Flink version 1.9.1, and in a long-running environment we
encountered the specific issue mentioned in:
https://issues.apache.org/jira/browse/FLINK-14091



While we are working on upgrading our version:

 1. Why does ZooKeeper go for a leader change? As far as we checked,
    there was no scaling in our cluster at all. The load was very
    minimal. Is there any reason for the ZooKeeper leader change to
    happen?
 2. Is there a way to replicate the ZooKeeper leader change manually
    to verify that the version upgrade helped us?

Regards,

Varun.





Re: Stop vs Cancel with savepoint

2021-03-03 Thread Chesnay Schepler

Your understanding of cancel vs stop(-with-savepoint) is correct.

I agree that we should update the REST API documentation and have a 
section outlining the problems with cancel-with-savepoint.

Would you like to open a ticket yourself?

On 3/3/2021 11:16 AM, Thomas Eckestad wrote:

Hi!

Cancel with savepoint is marked as deprecated in the cli-documentation. It is 
not marked as deprecated in the REST-API documentation though? Is that a 
mistake? At least some recommendation regarding stop vs cancel would be 
appropriate to include in the API doc, or?

As I understand it, stop will cancel each operator in the job DAG gracefully, from
the sources downward. Conceptually that means: first cancel the sources; then, when
the operators directly downstream of the sources have drained all pending
input, those will be cancelled as well. This continues until the sinks are done
as well. Or, maybe more to the point, the checkpoint barrier triggered for the
savepoint will not be followed by any more input data; the sources will stop
consuming new data until the savepoint is complete and the job exits.

Is the above understanding correct? In that case, for some streaming jobs
without exactly-once sinks, cancel with savepoint might produce duplicates.
That should be OK, of course, since the job needs to handle a restart anyway,
but it might be beneficial not to generate duplicated output for this specific
use case, if there is a choice where the alternatives have the same
implementation cost...

Is my understanding of cancel vs stop correct? If not, what is the real
practical difference between stop and cancel with savepoint?

To me it feels like cancel with savepoint should be deprecated in both the
REST API and the CLI, and there should also be a text that explains why it is
deprecated and why its usage is discouraged, or?

Thanks,
Thomas
Thomas Eckestad
Systems Engineer
Road Perception

NIRA Dynamics AB
Wallenbergs gata 4
58330 Linköping, Sweden
Mobile: +46  738 453 937
thomas.eckes...@niradynamics.se
www.niradynamics.se





Re: Unit Testing State Stores in KeyedProcessFunctions

2021-03-03 Thread Rion Williams
Thanks Chesnay,

I agree that output testing is more practical and far less brittle; I was just
curious whether support was there for it. I have a specific use case where I'm
managing my own windows and may schedule something to be emitted after some
processing-time delay, so it could potentially be valuable to see this
scheduling in state, since it may not directly coincide with output.

Not a huge deal, I already have tests in place that function as black boxes 
with output verification, it was more of a question if it was supported.

Thanks much,

Rion

> On Mar 3, 2021, at 2:44 PM, Chesnay Schepler  wrote:
> 
> 
> I do not believe this to be possible.
> 
> Given that the state will likely in some form affect the behavior of the 
> function (usually in regards to what it outputs), it may be a better idea to 
> test for that. (I suppose you'd want tests like that anyway)
> 
> On 3/3/2021 8:10 PM, Rion Williams wrote:
>> Hi all!
>> 
>> Is it possible to apply assertions against the underlying state stores 
>> within a KeyedProcessFunction using the existing 
>> KeyedOneInputStreamOperatorTestHarness class within unit tests? Basically I 
>> wanted to ensure that if I passed in two elements each with unique keys that 
>> I would be able to query the underlying state stores to ensure they were 
>> working as expected. I don’t really see a mechanism that would support such 
>> behavior (i.e. give me the state store for key n from the operator?)
>> 
>> @Test
>> fun `Verify that instances with different keys retain separate watermarks`() 
>> {
>> // Arrange
>> val logs = listOf(
>> StreamRecord(TestGenerator.generateLog(tenant = "A")),
>> StreamRecord(TestGenerator.generateLog(tenant = "B")),
>> )
>> 
>> // Act
>> magicWindowHarness
>> .processElements(logs)
>> 
>> // Assert (I'd like to access the state by key for each here)
>> assert(magicWindowHarness.getStateForKey("A"), ...)
>> assert(magicWindowHarness.getStateForKey("B"), ...)
>> }
>> 
>> Is something like this possible or is there a better way to access the 
>> underlying state store? It seemed to work as expected when only a single key 
>> was involved, but when multiple keys were involved, things seemed to fall 
>> apart. The current testing documentation [0] is fantastic, however I think 
>> this might qualify as a more advanced task than it covered.
>> 
>> At present all of the state stores of the underlying function are privately 
>> declared, which may/may not be relevant:
>> 
>> @Transient private lateinit var watermark: ValueState
>> @Transient private lateinit var scheduledEvictions: MapState
>> 
>> Any recommendations or advice would be greatly appreciated and I'll be happy 
>> to provide any additional context/details as needed.
>> 
>> Thanks a lot!
>> 
>> Rion
>> 
>> [0]: 
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
> 


Re: Compile Error

2021-03-03 Thread Chesnay Schepler

The flink-clients dependency is correct.

We will need additional information to debug the Job execution failures, 
because these can happen due to all kind of reasons.

Things like the full stacktrace, or exceptions from the logging output.

Additionally, I would recommend basing your project on the quickstarts.


On 3/3/2021 4:55 PM, Abdullah bin Omar wrote:

Hi,

I am running the code of the Example Program from [1], and followed [2]
for the dependencies. I used this in the pom.xml:


<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>
  <groupId>org.apache.flink</groupId>
  <artifactId>newinput</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>1.12.0</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.5</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.5</version>
    </dependency>
  </dependencies>
</project>







I got an error, No Executor Factory Found

After that, I add flink-clients dependencies in the pom.xml



org.apache.flink

flink-clients_2.11

jar

1.12.0

provided




Then the previous error (No Executor Factory Found) was removed. 
However, a new bunch of error appeared (like Job execution failed)



[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/overview/ 

[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/project-configuration.html#project-configuration 




Questions:

(1) is the flink-clients dependencies correct?

(2) I just opened a maven project in Eclipse IDE, and am using the 
flink-1.12 version.I used the java code similar to Example Program 
[1], and using the above dependencies in pom.xml
I am wondering that are those steps correct? or something missing to 
install or add anything in the code/pom.xml?


Thank you

Best Regards,
Abdullah







Antw: [EXT] Re: Running Apache Flink on Android

2021-03-03 Thread Alexander Borgschulze
Hey,

Thanks for your answer :)
For my Master's thesis,I want to test and evaluate the use of CEP technologies
for detecting Complex Patterns in Android sensor data (Floating Phone Data).
Apache Flink offers a CEP library, so I thought it would be an interesting
option. The data sources would be the sensors (Gyroscope and Accelerometer).
Then I want to find patterns in this sensor data stream. This would be my
usecase. 
But I am struggling with runnning a minimum working example. The execution
outside of Android is not the problem. But I thought, that there might be a way
to run Flink (CEP) on Android




>>> Piotr Nowojski  03.03.21 21.22 Uhr >>>
Hi,
The question would be, why do you want to do it? I think it might be possible,
but probably nobody has ever tested it. Flink is a distributed system, so
running it on an Android phone doesn't make much sense.

I would suggest you first make your app/example work outside of Android. To
make sure that dependencies and project setup is correct. Keep in mind that you
also need to start somehow a Flink cluster. I would expect that starting a
minicluster with a local execution environment
(StreamExecutionEnvironment.createLocalEnvironment(...) instead of
StreamExecutionEnvironment.getExecutionEnvironment()) would be the way to go. 
Unless you want to run a distributed cluster across multiple Android phones,
but in that case I really don't know why you would like to do it :)

Also, Android has its own JDK, which we have never tested. It might cause some
problems.

Piotrek


wt., 2 mar 2021 o 16:23 Alexander Borgschulze
 napisał(a):


I was trying to run Apache Flink within an Android App. I just want to run a
minimum working example, like this:@Overrideprotected void onCreate(Bundle
savedInstanceState) {super.onCreate(savedInstanceState);   
setContentView(R.layout.activity_main);runFlinkExample();}private void
runFlinkExample() {StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();DataStream
stream = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5));stream.print();  
 try {env.execute();} catch (Exception e) {   
e.printStackTrace();}} 
These are my two .gradle files:


build.gradle (Module)

 
plugins {id 'com.android.application'}android {compileSdkVersion 30   
buildToolsVersion "30.0.3"defaultConfig {applicationId
"com.example.flink"minSdkVersion 26targetSdkVersion 30   
versionCode 1versionName "1.0"testInstrumentationRunner
"androidx.test.runner.AndroidJUnitRunner"}buildTypes {release {
   minifyEnabled falseproguardFiles
getDefaultProguardFile('proguard-android-optimize.txt'), 'proguard-rules.pro'  
 }}compileOptions {sourceCompatibility
JavaVersion.VERSION_1_8targetCompatibility JavaVersion.VERSION_1_8}
   packagingOptions {exclude 'META-INF/DEPENDENCIES'exclude
'reference.conf'}}dependencies {implementation
'androidx.appcompat:appcompat:1.2.0'implementation
'com.google.android.material:material:1.3.0'implementation
'androidx.constraintlayout:constraintlayout:2.0.4'testImplementation
'junit:junit:4.+'androidTestImplementation 'androidx.test.ext:junit:1.1.2' 
  androidTestImplementation 'androidx.test.espresso:espresso-core:3.3.0'//
Flinkimplementation 'org.apache.flink:flink-streaming-java_2.12:1.12.1'   
implementation 'org.apache.flink:flink-clients_2.12:1.12.1'} 
build.gradle (Project)

// Top-level build file where you can add configuration options common to all
sub-projects/modules.buildscript {repositories {google()   
jcenter()}dependencies {classpath
"com.android.tools.build:gradle:4.1.2"// NOTE: Do not place your
application dependencies here; they belong// in the individual module
build.gradle files}}allprojects {jcenter()}}task clean(type: Delete) {  
  delete rootProject.buildDir}
 
The first problem is, that I get the following Error:Caused by:
java.lang.ClassNotFoundException: Didn't find class
"org.apache.flink.streaming.api.environment.StreamExecutionEnvironment" on
path: DexPathList[[zip file "/data/app/~~DbT_CZ7AhLED2xZgLBk 

 
In cases there this error doesn't appear, I get Akka-Actor errors, because I
must exclude 'reference.conf', otherwise the code wouldn't compile. However,
this leads to more exceptions, e.g. missing akka-version.
So my general question is: Is it possible to run Flink within an Android-App?
Or is this not possible (recommended)? Perhaps someone knows how to modfiy my
gradle files (or something else) to run my example. Or perhaps someone already
has successfully used Flink in Android.


 
 




Re: Unit Testing State Stores in KeyedProcessFunctions

2021-03-03 Thread Chesnay Schepler

I do not believe this to be possible.

Given that the state will likely in some form affect the behavior of the 
function (usually in regards to what it outputs), it may be a better 
idea to test for that. (I suppose you'd want tests like that anyway)
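
For what it's worth, that black-box style looks roughly like this in Java; the function,
element types and the logForTenant helper below are placeholders standing in for your
classes, not real Flink or project APIs:

```
KeyedOneInputStreamOperatorTestHarness<String, LogEvent, OutputEvent> harness =
        new KeyedOneInputStreamOperatorTestHarness<>(
                new KeyedProcessOperator<>(new MagicWindowFunction()),
                log -> log.getTenant(),          // same key selector as in the real pipeline
                Types.STRING);
harness.open();

harness.processElement(new StreamRecord<>(logForTenant("A"), 1L));
harness.processElement(new StreamRecord<>(logForTenant("B"), 2L));

// fire any processing-time timers the function registered for its scheduled emissions
harness.setProcessingTime(60_000L);

// assert on what the function actually emitted rather than on its private state
assertFalse(harness.getOutput().isEmpty());   // e.g. JUnit's assertFalse
```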


On 3/3/2021 8:10 PM, Rion Williams wrote:

Hi all!

Is it possible to apply assertions against the underlying state stores 
within a KeyedProcessFunction using the existing 
KeyedOneInputStreamOperatorTestHarness class within unit tests? 
Basically I wanted to ensure that if I passed in two elements each 
with unique keys that I would be able to query the underlying state 
stores to ensure they were working as expected. I don’t really see a 
mechanism that would support such behavior (i.e. give me the state 
store for key n from the operator?)


@Test
fun `Verify that instances with different keys retain separate 
watermarks`() {

    // Arrange
    val logs = listOf(
        StreamRecord(TestGenerator.generateLog(tenant = "A")),
        StreamRecord(TestGenerator.generateLog(tenant = "B")),
    )

    // Act
    magicWindowHarness
        .processElements(logs)

    // Assert (I'd like to access the state by key for each here)
    assert(magicWindowHarness.getStateForKey("A"), ...)
    assert(magicWindowHarness.getStateForKey("B"), ...)
}

Is something like this possible or is there a better way to access the 
underlying state store? It seemed to work as expected when only a 
single key was involved, but when multiple keys were involved, things 
seemed to fall apart. The current testing documentation [0] is 
fantastic, however I think this might qualify as a more advanced task 
than it covered.


At present all of the state stores of the underlying function are 
privately declared, which may/may not be relevant:


@Transient private lateinit var watermark: ValueState
@Transient private lateinit var scheduledEvictions: MapState

Any recommendations or advice would be greatly appreciated and I'll be 
happy to provide any additional context/details as needed.


Thanks a lot!

Rion

[0]: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html 






Re: Flink upgrade causes operator to lose state

2021-03-03 Thread Chesnay Schepler
It is currently not possible to upgrade table API / SQL applications via 
savepoints.


This thread may provide some more insights: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-SQL-and-checkpoints-and-savepoints-td40749.html


On 3/3/2021 6:53 PM, soumoks wrote:

Hi,

We are upgrading several applications from Flink 1.9.1 to 1.11.2.
Some of the applications written with the Table API are not able to start from
a savepoint after the upgrade and fail with the following error.

Caused by: java.lang.IllegalStateException: Failed to rollback to
checkpoint/savepoint s3://0X/savepoint-9bd1c7-8cafa2c1a9ac. Cannot map
checkpoint/savepoint state for operator 49bb9e12f4a332535e9b828c1d4e2c0a to
the new program, because the operator is not available in the new program.
If you want to allow to skip this, you can set the --allowNonRestoredState
option on the CLI.
at
org.apache.flink.runtime.checkpoint.Checkpoints.throwNonRestoredStateException(Checkpoints.java:210)
at
org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:180)
at



Starting with the --allowNonRestoredState option loses the state of multiple
operators and is not an option.


and a few other apps are failing with the following error.

com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID:
2532
Serialization trace:
fieldValueMap (io.caseclass.samplecaseclass)

This case class consists of a Mutable.Map which seems to be causing the
issue.



And finally another app fails with the following error.


Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
state backend for
KeyedProcessOperator_48c7355e6ee5ecb2411313ac3173573d_(1/1) from any of the
1 provided restore options.
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
... 9 more

Caused by: java.io.IOException: Could not find class
'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot'
in classpath.
at
org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:721)
at
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84)




From the savepoint compatibility doc [1], restoring state across Flink 1.9.1
and 1.11.2 should be possible but it does not seem to be the case for the
above apps.

[1] -
https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Running Apache Flink on Android

2021-03-03 Thread Piotr Nowojski
Hi,

The question would be, why do you want to do it? I think it might be
possible, but probably nobody has ever tested it. Flink is a distributed
system, so running it on an Android phone doesn't make much sense.

I would suggest you first make your app/example work outside of Android. To
make sure that dependencies and project setup is correct. Keep in mind that
you also need to start somehow a Flink cluster. I would expect that
starting a minicluster with a local execution environment
(StreamExecutionEnvironment.createLocalEnvironment(...) instead of
StreamExecutionEnvironment.getExecutionEnvironment()) would be the way to
go.  Unless you want to run a distributed cluster across multiple Android
phones, but in that case I really don't know why you would like to do it :)
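
For example, the local-execution variant of your snippet would look roughly like this
(a sketch; whether it actually runs on Android's runtime is exactly the untested part):

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
DataStream<Integer> stream = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5));
stream.print();
env.execute("local-test");   // spins up an embedded MiniCluster in the same JVM
```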

Also, Android has its own JDK, which we have never tested. It might cause
some problems.

Piotrek

wt., 2 mar 2021 o 16:23 Alexander Borgschulze <
alexander.borgschu...@stud.uni-regensburg.de> napisał(a):

> I was trying to run Apache Flink within an Android App. I just want to run a 
> minimum working example, like this:
>
> @Override
> protected void onCreate(Bundle savedInstanceState) {
> super.onCreate(savedInstanceState);
> setContentView(R.layout.activity_main);
>
> runFlinkExample();
> }
>
> private void runFlinkExample() {
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream stream = env.fromCollection(Arrays.asList(1, 2, 3, 4, 
> 5));
> stream.print();
> try {
> env.execute();
> } catch (Exception e) {
> e.printStackTrace();
> }
> }
>
>
> These are my two .gradle files:
>
>
> *build.gradle (Module)*
>
>
> plugins {
> id 'com.android.application'
> }
>
> android {
> compileSdkVersion 30
> buildToolsVersion "30.0.3"
>
> defaultConfig {
> applicationId "com.example.flink"
> minSdkVersion 26
> targetSdkVersion 30
> versionCode 1
> versionName "1.0"
>
> testInstrumentationRunner "androidx.test.runner.AndroidJUnitRunner"
> }
>
> buildTypes {
> release {
> minifyEnabled false
> proguardFiles 
> getDefaultProguardFile('proguard-android-optimize.txt'), 'proguard-rules.pro'
> }
> }
> compileOptions {
> sourceCompatibility JavaVersion.VERSION_1_8
> targetCompatibility JavaVersion.VERSION_1_8
> }
>
> packagingOptions {
> exclude 'META-INF/DEPENDENCIES'
> exclude 'reference.conf'
> }
> }
>
> dependencies {
>
> implementation 'androidx.appcompat:appcompat:1.2.0'
> implementation 'com.google.android.material:material:1.3.0'
> implementation 'androidx.constraintlayout:constraintlayout:2.0.4'
> testImplementation 'junit:junit:4.+'
> androidTestImplementation 'androidx.test.ext:junit:1.1.2'
> androidTestImplementation 'androidx.test.espresso:espresso-core:3.3.0'
>
> // Flink
> implementation 'org.apache.flink:flink-streaming-java_2.12:1.12.1'
> implementation 'org.apache.flink:flink-clients_2.12:1.12.1'
> }
>
>
> *build.gradle (Project)*
>
> // Top-level build file where you can add configuration options common to all 
> sub-projects/modules.
> buildscript {
> repositories {
> google()
> jcenter()
> }
> dependencies {
> classpath "com.android.tools.build:gradle:4.1.2"
>
> // NOTE: Do not place your application dependencies here; they belong
> // in the individual module build.gradle files
> }
> }
>
> allprojects {
> repositories {
> google()
> jcenter()
> }
> }
>
> task clean(type: Delete) {
> delete rootProject.buildDir
> }
>
>
>
> The first problem is, that I get the following Error:
> Caused by: java.lang.ClassNotFoundException: Didn't find class 
> "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment" on 
> path: DexPathList[[zip file "/data/app/~~DbT_CZ7AhLED2xZgLBk 
>
>
>
> In cases where this error doesn't appear, I get Akka-Actor errors, because
> I must exclude 'reference.conf', otherwise the code wouldn't compile.
> However, this leads to more exceptions, e.g. missing akka-version.
>
> So my general question is: Is it possible to run Flink within an
> Android-App? Or is this not possible (recommended)? Perhaps someone knows
> how to modfiy my gradle files (or something else) to run my example. Or
> perhaps someone already has successfully used Flink in Android.
>


Re: Flink KafkaProducer flushing on savepoints

2021-03-03 Thread Piotr Nowojski
Hi,

What Flink version and which FlinkKafkaProducer version are you using?
`FlinkKafkaProducerBase` is no longer used in the latest version. I would
guess some older versions, and FlinkKafkaProducer010 or later (no longer
supported).

I would suggest either to use the universal FlinkKafkaProducer (universal),
or FlinkKafkaProducer011 (if you are using a really old Flink version that
doesn't have the universal Kafka connector). Both of those should work with
any Kafka version and by looking at the code it seems to me like neither of
those has the problem you mentioned. If you select
`org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic#AT_LEAST_ONCE`
and disable checkpointing it should be still flushing records on savepoints.
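
For reference, constructing the universal producer that way looks roughly like this
(topic name, serialization schema and properties are placeholders, not from your job):

```
// assumes the universal flink-connector-kafka dependency is on the classpath
Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");   // placeholder

KafkaSerializationSchema<String> schema = (element, timestamp) ->
        new ProducerRecord<>("output-topic", element.getBytes(StandardCharsets.UTF_8));

FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
        "output-topic",                              // default topic (placeholder)
        schema,
        props,
        FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);  // flushes pending records on checkpoints/savepoints
```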

> The only thing I can think about is have checkpoints enabled with some
very high periodicity so that they are never(almost) triggered. But this is
a hack.

Yes, it would be a hack. But it would work.

Best,
Piotrek

wt., 2 mar 2021 o 12:09 Witzany, Tomas 
napisał(a):

> Hi,
> I have a question about the at-least-once guarantees for Kafka producers
> when checkpointing is disabled. In our data pipeline we have a Flink job on
> an unlimited stream that originally, we had checkpoints turned on. Further
> this job is cancelled with a savepoint once a day to do some data pre and
> post-processing for the next day, afterwards this job is restarted from the
> savepoint.
>
> The issue we have is that we want to turn off checkpointing, since it
> does not give us much value and only creates extra IO. When this is done this
> message
> 
> shows up:
> "Flushing on checkpoint is enabled, but checkpointing is not enabled.
> Disabling flushing."
> This prompted us to investigate, and it seems that if you have
> checkpointing disabled, there are no at-least-once guarantees.
> 
>
> What about if you have no checkpointing, but you make savepoints that you
> restore from yourself? Savepoints are the same thing as checkpoints in the
> code. The flink producer makes it impossible to turn on flushing and have
> checkpointing disabled. I can see why this is the case as there is some
> extra synchronization overhead related to the flushing flag being on. Is
> there a way to have checkpointing disabled and have at least once
> guarantees on savepoints?
>
> The only thing I can think about is have checkpoints enabled with some
> very high periodicity so that they are never(almost) triggered. But this is
> a hack.
>
> Tomas Witzany
>


Re: Job downgrade

2021-03-03 Thread Alexey Trenikhun
If I copy class A into version 1+ it works. But it is the problem from CD 
perspective - I want to introduce feature which required new state: 1st I need 
make version 1+ with class B, but no other changes, then version 2 with class B 
and logic changes, upgrade job and if job doesn’t do what expected “rollback” 
to version 1+.


From: Piotr Nowojski 
Sent: Wednesday, March 3, 2021 11:47:45 AM
To: Alexey Trenikhun 
Cc: Flink User Mail List 
Subject: Re: Job downgrade

Hi,

I'm not sure what's the reason behind this. Probably classes are somehow 
attached to the state and this would explain why you are experiencing this 
issue. I've asked someone else from the community to chip in, but in the 
meantime, can not you just prepare a new "version 1" of the job, with just some 
empty `class B` on the class path? Or if this doesn't work, just copy the whole 
`class B` from version 2?

Best,
Piotrek

sob., 27 lut 2021 o 19:10 Alexey Trenikhun  napisał(a):
Hello,
Let's have version 1 of my job uses keyed state with name "a" and type A, which 
some Avro generated class. Then I upgrade to version 2, which in addition uses 
keyed state "b" and type B (another concrete Avro generated class), I take 
savepoint with version 2 and decided to downgrade to version 1 and start with 
taken savepoint, can I do it? On one hand, version 1 doesn't have state "b", 
but seems Flink still tries to create call restoreSerializer​ and it tries to 
read runtimeType (`class B`) which is not available in version 1

Thanks,
Alexey


Re: Job downgrade

2021-03-03 Thread Piotr Nowojski
Hi,

I'm not sure what's the reason behind this. Probably classes are somehow
attached to the state and this would explain why you are experiencing this
issue. I've asked someone else from the community to chip in, but in the
meantime, can not you just prepare a new "version 1" of the job, with just
some empty `class B` on the class path? Or if this doesn't work, just copy
the whole `class B` from version 2?

Best,
Piotrek

sob., 27 lut 2021 o 19:10 Alexey Trenikhun  napisał(a):

> Hello,
> Let's have version 1 of my job uses keyed state with name "a" and type A,
> which some Avro generated class. Then I upgrade to version 2, which in
> addition uses keyed state "b" and type B (another concrete Avro generated
> class), I take savepoint with version 2 and decided to downgrade to version
> 1 and start with taken savepoint, can I do it? On one hand, version 1
> doesn't have state "b", but seems Flink still tries to create call
> restoreSerializer​ and it tries to read runtimeType (`class B`) which is
> not available in version 1
>
> Thanks,
> Alexey
>


Re: how to propagate watermarks across multiple jobs

2021-03-03 Thread Piotr Nowojski
Hi,

Can not you write the watermark as a special event to the "mid-topic"? In
the "new job2" you would parse this event and use it to assign watermark
before `xxxWindow2`? I believe this is what FlinkKafkaShuffle is doing [1],
you could look at its code for inspiration.
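
To make the idea a bit more concrete, here is only a sketch (with a made-up
MidTopicEvent type that carries a marker flag and the watermark timestamp - not
the actual FlinkKafkaShuffle code) of how the "new job2" could turn such special
events back into watermarks:

WatermarkStrategy<MidTopicEvent> strategy = WatermarkStrategy
        .<MidTopicEvent>forGenerator(ctx -> new WatermarkGenerator<MidTopicEvent>() {
            @Override
            public void onEvent(MidTopicEvent event, long eventTimestamp, WatermarkOutput output) {
                if (event.isWatermarkMarker()) {
                    // re-emit the watermark that job1 encoded into the special event
                    output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
                }
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                // nothing to do - watermarks are driven purely by the marker events
            }
        })
        .withTimestampAssigner((event, ts) -> event.getEventTimestamp());

DataStream<MidTopicEvent> withWatermarks =
        midTopicStream.assignTimestampsAndWatermarks(strategy);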

Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.html

pon., 1 mar 2021 o 13:01 yidan zhao  napisał(a):

> I have a job which includes about 50+ tasks. I want to split it into
> multiple jobs, with the data transferred through Kafka, but what about the
> watermarks?
>
> Has anyone done something similar and solved this problem?
>
> Here I give an example:
> The original job: kafkaStream1(src-topic) => xxxProcess => xxxWindow1 ==>
> xxxWindow2 resultSinkToKafka(result-topic).
>
> The new job1: kafkaStream1(src-topic) => xxxProcess => xxxWindow1 ==>
> resultSinkToKafka(mid-topic).
> The new job2: kafkaStream1(mid-topic) => xxxWindow2 ==>
> resultSinkToKafka(result-topic).
>
> The watermark for window1 and window 2 is separated to two jobs, which
> also seems to be working, but this introduces a 5-minute delay for window2
> (both window is 5min's cycle).
>
> The key problem is that the window's cycle is 5min, so the window2 will
> have a 5min's delay.
> If watermark can be transferred between jobs, it is not a problem anymore.
>
>


Re: Flink Metrics

2021-03-03 Thread Piotr Nowojski
Hi,

1)
Do you want to output those metrics as Flink metrics? Or output those
"metrics"/counters as values to some external system (like Kafka)? The
problem discussed in [1] was that the metrics (Counters) were not fitting
in memory, so David suggested holding them in Flink's state and treating the
measured values as regular output of the job.

With the former option you would have a single operator that consumes your
CDCs and outputs something (filtered CDCs? processed CDCs?) to Kafka, while
keeping some metrics that you can access via Flink's metrics system. The
latter would be the same operator, but instead of a single output it would
have multiple outputs, writing the "counters" also, for example, to Kafka
(or any other system of your choice). Both options are viable, and each
has its own pros and cons.
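
As a sketch of the latter option (the CdcEvent type, the "table,count"
encoding and counterSink are all made up for illustration), side outputs let
a single operator emit both the processed records and the measured values:

final OutputTag<String> metricsTag = new OutputTag<String>("counter-records") {};

SingleOutputStreamOperator<CdcEvent> processed = cdcStream
        .process(new ProcessFunction<CdcEvent, CdcEvent>() {
            @Override
            public void processElement(CdcEvent event, Context ctx, Collector<CdcEvent> out) {
                out.collect(event); // regular output
                // emit a measured value on the side output, e.g. "table-name,1"
                ctx.output(metricsTag, event.getTableName() + ",1");
            }
        });

// the side output can then be written to Kafka (or any other system of your choice)
processed.getSideOutput(metricsTag).addSink(counterSink);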

2) You need to persist your metrics somewhere. Why don't you use Flink's
state for that purpose? Upon recovery/initialisation, you can get the
recovered value from state and update/set metric value to that recovered
value.
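
A rough sketch of that pattern (the Event type and the metric/state names are
made up; the point is just to snapshot the counter value and re-apply it after
a restore):

public class CountingMapper extends RichMapFunction<Event, Event> implements CheckpointedFunction {

    private transient ListState<Long> countState;
    private transient Counter counter;
    private long restoredCount = 0L;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        countState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("cdc-count", Long.class));
        for (Long value : countState.get()) {
            restoredCount += value; // sum the partial counts of the previous run
        }
    }

    @Override
    public void open(Configuration parameters) {
        counter = getRuntimeContext().getMetricGroup().counter("cdcMessages");
        counter.inc(restoredCount); // continue from the recovered value instead of 0
    }

    @Override
    public Event map(Event event) {
        counter.inc();
        return event;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        countState.clear();
        countState.add(counter.getCount());
    }
}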

3) That seems to be a question a bit unrelated to Flink. Try searching
online how to calculate percentiles. I haven't thought about it, but
histograms or sorting all of the values seems to be the options. Probably
best if you would use some existing library to do that for you.
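
For example (just a sketch using the DropwizardHistogramWrapper that ships with
the flink-metrics-dropwizard dependency; the window size, metric name and the
two timestamp variables are arbitrary):

// in open() of a rich function: a histogram backed by a Dropwizard sliding-window reservoir
com.codahale.metrics.Histogram dropwizardHistogram =
        new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));

Histogram latencyHistogram = getRuntimeContext()
        .getMetricGroup()
        .histogram("eventLatency", new DropwizardHistogramWrapper(dropwizardHistogram));

// later, once per created event:
latencyHistogram.update(eventCreatedTs - cdcOperationTs);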

4) Could you rephrase your question?

Best,
Piotrek

niedz., 28 lut 2021 o 14:53 Prasanna kumar 
napisał(a):

> Hi flinksters,
>
> Scenario: We have cdc messages from our rdbms(various tables) flowing to
> Kafka.  Our flink job reads the CDC messages and creates events based on
> certain rules.
>
> I am using Prometheus  and grafana.
>
> Following are the metrics that I need to calculate
>
> A) Number of CDC messages wrt to each table.
> B) Number of events created wrt to each event type.
> C) Average/P99/P95 Latency (event created ts - cdc operation ts)
>
> For A and B, I created counters and able to see the metrices flowing into
> Prometheus . Few questions I have here.
>
> 1) How to create labels for counters in flink ? I did not find any easier
> method to do it . Right now I see that I need to create counters for each
> type of table and events . I referred to one of the community discussions.
> [1] . Is there any way apart from this ?
>
> 2) When the job gets restarted , the counters get back to 0 . How to
> prevent that and to get continuity.
>
> For C , I calculated latency in code for each event and assigned  it to
> histogram.  Few questions I have here.
>
> 3) I read in a few blogs [2] that histogram is the best way to get
> latencies. Is there any better idea?
>
> 4) How to create buckets for various ranges? I also read in a community
> email that flink implements  histogram as summaries.  I also should be able
> to see the latencies across timelines .
>
> [1]
> https://stackoverflow.com/questions/58456830/how-to-use-multiple-counters-in-flink
> [2] https://povilasv.me/prometheus-tracking-request-duration/
>
> Thanks,
> Prasanna.
>


Unit Testing State Stores in KeyedProcessFunctions

2021-03-03 Thread Rion Williams
Hi all!

Is it possible to apply assertions against the underlying state stores
within a KeyedProcessFunction using the existing
KeyedOneInputStreamOperatorTestHarness class within unit tests? Basically I
wanted to ensure that if I passed in two elements each with unique keys
that I would be able to query the underlying state stores to ensure they
were working as expected. I don’t really see a mechanism that would support
such behavior (i.e. give me the state store for key n from the operator?)

@Test
fun `Verify that instances with different keys retain separate
watermarks`() {
// Arrange
val logs = listOf(
StreamRecord(TestGenerator.generateLog(tenant = "A")),
StreamRecord(TestGenerator.generateLog(tenant = "B")),
)

// Act
magicWindowHarness
.processElements(logs)

// Assert (I'd like to access the state by key for each here)
assert(magicWindowHarness.getStateForKey("A"), ...)
assert(magicWindowHarness.getStateForKey("B"), ...)
}

Is something like this possible or is there a better way to access the
underlying state store? It seemed to work as expected when only a single
key was involved, but when multiple keys were involved, things seemed to
fall apart. The current testing documentation [0] is fantastic, however I
think this might qualify as a more advanced task than it covered.

At present all of the state stores of the underlying function are privately
declared, which may/may not be relevant:

@Transient private lateinit var watermark: ValueState
@Transient private lateinit var scheduledEvictions: MapState

Any recommendations or advice would be greatly appreciated and I'll be
happy to provide any additional context/details as needed.

Thanks a lot!

Rion

[0]:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html


Re: Independence of task parallelism

2021-03-03 Thread Piotr Nowojski
Hi Jan,

As far as I remember, Flink doesn't handle very well cases like (1-2-1-1-1)
and two Task Managers. There are no guarantees how the operators/subtasks
are going to be scheduled, but most likely it will be as you
mentioned/observed. First task manager will be handling all of the
operators, while the second task manager will only be running a single
instance of the second operator (for load balancing reasons it would be
better to spread the tasks across those two Task Managers more evenly).

No, Flink doesn't hold any resources (appart of network buffers) per task.
All of the available memory and CPU resources are shared across all of the
running tasks. So in the (1-2-1-1-1) case, if the first task manager will
be overloaded (for example if it has very few CPU cores), the second task
will perform much better on the second task manager (which will be empty),
causing a throughput skew. From this perspective, (2-2-2-2-2) would most
likely be performing better, as the load would be more evenly spread.

Piotrek

niedz., 28 lut 2021 o 13:10 Jan Nitschke  napisał(a):

> Hello,
>
> We are working on a project where we want to gather information about the
> job performance across different task level parallelism settings.
> Essentially, we want to see how the throughput of a single task varies
> across different parallelism settings, e.g. for a job of 5 tasks: 1-1-1-1-1
> vs. 1-2-1-1-1 vs. 2-2-2-2-2.
>
> *We are running flink on Kubernetes, a job with 5 tasks, slot sharing is
> enabled, operator chaining is disabled and each task manager has one slot.*
>
> So, the number of task managers is always the number of the highest
> parallelism and we can fit the entire job into one task manager slot.
>
> We are then running the job against multiple parallelism configs (such as
> those above), collect the relevant metrics and try to get some useful
> information out of them.
>
> We are now wondering how independent our results are from one another.
> More specifically, if we now look at the parallelism of the second task, is
> its performance independent of the parallelism of the other tasks? So, will
> a the second task perform the same in (1-2-1-1-1) as in (2-2-2-2-2)?
>
> Our take on it is the following: With our setup, (1-2-1-1-1) should result
> in one task manager holding the entire job and a second task manager that
> only runs the second task. (2-2-2-2-2) will run two task managers with the
> entire job. So, theoretically, the second task should have much more
> resources available in the first setup as it has the entire resources of
> that task manager to its disposal. Does that assumption hold or will flink
> assign a certain amount of resources to a task in a task manager no matter
> how many other tasks are running on that same task manager slot?
>
> We would highly appreciate any help.
>
> Best,
> Jan
>


Re: Allocating tasks to specific TaskManagers

2021-03-03 Thread Piotr Nowojski
Hi Hyejo,

I don't think it's possible. May I ask why do you want to do this?

Best, Piotrek

pon., 1 mar 2021 o 21:02 황혜조  napisał(a):

> Hi,
>
> I am looking for a way to allocate each created subTask to a specific
> TaskManager.
> Is there any way to force assigning tasks to specific taskManagers?
>
> Thank you
>
> Best regards,
>
> Hyejo Hwang
>


Flink upgrade causes operator to lose state

2021-03-03 Thread soumoks
Hi,

We are upgrading several applications from Flink 1.9.1 to 1.11.2.
Some of the applications written with Table API are not able start from
savepoint after the upgrade and fail with the following error.

Caused by: java.lang.IllegalStateException: Failed to rollback to
checkpoint/savepoint s3://0X/savepoint-9bd1c7-8cafa2c1a9ac. Cannot map
checkpoint/savepoint state for operator 49bb9e12f4a332535e9b828c1d4e2c0a to
the new program, because the operator is not available in the new program.
If you want to allow to skip this, you can set the --allowNonRestoredState
option on the CLI.
at
org.apache.flink.runtime.checkpoint.Checkpoints.throwNonRestoredStateException(Checkpoints.java:210)
at
org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:180)
at 



Starting with --allowNonRestoredState option loses multiple state operators
and is not an option.


and a few other apps are failing with the following error.

com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID:
2532
Serialization trace:
fieldValueMap (io.caseclass.samplecaseclass)

This case class consists of a Mutable.Map which seems to be causing the
issue.



And finally another app fails with the following error.


Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
state backend for
KeyedProcessOperator_48c7355e6ee5ecb2411313ac3173573d_(1/1) from any of the
1 provided restore options.
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
... 9 more

Caused by: java.io.IOException: Could not find class
'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot'
in classpath.
at
org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:721)
at
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84)




From the savepoint compatibility doc [1], restoring state across Flink 1.9.1
and 1.11.2 should be possible but it does not seem to be the case for the
above apps.

[1] -
https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Compile Error

2021-03-03 Thread Abdullah bin Omar
Hi,

I am running a code (Example Program) from [1], and followed the [2] for
the dependencies. I used this in the pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             https://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>
  <groupId>org.apache.flink</groupId>
  <artifactId>newinput</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>1.12.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.5</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.5</version>
    </dependency>
  </dependencies>
</project>

I got an error, No Executor Factory Found

After that, I added the flink-clients dependency to the pom.xml:



<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.11</artifactId>
  <type>jar</type>
  <version>1.12.0</version>
  <scope>provided</scope>
</dependency>

Then the previous error (No Executor Factory Found) went away. However, a
new bunch of errors appeared (like "Job execution failed").

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/overview/
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/project-configuration.html#project-configuration


*Questions:*

(1) is the flink-clients dependencies correct?

(2) I just opened a Maven project in the Eclipse IDE and am using Flink 1.12.
I used Java code similar to the Example Program [1] with the above
dependencies in pom.xml.
Are those steps correct, or is something missing that I need to install or
add in the code/pom.xml?

Thank you

Best Regards,
Abdullah


How to use async I/O for dimension table joins in Flink SQL?

2021-03-03 Thread casel.chen
How can async I/O be used in Flink SQL to join dimension tables? Is this covered in the official documentation?

Re: Savepoint documentation

2021-03-03 Thread Farouk
Thanks a million :)

Le mer. 3 mars 2021 à 11:15, David Anderson  a écrit :

> > Out of curiosity, does it mean that savepoint created by flink 1.11
> cannot be recovered by a job running with flink 1.10 or older versions (so
> downgrade is impossible)?
>
> That's correct. See the mailing list thread on Backwards Compatibility of
> Savepoints [1].
>
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Document-Backwards-Compatibility-of-Savepoints-td41903.html
>
> On Tue, Mar 2, 2021 at 4:39 PM XU Qinghui 
> wrote:
>
>> Out of curiosity, does it mean that savepoint created by flink 1.11
>> cannot be recovered by a job running with flink 1.10 or older versions (so
>> downgrade is impossible)?
>>
>> Le mar. 2 mars 2021 à 12:25, David Anderson  a
>> écrit :
>>
>>> You are correct in thinking that the documentation wasn't updated. If
>>> you look at the master docs [1] you will see that they now say
>>>
>>> Can I move the Savepoint files on stable storage? #
>>> 
>>>
>>> The quick answer to this question is currently “yes”. Since Flink 1.11.0,
>>> savepoints are self-contained and relocatable. You can move the file and
>>> restore from any location.
>>>
>>>
>>> If you want more detail than the quick answer, see [2].
>>>
>>> Best,
>>> David
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/#can-i-move-the-savepoint-files-on-stable-storage
>>> [2] https://issues.apache.org/jira/browse/FLINK-19381
>>>
>>> On Tue, Mar 2, 2021 at 10:33 AM Farouk  wrote:
>>>
 Hi

 Does this chapter is outdated with Flink 1.11 ?


 https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/state/savepoints.html#can-i-move-the-savepoint-files-on-stable-storage

 *Can I move the Savepoint files on stable storage?*

 *The quick answer to this question is currently “no” because the meta
 data file references the files on stable storage as absolute paths for
 technical reasons. The longer answer is: if you MUST move the files for
 some reason there are two potential approaches as workaround. First,
 simpler but potentially more dangerous, you can use an editor to find the
 old path in the meta data file and replace them with the new path. Second,
 you can use the class SavepointV2Serializer as starting point to
 programmatically read, manipulate, and rewrite the meta data file with the
 new paths.*


 

 Thanks
 Farouk

>>>


Re: Flink 1.12 Compatibility with hbase-shaded-client 2.1 in application jar

2021-03-03 Thread Debraj Manna
The issue is resolved. The org.apache.hbase exclusion was missing from my
application pom when creating the uber jar.

diff --git a/map/engine/pom.xml b/map/engine/pom.xml
index 8337be031d1..8eceb721fa7 100644
--- a/map/engine/pom.xml
+++ b/map/engine/pom.xml
@@ -203,6 +203,7 @@
 org.slf4j:*
 log4j:*
 org.apache.hadoop:*
+org.apache.hbase:*
 
 
 

On Wed, Mar 3, 2021 at 10:12 AM Debraj Manna 
wrote:

> Hi
>
> I am trying to deploy an application in flink 1.12 having
> hbase-shaded-client 2.1.0 as dependency  in application mode
> .
> On deploying the application I am seeing the below ClassCastException:
>
> *org.apache.hadoop.yarn.proto.YarnServiceProtos$RegisterApplicationMasterRequestProto
> cannot be cast to
> org.apache.hadoop.hbase.shaded.com.google.protobuf.Message*
>
> *I have done *export HADOOP_CLASSPATH=`hadoop classpath` as mentioned in
> the hadoop documentation. I did not add any hadoop / hbase jars in the
> flink/lib folder .
>
> ubuntu@vrni-platform://tmp/debraj-flink/flink/lib$ ls
> flink-csv-1.12.1.jarflink-json-1.12.1.jar
>  flink-table_2.12-1.12.1.jarlog4j-1.2.17.jar
> slf4j-log4j12-1.7.25.jar
> flink-dist_2.12-1.12.1.jar  flink-shaded-zookeeper-3.4.14.jar
>  flink-table-blink_2.12-1.12.1.jar  log4j-to-slf4j-2.11.1.jar
>  vrni-flink-datadog-0.001-SNAPSHOT.jar
>
> Can anyone suggest what could be going wrong here?
>
> The full exception trace is like below
>
> 2021-03-02 18:10:45,819 ERROR 
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager  - 
> Fatal error occurred in ResourceManager.
> org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: 
> Could not start the ResourceManager 
> akka.tcp://flink@localhost:41477/user/rpc/resourcemanager_0
> at 
> org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:233)
> at 
> org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:607)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:181)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> at akka.actor.Actor.aroundReceive(Actor.scala:539)
> at akka.actor.Actor.aroundReceive$(Actor.scala:537)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:227)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:614)
> at akka.actor.ActorCell.invoke(ActorCell.scala:583)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
> at akka.dispatch.Mailbox.run(Mailbox.scala:229)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: 
> org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: 
> Cannot initialize resource provider.
> at 
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.initialize(ActiveResourceManager.java:124)
> at 
> org.apache.flink.runtime.resourcemanager.ResourceManager.startResourceManagerServices(ResourceManager.java:245)
> at 
> org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:229)
> ... 22 more
> Caused by: 
> org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: 
> Could not start resource manager client.
> at 
> org.apache.flink.yarn.YarnResourceManagerDriver.initializeInternal(YarnResourceManagerDriver.java:181)
> at 
> org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver.initialize(AbstractResourceManagerDriver.java:81)
> at 
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.initialize(ActiveResourceManager.java:122)
> ... 24 more
> 

Re: Does flink TableEnvironment.sqlUpdate not support UPDATE with multi-table joins?

2021-03-03 Thread Michael Ran
SQL itself doesn't allow an UPDATE written like this either, does it?


At 2021-03-03 16:43:49, "JackJia"  wrote:
>Hi all:
>Does flink TableEnvironment.sqlUpdate not support UPDATE statements that join multiple tables?
>
>
>The code below:
>bbTableEnv.sqlUpdate("update order_tb a, min_max_usertime_tb b set a.mark=-2 " 
>+
>" where a.mac=b.mac and extract(epoch from a.usertime)/7200 = b.user_2hour " +
>" and a.usertime > b.min_usertime and a.usertime < b.max_usertime");
>It fails with the following error:
>Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL 
>parse failed. Encountered "," at line 1, column 17.
>Was expecting:
>"SET" ...
>
>at 
>org.apache.flink.table.planner.calcite.FlinkPlannerImpl.parse(FlinkPlannerImpl.scala:96)
>at 
>org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:127)
>at 
>org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
>at com.sir.idle.IdleAnalysis.runBlinkBatch(IdleAnalysis.java:101)
>at com.sir.BatchMain.main(BatchMain.java:17)
>Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "," at 
>line 1, column 17.
>Was expecting:
>"SET" ...
>
>at 
>org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:368)
>at 
>org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:167)
>at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:147)
>at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:162)
>at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:187)
>at 
>org.apache.flink.table.planner.calcite.FlinkPlannerImpl.parse(FlinkPlannerImpl.scala:92)
>... 4 more
>Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "," at 
>line 1, column 17.
>Was expecting:
>"SET" ...
>
>at 
>org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:33107)
>at 
>org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:32921)
>at 
>org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlUpdate(FlinkSqlParserImpl.java:8227)
>at 
>org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3646)
>at 
>org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3669)
>at 
>org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:214)
>at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:160)
>... 6 more


Re: flink savepoint issue

2021-03-03 Thread yidan zhao
Are you using a random key?
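
To illustrate what a "random key" means here (the record type and field below
are hypothetical): the key must be a deterministic function of the record,
otherwise the key group computed on restore no longer matches the operator's
KeyGroupRange and you get errors like the one above.

// problematic: the same record can map to a different key on replay/restore
stream.keyBy(record -> ThreadLocalRandom.current().nextInt(128));

// safe: the key is a stable function of the record itself
stream.keyBy(record -> record.getUserId());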

guaishushu1...@163.com  于2021年3月3日周三 下午6:53写道:

> Checkpoints complete successfully, but taking a savepoint fails with the following error:
> java.lang.Exception: Could not materialize checkpoint 2404 for operator
> KeyedProcess (21/48).
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.IllegalArgumentException: Key group 0 is not in
> KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
> ... 3 more
> Caused by: java.lang.IllegalArgumentException: Key group 0 is not in
> KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
> at
> org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
> at
> org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
> at
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:314)
> at
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
> at
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
> at
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
> at
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
> ... 5 more
>
>
> guaishushu1...@163.com
>


Re: Handling Data Separation / Watermarking from Kafka in Flink

2021-03-03 Thread David Anderson
When bounded Flink sources reach the end of their input, a special
watermark with the value Watermark.MAX_WATERMARK is emitted that will take
care of flushing all windows.

One approach is to use a DeserializationSchema or
KafkaDeserializationSchema with an implementation of isEndOfStream that
returns true when the end of the input stream has been reached; something
like this, perhaps:

public class TestDeserializer extends YourKafkaDeserializer {
  public final static String END_APP_MARKER = "END_APP_MARKER"; // tests
send as last record

  @Override
  public boolean isEndOfStream(T nextElement) {
if (END_APP_MARKER.equals(nextElement.getRawData()))
  return true;

return false;
  }
}

Or with Flink 1.12, you could use the new KafkaSource with its setBounded
option.
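
A rough sketch of that variant (broker, topic and group id are placeholders,
and the deserializer setter name differs slightly between releases, so treat
this as an outline rather than copy-paste code):

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")      // placeholder broker
        .setTopics("test-input")                    // placeholder topic
        .setGroupId("integration-tests")            // placeholder group id
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setBounded(OffsetsInitializer.latest())    // stop once the end offsets seen at startup are reached
        .setValueOnlyDeserializer(new SimpleStringSchema()) // setter name varies by version
        .build();

DataStream<String> input =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "bounded-kafka-source");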

Best,
David


On Mon, Mar 1, 2021 at 6:56 PM Rion Williams  wrote:

> Hey David et all,
>
> I had one follow up question for this as I've been putting together some
> integration/unit tests to verify that things are working as expected with
> finite datasets (e.g. a text file with several hundred records that are
> serialized, injected into Kafka, and processed through the pipeline). I'm
> wondering if there's a good strategy to handle these finite sets (i.e. when
> I'm done reading through all of the records that I care about, I'd need to
> trigger something to explicitly flush the windows / evict messages. I'm not
> sure what a great approach would be to handle here? I don't think there's
> an easy way to simulate processing time delays outside of an explicit
> Thread.sleep() call prior to injecting some messages into the running
> pipeline asynchronously.
>
> Any recommendations for handling something like this? I must imagine that
> it's a fairly common use-case for testing, but maybe not?
>
> Thanks much!
>
> Rion
>
> On Sat, Feb 27, 2021 at 10:56 AM Rion Williams 
> wrote:
>
>> Thanks David,
>>
>> I figured that the correct approach would obviously be to adopt a keying
>> strategy upstream to ensure the same data that I used as a key downstream
>> fell on the same partition (ensuring the ordering guarantees I’m looking
>> for).
>>
>> I’m guessing implementation-wise, when I would normally evict a window
>> after some event time and allowed lateness, I could set a timer or just
>> explicitly keep the window open for some additional time to allow for out
>> of order data to make its way into the window.
>>
>> Either way - I think the keying is probably the right approach, but I
>> wanted to consider any other options should that become an issue upstream.
>>
>> Thanks!
>>
>> Rion
>>
>> On Feb 27, 2021, at 10:21 AM, David Anderson 
>> wrote:
>>
>> 
>> Rion,
>>
>> If you can arrange for each tenant's events to be in only one kafka
>> partition, that should be the best way to simplify the processing you need
>> to do. Otherwise, a simple change that may help would be to increase the
>> bounded delay you use in calculating your own per-tenant watermarks,
>> thereby making late events less likely.
>>
>> David
>>
>> On Sat, Feb 27, 2021 at 3:29 AM Rion Williams 
>> wrote:
>>
>>> David and Timo,
>>>
>>> Firstly, thank you both so much for your contributions and advice. I
>>> believe I’ve implemented things along the lines that you both detailed and
>>> things appear to work just as expected (e.g. I can see things arriving,
>>> being added to windows, discarding late records, and ultimately writing out
>>> files as expected).
>>>
>>> With that said, I have one question / issue that I’ve run into with
>>> handling the data coming my Kafka topic. Currently, my tenant/source (i.e.
>>> my key) may be distributed across the 10 partitions of my Kafka topic. With
>>> the way that I’m consuming from this topic (with a Kafka Consumer), it
>>> looks like my data is arriving in a mixed order which seems to be causing
>>> my own watermarks (those stored in my ValueState) to process as later data
>>> may arrive earlier than other data and cause my windows to be evicted.
>>>
>>> I’m currently using the `withNoWatermarks()` along with a custom
>>> timestamp assigned to handle all of my timestamping, but is there a
>>> mechanism to handle the mixed ordering across partitions in this scenario
>>> at the Flink level?
>>>
>>> I know the answer here likely lies with Kafka and adopting a better
>>> keying strategy to ensure the same tenant/source (my key) lands on the same
>>> partition, which by definition ensures ordering. I’m just wondering if
>>> there’s some mechanism to accomplish this post-reading from Kafka in Flink
>>> within my pipeline to handle things in a similar fashion?
>>>
>>> Again - thank you both so much, I’m loving the granularity and control
>>> that Flink has been providing me over other streaming technologies I’ve
>>> used in the past. I’m totally sold on it and am looking forward to doing
>>> more incredible things with it.
>>>
>>> Best regards,
>>>
>>> Rion
>>>
>>> On Feb 26, 2021, at 4:36 AM, David Anderson 
>>> wrote:
>>>
>>> 

flink savepoint issue

2021-03-03 Thread guaishushu1...@163.com
Checkpoints complete successfully, but taking a savepoint fails with the following error:
java.lang.Exception: Could not materialize checkpoint 2404 for operator 
KeyedProcess (21/48).
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: 
java.lang.IllegalArgumentException: Key group 0 is not in 
KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
... 3 more
Caused by: java.lang.IllegalArgumentException: Key group 0 is not in 
KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
at 
org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
at 
org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
at 
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:314)
at 
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
at 
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
at 
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
at 
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
... 5 more


guaishushu1...@163.com


Stop vs Cancel with savepoint

2021-03-03 Thread Thomas Eckestad
Hi!

Cancel with savepoint is marked as deprecated in the cli-documentation. It is 
not marked as deprecated in the REST-API documentation though? Is that a 
mistake? At least some recommendation regarding stop vs cancel would be 
appropriate to include in the API doc, or?

As I understand, stop will cancel each operator in the job-DAG bottom-up in a 
graceful manner. Conceptually, this means: first cancel the sources, then, when 
the operators directly downstream to the sources have drained all pending 
input, those will be canceled as well. This continues until the sinks are done 
as well. Or, maybe more to the point, the checkpoint barrier triggered for the 
savepoint will not be followed by any more input data, the sources will stop 
consuming new data until the savepoint is complete and the job exits.

Is the above understanding correct? In that case, for some streaming jobs 
without exactly-once sinks, cancel with savepoint might trigger duplication. 
Which should be OK of course since the job needs to handle a restart anyway, 
but it might be beneficial to not generate duplicated output for this specific 
use case if there is a choice where the alternatives have the same cost 
implementation wise...

Is my understanding of cancel vs stop correct? If not what is the real 
practical difference between stop and cancel with savepoint?

To me it feels like cancel with savepoint should be deprecated in both the 
REST API and the CLI, and there should also be a text that explains why it is 
deprecated and why its usage is discouraged, or?

Thanks,
Thomas
Thomas Eckestad
Systems Engineer
Road Perception

NIRA Dynamics AB
Wallenbergs gata 4
58330 Linköping, Sweden
Mobile: +46  738 453 937
thomas.eckes...@niradynamics.se
www.niradynamics.se



Re: Savepoint documentation

2021-03-03 Thread David Anderson
> Out of curiosity, does it mean that savepoint created by flink 1.11
cannot be recovered by a job running with flink 1.10 or older versions (so
downgrade is impossible)?

That's correct. See the mailing list thread on Backwards Compatibility of
Savepoints [1].

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Document-Backwards-Compatibility-of-Savepoints-td41903.html

On Tue, Mar 2, 2021 at 4:39 PM XU Qinghui 
wrote:

> Out of curiosity, does it mean that savepoint created by flink 1.11 cannot
> be recovered by a job running with flink 1.10 or older versions (so
> downgrade is impossible)?
>
> Le mar. 2 mars 2021 à 12:25, David Anderson  a
> écrit :
>
>> You are correct in thinking that the documentation wasn't updated. If you
>> look at the master docs [1] you will see that they now say
>>
>> Can I move the Savepoint files on stable storage? #
>> 
>>
>> The quick answer to this question is currently “yes”. Since Flink 1.11.0,
>> savepoints are self-contained and relocatable. You can move the file and
>> restore from any location.
>>
>>
>> If you want more detail than the quick answer, see [2].
>>
>> Best,
>> David
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/#can-i-move-the-savepoint-files-on-stable-storage
>> [2] https://issues.apache.org/jira/browse/FLINK-19381
>>
>> On Tue, Mar 2, 2021 at 10:33 AM Farouk  wrote:
>>
>>> Hi
>>>
>>> Does this chapter is outdated with Flink 1.11 ?
>>>
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/state/savepoints.html#can-i-move-the-savepoint-files-on-stable-storage
>>>
>>> *Can I move the Savepoint files on stable storage?*
>>>
>>> *The quick answer to this question is currently “no” because the meta
>>> data file references the files on stable storage as absolute paths for
>>> technical reasons. The longer answer is: if you MUST move the files for
>>> some reason there are two potential approaches as workaround. First,
>>> simpler but potentially more dangerous, you can use an editor to find the
>>> old path in the meta data file and replace them with the new path. Second,
>>> you can use the class SavepointV2Serializer as starting point to
>>> programmatically read, manipulate, and rewrite the meta data file with the
>>> new paths.*
>>>
>>>
>>> 
>>>
>>> Thanks
>>> Farouk
>>>
>>


Re: BroadcastState dropped when data deleted in Kafka

2021-03-03 Thread bat man
I created a new descriptor and rules stream, used it in the second process
function, and this works fine.

public static final MapStateDescriptor<Integer, Rule> rulesDescriptor =
new MapStateDescriptor<>(
"rules", BasicTypeInfo.INT_TYPE_INFO,
TypeInformation.of(Rule.class));

public static final MapStateDescriptor<Integer, Rule> rulesDescriptor2 =
new MapStateDescriptor<>(
"rules", BasicTypeInfo.INT_TYPE_INFO,
TypeInformation.of(Rule.class));


BroadcastStream<Rule> rulesStream =
rulesDataStream.broadcast(TransformDescriptors.Descriptors.rulesDescriptor);

BroadcastStream<Rule> rulesStream2 =
rulesDataStream.broadcast(TransformDescriptors.Descriptors.rulesDescriptor2);


SingleOutputStreamOperator>
keyedSingleOutputStream =
rawEventStream.
connect(rulesStream).
process(new
DynamicKeyFunction()).name("keyed").uid("keyed").setParallelism(5);

SingleOutputStreamOperator rtEventDataStream =
keyedSingleOutputStream.
keyBy((keyed) -> keyed.getKey()).
connect(rulesStream2).
process(new
DynamicTransformFunction()).name("rt").uid("rt").setParallelism(5);


On Fri, Feb 26, 2021 at 3:38 PM Arvid Heise  wrote:

> Hi,
>
> I have no idea what's going on. There is no mechanism in DataStream to
> react to deleted records.
>
> Can you reproduce it locally and debug through it?
>
>
>
> On Wed, Feb 24, 2021 at 5:21 PM bat man  wrote:
>
>> Hi Arvid,
>>
>> The Flink application was not re-started. I had checked on that.
>> By adding rules to the state of process function you mean the state which
>> is local to the keyedprocess function?
>> From [1] what is being done here -
>>
>> final MapState> state = getRuntimeContext().
>> getMapState(mapStateDesc);
>>
>> state.put(ruleName, stored);
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>>
>> Thanks.
>>
>>
>> On Wed, Feb 24, 2021 at 7:52 PM Arvid Heise  wrote:
>>
>>> Could you double-check if your Flink application was restarted between
>>> Kafka topic was cleared and the time you saw that the rules have been lost?
>>>
>>> I suspect that you deleted the Kafka topic and the Flink application
>>> then failed and restarted. Upon restart it read the empty rule topic.
>>>
>>> To solve it, you probably want to add the rules to the state of your
>>> process function [1]. If you have done that, I'm a bit lost.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>>>
>>> On Wed, Feb 24, 2021 at 7:30 AM bat man  wrote:
>>>
 Hi,

 This is my code below -
 As mentioned earlier the rulesStream us again used in later processing.
 Below you can see the rulesStream is again connected with the result stream
 of the first process stream. Do you think this is the reason rules
 operators state getting overridden when the data in kafka is deleted?
 My question is if the data is not present in kafka then no data is read
 in stream how it is updating the existing state data.

 public static final MapStateDescriptor rulesDescriptor =
 new MapStateDescriptor<>(
 "rules", BasicTypeInfo.INT_TYPE_INFO, 
 TypeInformation.of(Rule.class));

 KafkaSource = getKafkaSource(config.get(RAW_EVENT_TOPIC));
 DataStream rawEventStream = 
 validateData(getRawEventStream(rawEventKafkaSource,env));

  rulesKafkaSource = getKafkaSource(config.get(RULES_TOPIC));
  DataStream rulesDataStream = getRulesStream(rulesKafkaSource,env);

  deviceSource = getKafkaSource(config.get(DEVICE_EVENT_TOPIC));
  DataStream deviceDataStream = getDeviceStream(deviceSource,env);

  BroadcastStream rulesStream = 
 rulesDataStream.broadcast(rulesDescriptor);

  SingleOutputStreamOperator> 
 keyedSingleOutputStream =
  rawEventStream.
  connect(rulesStream).
  process(new 
 DynamicKeyFunction()).name("keyed").uid("keyed").setParallelism(5);

  SingleOutputStreamOperator rtEventDataStream =
  keyedSingleOutputStream.
  keyBy((keyed) -> keyed.getKey()).
  connect(rulesStream).
  process(new 
 DynamicTransformFunction()).name("rt").uid("rt").setParallelism(5);


 On Tue, Feb 23, 2021 at 10:32 PM Khachatryan Roman <
 khachatryan.ro...@gmail.com> wrote:

> Hi,
>
> Deletion of messages in Kafka shouldn't affect Flink state in general.
> Probably, some operator in your pipeline is re-reading the topic
> and overwrites the state, dropping what was deleted by Kafka.
> Could you share the code?
>
> Regards,
> Roman
>
>
> On Tue, Feb 23, 2021 at 7:12 AM bat man  wrote:
>
>> Hi,
>>
>> I have 2 streams one event data and the other rules. I broadcast the
>> rules stream and then 

Re: Processing-time temporal join is not supported yet.

2021-03-03 Thread eric hoffmann
Hi Leonard,
Thanks for your reply.
No problem, I can help with the JIRA ticket.
In my situation, in a pure SQL environment, what would be the best workaround
to enrich a stream of data from a Kafka topic with static data based on an id?
I know how to do it with the DataStream API.
eric

Le sam. 27 févr. 2021 à 05:15, Leonard Xu  a écrit :

> Hi, Eric
>
> Firstly FileSystemTableSource doe not implement LookupTableSource which
> means we cannot directly lookup a Filesystem table.
>
> In FLINK-19830, we plan to support Processing-time temporal join any
> table/views by lookup the data in join operator state which scanned from
> the filesystem table, but as the issue described: join processing for
> left stream doesn't wait for the complete snapshot of temporal table, this
> may mislead users in production environment.
> Eg: your s3 table has 1000 records, but the join operator does not know
> when all records has been arrived, the correlation maybe incorrect, thus we
> disable this feature.
>
> I think we can  implement LookupTableSource for  FileSystemTableSource
> currently, after that, we can directly lookup a Filesystem table, the
> implementation will be similar to Hive table where we cache all data of the
> files and then lookup the cache.  Could you help create an JIRA ticket for
> this?
>
>
> Best,
> Leonard
>
>
> 在 2021年2月26日,23:41,Matthias Pohl  写道:
>
> Hi Eric,
> it looks like you ran into FLINK-19830 [1]. I'm gonna add Leonard to the
> thread. Maybe, he has a workaround for your case.
>
> Best,
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-19830
>
> On Fri, Feb 26, 2021 at 11:40 AM eric hoffmann 
> wrote:
>
>> Hello
>> Working with flink 1.12.1 i read in the doc that Processing-time temporal
>> join is supported for kv like join but when i try i get a:
>>
>> Exception in thread "main" org.apache.flink.table.api.TableException:
>> Processing-time temporal join is not supported yet.
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
>> at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>> at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlan(StreamExecTemporalJoin.scala:56)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>> at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>> at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
>> at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
>> at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
>> at
>> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:66)
>>
>> my query:
>>
>> SELECT e.id
>> , r.test FROM kafkaTable as e JOIN s3Table FOR SYSTEM_TIME AS OF e.proctime 
>> AS r ON
>> e.id = r.id
>>
>> my s3 table:
>>
>> CREATE TABLE s3Table(id STRING,
>> test STRING, PRIMARY KEY (id) NOT ENFORCED)
>>   WITH ('connector'='filesystem','path'='s3a://fs/','format'='json')
>>
>> my kafka table:
>>
>> CREATE TABLE kafkaTable(id STRING, foo STRING, bar BIGINT,
>> proctime AS PROCTIME())
>>
>>   WITH 
>> ('connector'='kafka','topic'='mytopic','properties.bootstrap.servers'='
>> 127.0.0.1:9092','properties.group.id
>> '='mygroup','format'='json','scan.startup.mode'='group-offsets', 
>> 'properties.enable.auto.commit'='false')
>>
>>
>
>


Re: Python DataStream API Questions -- Java/Scala Interoperability?

2021-03-03 Thread Shuiqiang Chen
Hi Kevin,

Thank you for your questions. Currently, users are not able to define
custom sources/sinks in Python. This would be a great feature that could unify
end-to-end PyFlink application development in Python, but it is a large topic
that we have no plan to support at present.

As you have noticed, `the Python DataStream API has several connectors
[2] that use Py4J+Java gateways to interoperate with Java source/sinks`.
These connectors are extensions of the Python abstract classes
`SourceFunction` and `SinkFunction`. These two classes can accept a Java
source/sink instance and maintain it to enable the interoperation between
Python and Java. They can also accept a string with the full name of a
Java/Scala defined Source/SinkFunction class and create the corresponding
Java instance. Below are the definitions of these classes:

class JavaFunctionWrapper(object):
"""
A wrapper class that maintains a Function implemented in Java.
"""

def __init__(self, j_function: Union[str, JavaObject]):
# TODO we should move this part to the get_java_function() to
perform a lazy load.
if isinstance(j_function, str):
j_func_class = get_gateway().jvm.__getattr__(j_function)
j_function = j_func_class()
self._j_function = j_function

def get_java_function(self):
return self._j_function



class SourceFunction(JavaFunctionWrapper):
"""
Base class for all stream data source in Flink.
"""

def __init__(self, source_func: Union[str, JavaObject]):
"""
Constructor of SinkFunction.

:param source_func: The java SourceFunction object.
"""
super(SourceFunction, self).__init__(source_func)


class SinkFunction(JavaFunctionWrapper):
"""
The base class for SinkFunctions.
"""

def __init__(self, sink_func: Union[str, JavaObject]):
"""
Constructor of SinkFunction.

:param sink_func: The java SinkFunction object or the full name of the
SinkFunction class.
"""
super(SinkFunction, self).__init__(sink_func)

Therefore, you are able to define custom sources/sinks in Scala and apply
them in Python. Here is the recommended approach for implementation:

class MyBigTableSink(SinkFunction):
def __init__(self, class_name: str):
super(MyBigTableSink, self).__init__(class_name)


def example():
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars('/the/path/of/your/MyBigTableSink.jar')
# ...
ds.add_sink(MyBigTableSink("com.mycompany.MyBigTableSink"))
env.execute("Application with Custom Sink")


if __name__ == '__main__':
example()

Remember that you must add the jar of the Scala-defined SinkFunction by
calling `env.add_jars()` before adding the SinkFunction. And your custom
source/sink classes must extend `SourceFunction` or
`SinkFunction`.

Any further questions are welcomed!

Best,
Shuiqiang


Kevin Lam  于2021年3月3日周三 上午2:50写道:

> Hello everyone,
>
> I have some questions about the Python API that hopefully folks in the
> Apache Flink community can help with.
>
> A little background, I’m interested in using the Python Datastream API
> because of stakeholders who don’t have a background in Scala/Java, and
> would prefer Python if possible. Our team is open to maintaining Scala
> constructs on our end, however we are looking to expose Flink for stateful
> streaming via a Python API to end-users.
>
> Questions:
>
> 1/ The docs mention that custom Sources and Sinks cannot be defined in
> Python, but must be written in Java/Scala [1]. What is the recommended
> approach for interoperating between custom sinks/sources written in Scala,
> with the Python API? If nothing is currently supported, is it on the road
> map?
>
> 2/ Also, I’ve noted that the Python DataStream API has several connectors
> [2] that use Py4J+Java gateways to interoperate with Java source/sinks. Is
> there a way for users to build their own connectors? What would this
> process entail?
>
> Ideally, we’d like to be able to define custom sources/sinks in Scala and
> use them in our Python API Flink Applications. For example, defining a
> BigTable sink in Scala for use in the Python API:
>
>
> [3]
>
> Where MyBigTableSink is just somehow importing a Scala defined sink.
>
> More generally, we’re interested in learning more about Scala/Python
> interoperability in Flink, and how we can expose the power of Flink’s Scala
> APIs to Python. Open to any suggestions, strategies, etc.
>
> Looking forward to any thoughts!
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#user-defined-sources--sinks
>
> [2]
> https://github.com/apache/flink/blob/b23c31075aeb8cf3dbedd4f1f3571d5ebff99c3d/flink-python/pyflink/datastream/connectors.py
>
> [3] Plaintext paste of code in screenshot, in case of attachment issues:
> ```
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.connectors import 

Does flink TableEnvironment.sqlUpdate not support UPDATE with multi-table joins?

2021-03-03 Thread JackJia
Hi all:
Does flink TableEnvironment.sqlUpdate not support UPDATE statements that join multiple tables?


The code below:
bbTableEnv.sqlUpdate("update order_tb a, min_max_usertime_tb b set a.mark=-2 " +
" where a.mac=b.mac and extract(epoch from a.usertime)/7200 = b.user_2hour " +
" and a.usertime > b.min_usertime and a.usertime < b.max_usertime");
It fails with the following error:
Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL 
parse failed. Encountered "," at line 1, column 17.
Was expecting:
"SET" ...

at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.parse(FlinkPlannerImpl.scala:96)
at 
org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:127)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
at com.sir.idle.IdleAnalysis.runBlinkBatch(IdleAnalysis.java:101)
at com.sir.BatchMain.main(BatchMain.java:17)
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "," at 
line 1, column 17.
Was expecting:
"SET" ...

at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:368)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:167)
at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:147)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:162)
at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:187)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.parse(FlinkPlannerImpl.scala:92)
... 4 more
Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "," at 
line 1, column 17.
Was expecting:
"SET" ...

at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:33107)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:32921)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlUpdate(FlinkSqlParserImpl.java:8227)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3646)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3669)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:214)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:160)
... 6 more
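
The parse error above is expected for this statement: the SQL grammar behind sqlUpdate() only knows single-table UPDATE syntax (hence "Was expecting: SET" right after the first table name), and this Flink version does not execute UPDATE DML at all; sqlUpdate() accepts INSERT and DDL statements. The usual workaround is to re-express the correction as an INSERT ... SELECT into a result table. A rough sketch follows, written with the PyFlink Table API purely to keep it self-contained; the SQL string is the relevant part, the sink table 'order_result' and the column list are invented, and the join and filter conditions are copied from the question.

from pyflink.table import EnvironmentSettings, TableEnvironment

# Batch mode, matching the runBlinkBatch() call in the stack trace.
t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build())

# order_tb and min_max_usertime_tb are assumed to be registered already;
# order_result is a hypothetical sink table with (mac, usertime, mark).
t_env.execute_sql("""
    INSERT INTO order_result
    SELECT
        o.mac,
        o.usertime,
        CASE WHEN m.mac IS NOT NULL
                  AND o.usertime > m.min_usertime
                  AND o.usertime < m.max_usertime
             THEN -2 ELSE o.mark END AS mark
    FROM order_tb AS o
    LEFT JOIN min_max_usertime_tb AS m
        ON o.mac = m.mac
       AND EXTRACT(EPOCH FROM o.usertime) / 7200 = m.user_2hour
""").wait()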

Re: Flink application kept restarting

2021-03-03 Thread Rainie Li
I see.
Thank you for the explanation.

Best regards
Rainie

On Wed, Mar 3, 2021 at 12:24 AM Matthias Pohl 
wrote:

> Hi Rainie,
> in general, buffer pools being destroyed usually means that some other
> exception occurred that caused the task to fail, and in the process of
> failure handling the operator-related network buffer is destroyed. That
> causes the "java.lang.RuntimeException: Buffer pool is destroyed." in your
> case. It looks like you had some timeout problem while fetching data from a
> Kafka topic.
>
> Matthias
>
> On Tue, Mar 2, 2021 at 10:39 AM Rainie Li  wrote:
>
>> Thanks for checking, Matthias.
>>
>> I have another flink job which failed last weekend with the same buffer
>> pool destroyed error. This job is also running version 1.9.
>> Here is the error I found in the task manager log. Any suggestion on what
>> the root cause is and how to fix it?
>>
>> 2021-02-28 00:54:45,943 WARN
>>  org.apache.flink.streaming.runtime.tasks.StreamTask   - Error
>> while canceling task.
>> java.lang.RuntimeException: Buffer pool is destroyed.
>> at
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>> at
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>> at
>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>> at
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>> at
>> com.pinterest.xenon.unified.api191.SynchronousKafkaConsumer191$1.emitRecordWithTimestamp(SynchronousKafkaConsumer191.java:107)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>> --
>> at
>> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:175)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
>> at
>> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
>> at
>> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
>> at
>> 

Re: Flink1.11 flink-runtime-web

2021-03-03 Thread Natasha
hi Michael,








Re: Flink application kept restarting

2021-03-03 Thread Matthias Pohl
Hi Rainie,
in general, buffer pools being destroyed usually means that some other
exception occurred that caused the task to fail, and in the process of
failure handling the operator-related network buffer is destroyed. That
causes the "java.lang.RuntimeException: Buffer pool is destroyed." in your
case. It looks like you had some timeout problem while fetching data from a
Kafka topic.

Matthias

On Tue, Mar 2, 2021 at 10:39 AM Rainie Li  wrote:

> Thanks for checking, Matthias.
>
> I have another flink job which failed last weekend with the same buffer
> pool destroyed error. This job is also running version 1.9.
> Here is the error I found in the task manager log. Any suggestion on what
> the root cause is and how to fix it?
>
> 2021-02-28 00:54:45,943 WARN
>  org.apache.flink.streaming.runtime.tasks.StreamTask   - Error
> while canceling task.
> java.lang.RuntimeException: Buffer pool is destroyed.
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
> at
> com.pinterest.xenon.unified.api191.SynchronousKafkaConsumer191$1.emitRecordWithTimestamp(SynchronousKafkaConsumer191.java:107)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
> --
> at
> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:175)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
> at
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
> at java.lang.Thread.run(Thread.java:748)
> Caused by:
>
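
On the fetch-timeout diagnosis above: the timeouts involved are ordinarily plain Kafka consumer settings handed to the FlinkKafkaConsumer rather than anything Flink-specific, so that is the first place to look once the logs point at the fetch path. A minimal sketch of where such settings go, shown with the PyFlink wrapper only for brevity (in the Java 1.9 job they belong in the Properties object passed to the consumer); topic, brokers, group id and the values themselves are placeholders, and raising timeouts only helps if slow brokers or network are actually the cause.

from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()

# Standard Kafka consumer properties; the *.timeout.ms entries are the knobs
# usually adjusted when fetches time out.
props = {
    'bootstrap.servers': 'broker-1:9092',
    'group.id': 'my-consumer-group',
    'request.timeout.ms': '60000',
    'session.timeout.ms': '30000',
}
consumer = FlinkKafkaConsumer('my-topic', SimpleStringSchema(), props)
stream = env.add_source(consumer)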