Re: Unsubscribe

2021-09-09 Thread Caizhi Weng
Hi!

To unsubscribe from the Chinese-language mailing list, please send an email with any content to user-zh-unsubscr...@flink.apache.org. The unsubscribe addresses of the other mailing lists are listed at
https://flink.apache.org/community.html#mailing-lists

lutty shi wrote on Fri, Sep 10, 2021 at 1:37 PM:

> Unsubscribe
>
> Sent from NetEase Mail Master


Unsubscribe

2021-09-09 Thread lutty shi
Unsubscribe





 

Sent from NetEase Mail Master





Re: Re: Re: Re: Temporal Joins reports "Currently the join key in Temporal Table Join can not be empty"

2021-09-09 Thread Caizhi Weng
Hi!

Thanks for the continued feedback. This time I was indeed able to reproduce the problem in a development environment.

This happens because the handling of the join key in an event time temporal join is not yet complete: it can currently only deal with plain input columns (for example, using uuid directly) and cannot yet handle expressions computed from input columns (such as extracting a value from a map, as done here). I have opened an issue to track this [1]. One workaround is to extract the map value in a view first, for example:

CREATE TABLE A (
a MAP<STRING, INT>,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts
);

CREATE TABLE B (
id INT,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts,
PRIMARY KEY (id) NOT ENFORCED
);

CREATE VIEW myView AS SELECT A.a['ID'] AS id, ts FROM A;

SELECT * FROM myView AS a LEFT JOIN B FOR SYSTEM_TIME AS OF a.ts AS b ON
a.id = b.id;

The later error, "Temporal Table Join requires primary key in versioned table, but no
primary key can be found.", means that the versioned table of an event time temporal join must define a primary
key. This is expected behavior; see [2]: "The event-time temporal join requires the primary key
contained in the equivalence condition of the temporal join condition."
How to define a primary key is shown in the example code above; see also [3].

[1] https://issues.apache.org/jira/browse/FLINK-24239
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins/#event-time-temporal-join
[3]
https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/dev/table/sql/create/#primary-key

Wayne <1...@163.com> wrote on Thu, Sep 9, 2021 at 11:29 PM:

> Hello
>
>
> Sorry to bother you again: I tried this again recently and still get the same error.
> My Flink version is flink-1.12.2-bin-scala_2.12.
> I am executing with the SQL client.
> My SQL is as follows:
>
>
> CREATE TABLE stub_trans (
> `uuid` STRING,
> `columnInfos` MAP<STRING, ROW<newValue STRING, name STRING NOT NULL, isKeyColumn BOOLEAN NOT NULL,
> type STRING NOT NULL> NOT NULL> NOT NULL,
> procTime TIMESTAMP(3) METADATA FROM 'timestamp' ,
> WATERMARK FOR procTime AS procTime
> ) WITH (
> 'connector' = 'kafka',
> ..
> 'format' = 'avro'
> );
>
>
> CREATE TABLE kapeng_test (
>
>
> `event_id` STRING,
> `genre_id` STRING,
> `user_guid` STRING,
> procTime TIMESTAMP(3) METADATA FROM 'timestamp'  ,
> WATERMARK FOR procTime AS procTime
> ) WITH (
> 'connector' = 'kafka',
> 
> 'format' = 'avro'
> );
>
>
>
>
> CREATE TABLE purchase_hist (
> rowkey STRING,
> d ROW < cost STRING, crdate STRING, currency STRING, eventid STRING,
> genreid STRING, quantity STRING >,
> PRIMARY KEY ( rowkey ) NOT ENFORCED
> ) WITH (
> 'connector' = 'hbase-1.4',
> .);
>
>
>
>
> INSERT INTO purchase_hist SELECT
> rowkey,
> ROW ( cost, crdate, currency, eventid, genreid, quantity )
> FROM
> ( SELECT
> CONCAT_WS(
> '_',
> user_guid,
> CAST( CAST( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS' ].newValue,
> 'yyyy-MM-dd:HH:mm:ss.S' ) AS BIGINT ) AS STRING )) AS rowkey,
> columnInfos [ 'TICKET_COST' ].newValue AS cost,
> DATE_FORMAT( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS' ].newValue,
> 'yyyy-MM-dd:HH:mm:ss.S' ), 'yyyy-MM-dd' ) AS crdate,
> columnInfos [ 'EVENT_ID' ].newValue AS eventid,
> columnInfos [ 'CURRENCY_CODE' ].newValue AS currency,
> columnInfos [ 'QUANTITY' ].newValue AS quantity,
> genre_id AS genreid,
> user_guid AS userGuid
> FROM
> stub_trans
>LEFT  JOIN kapeng_test FOR SYSTEM_TIME AS OF stub_trans.procTime ON
> stub_trans.columnInfos [ 'EVENT_ID' ].newValue = kapeng_test.event_id )m
>
>
>
>
> The error is as follows:
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Currently the join key in Temporal Table Join can not be empty.
> at
> org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:272)
> at
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
> at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
> at
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
> at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63)
> at
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
> at
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
> at scala.collection.Iterator.foreach(Iterator.scala:943)
> at scala.collection.Iterator.foreach$(Iterator.scala:943)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
> at 

Re: Usecase for flink

2021-09-09 Thread JING ZHANG
Hi Dipanjan,
Based on your description, I think Flink can handle this use case.
Don't worry about Flink not coping with data at this scale: Flink is a
distributed engine. As long as data skew is carefully avoided, the input
throughput can be handled with appropriate resources (a rough sketch of
skew-avoiding keying follows below).
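For illustration only, here is a minimal, hypothetical Java sketch of the kind of keying that avoids skew when fanning decomposed graph elements out to parallel substreams; the Tuple2 of (graphId, elementId) and the composite key are assumptions, not something taken from your pipeline:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SkewAvoidanceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical stream of (graphId, elementId) pairs produced by decomposing a graph.
        DataStream<Tuple2<String, String>> elements =
                env.fromElements(Tuple2.of("graph-1", "node-a"), Tuple2.of("graph-1", "edge-a-b"));

        // Key by the fine-grained element id rather than by graphId alone:
        // keying only by graphId would route an entire large graph to a single
        // subtask and create exactly the skew mentioned above.
        elements
                .keyBy(t -> t.f0 + "#" + t.f1)
                .print();

        env.execute("skew-avoidance-sketch");
    }
}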

Best,
JING ZHANG

Dipanjan Mazumder wrote on Fri, Sep 10, 2021 at 11:11 AM:

> Hi,
>
>    I am working on a use case and thinking of using Flink for it. I will
> be having many large resource graphs; I need to parse each graph into its
> nodes and edges and evaluate each of them against some Siddhi rules. Right
> now the implementation for evaluating individual entities with Flink and
> Siddhi is in place, but I am in a dilemma over whether I should do the
> graph processing in Flink as well. So this is what I am planning to do:
>
> From Kafka I will fetch the graph, decompose the graph into nodes and
> edges, fetch additional metadata for each node and edge from different
> REST APIs, and then pass the individual nodes and edges (which are
> resources) to different substreams that are already in place. Rules will
> work on the individual substreams to process the nodes and edges, and
> finally they will emit the rule output into a stream. I will collate all
> of them based on the graph id from that stream using another operator and
> send the final result to an output stream.
>
> This is what I am thinking. I now need input from all of you on whether
> this is a fair use case for Flink, and whether Flink will be able to
> handle this level of processing at scale and volume.
>
> Any input will ease my understanding and will help me go ahead with this
> idea.
>
> Regards,
> dipanjan
>


Re: State processor API very slow reading a keyed state with RocksDB

2021-09-09 Thread Yun Tang
Hi David,

I think Seth had shared some useful information.

If you want to know what happens within RocksDB while you are reading, you can
leverage async-profiler [1] to capture the RocksDB stacks; my guess is that the
index block is being evicted too frequently during your reads. In the future we
could also use a read option that disables fillCache [2] to speed up bulk scans
and improve performance (a minimal sketch of that option follows below).
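For reference, a minimal sketch of such a scan against a standalone RocksDB instance (plain rocksdbjni, outside of Flink); the database path and the iteration loop are illustrative assumptions:

import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

public class FillCacheScanSketch {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/fillcache-sketch");
             // setFillCache(false) keeps a bulk scan from evicting hot blocks
             // (such as index blocks) out of the block cache.
             ReadOptions readOptions = new ReadOptions().setFillCache(false);
             RocksIterator it = db.newIterator(readOptions)) {
            for (it.seekToFirst(); it.isValid(); it.next()) {
                System.out.println(new String(it.key()) + " -> " + new String(it.value()));
            }
        }
    }
}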


Best
Yun Tang

[1] https://github.com/jvm-profiling-tools/async-profiler
[2] 
https://javadoc.io/doc/org.rocksdb/rocksdbjni/6.20.3/org/rocksdb/ReadOptions.html#setFillCache(boolean)

From: Seth Wiesman 
Sent: Friday, September 10, 2021 0:58
To: David Causse ; user 
Cc: Piotr Nowojski 
Subject: Re: State processor API very slow reading a keyed state with RocksDB

Hi David,

I was also able to reproduce the behavior, but was able to get significant 
performance improvements by reducing the number of slots on each TM to 1.

My suspicion, as Piotr alluded to, has to do with the different runtime 
execution of DataSet over DataStream. In particular, Flink's DataStream 
operators are aware of the resource requirements of the state backend and 
include RocksDB in its internal memory configurations. In the state processor 
api, the underlying input format is a blackbox.

Another thing to know is that when running multiple RocksDB instances within 
the same JVM, you are actually running a single native process with multiple 
logical instances. I _think_ we are seeing contention amongst the logical 
RocksDB instances.

Even with one slot, it is not as fast as I would like and will need to continue 
investigating. If my suspicion for the slowness is correct, we will need to 
migrate to the new Source API and improve this as part of DataStream 
integration. This migration is something we'd like to do regardless, but I 
don't have a timeline to share.

Aside: Why is writing still relatively fast?

Even with these factors accounted for, I do still expect writing to be faster 
than reading. This is due to both how RocksDB internal data structures work, 
along with some peculiarities of how the state processor API has to perform 
reads.

1. RocksDB internally uses a data structure called a log structured merge tree 
(or LSM). This means writes are always implemented as appends, so there is no 
seek required. Additionally, writes go into an in-memory data structure called 
a MemTable that is flushed to disk asynchronously.  Because there may be 
multiple entries for a given key, RocksDB needs to search for the most recent 
value and potentially read from disk. This may be alleviated by enabling bloom 
filters, at some memory cost (a minimal configuration sketch follows after point 2).

2. RocksDB is a key value store, so Flink represents each registered state 
(ValueState, ListState, etc) as its own column family (table). A key only 
exists in a table if it has a non-null value. This means not all keys exist in 
all column families for a given operator. The state-proc-api wants to make it 
appear as if each operator is composed of a single table with multiple columns. 
To do this, we perform a full table scan on one column family and then do point 
lookups of that key on the others. However, we still need to find the keys that 
may only exist in other tables. The trick we perform is to delete keys from 
rocksDB after each read, so we can do full table scans on all column families 
but never see any duplicates. This means the reader is performing multiple 
reads and writes on every call to `readKey` and is more expensive than it may 
appear.
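Regarding the bloom filters mentioned in point 1, below is a minimal, hedged sketch of how they could be enabled on Flink's RocksDB state backend. It assumes a RocksDB JNI version (6.x) that exposes BlockBasedTableConfig#setFilterPolicy; the bits-per-key value is only illustrative and is not part of the original discussion:

import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

import java.util.Collection;

public class BloomFilterRocksDBOptions implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions; // keep Flink's defaults at the DB level
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions,
                                                   Collection<AutoCloseable> handlesToClose) {
        BloomFilter bloomFilter = new BloomFilter(10, false); // ~10 bits per key
        handlesToClose.add(bloomFilter);                      // Flink disposes the native handle
        return currentOptions.setTableFormatConfig(
                new BlockBasedTableConfig().setFilterPolicy(bloomFilter));
    }
}

// usage sketch: new EmbeddedRocksDBStateBackend().setRocksDBOptions(new BloomFilterRocksDBOptions());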

Seth


On Thu, Sep 9, 2021 at 1:48 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
Hi David,

I can confirm that I'm able to reproduce this behaviour. I've tried 
profiling/flame graphs and I was not able to make much sense out of those 
results. There are no IO/Memory bottlenecks that I could notice, it looks 
indeed like the Job is stuck inside RocksDB itself. This might be an issue with 
for example memory configuration. Streaming jobs and State Processor API are 
running in very different environments as the latter one is using DataSet API 
under the hood, so maybe that can explain this? However I'm no expert in 
neither DataSet API nor the RocksDB, so it's hard for me to make progress here.

Maybe someone else can help here?

Piotrek


On Wed, Sep 8, 2021 at 14:45 David Causse <dcau...@wikimedia.org> wrote:
Hi,

I'm investigating why a job we use to inspect a flink state is a lot slower 
than the bootstrap job used to generate it.

I use RocksdbDB with a simple keyed value state mapping a string key to a long 
value. Generating the bootstrap state from a CSV file with 100M entries takes a 
couple minutes over 12 slots spread over 3 TM (4Gb allowed). But another job 
that does the opposite (converts this state into a CSV file) takes several 
hours. I would have expected these two job runtimes to be in the same ballpark.

I wrote a simple test case[1] to reproduce the problem. This 

Usecase for flink

2021-09-09 Thread Dipanjan Mazumder
Hi,
   I am working on a use case and thinking of using Flink for it. I will be
having many large resource graphs; I need to parse each graph into its nodes
and edges and evaluate each of them against some Siddhi rules. Right now the
implementation for evaluating individual entities with Flink and Siddhi is in
place, but I am in a dilemma over whether I should do the graph processing in
Flink as well. So this is what I am planning to do:

From Kafka I will fetch the graph, decompose the graph into nodes and edges,
fetch additional metadata for each node and edge from different REST APIs, and
then pass the individual nodes and edges (which are resources) to different
substreams that are already in place. Rules will work on the individual
substreams to process the nodes and edges, and finally they will emit the rule
output into a stream. I will collate all of them based on the graph id from
that stream using another operator and send the final result to an output
stream.

This is what I am thinking. I now need input from all of you on whether this
is a fair use case for Flink, and whether Flink will be able to handle this
level of processing at scale and volume.

Any input will ease my understanding and will help me go ahead with this idea.

Regards,
dipanjan

Re: flink cdc SQL2ES, GC overhead limit exceeded

2021-09-09 Thread Caizhi Weng
Hi!

Have you tried increasing the heap memory? Is the problem still the same after increasing it? If convenient, could you provide the SQL text?

LI YUAN <27297...@qq.com.invalid> wrote on Fri, Sep 10, 2021 at 9:39 AM:

> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at org.rocksdb.RocksIterator.key0(Native Method)
> at org.rocksdb.RocksIterator.key(RocksIterator.java:37)
> at
> org.apache.flink.contrib.streaming.state.RocksIteratorWrapper.key(RocksIteratorWrapper.java:99)
> at
> org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapIterator.loadCache(RocksDBMapState.java:670)
> at
> org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapIterator.hasNext(RocksDBMapState.java:585)
> at
> org.apache.flink.table.runtime.operators.join.stream.state.OuterJoinRecordStateViews$InputSideHasNoUniqueKey$1.hasNext(OuterJoinRecordStateViews.java:285)
> at
> org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator$AssociatedRecords.of(AbstractStreamingJoinOperator.java:199)
> at
> org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:211)
> at
> org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement2(StreamingJoinOperator.java:129)
> at org.apache.flink.streaming.runtime.io
> .StreamTwoInputProcessorFactory.processRecord2(StreamTwoInputProcessorFactory.java:221)
> at org.apache.flink.streaming.runtime.io
> .StreamTwoInputProcessorFactory.lambda$create$1(StreamTwoInputProcessorFactory.java:190)
> at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$$Lambda$363/366743523.accept(Unknown
> Source)
> at org.apache.flink.streaming.runtime.io
> .StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:291)
> at org.apache.flink.streaming.runtime.io
> .AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
> at org.apache.flink.streaming.runtime.io
> .AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
> at org.apache.flink.streaming.runtime.io
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
> at org.apache.flink.streaming.runtime.io
> .StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:98)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$208/999379751.runDefaultAction(Unknown
> Source)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$625/601779266.run(Unknown
> Source)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> at java.lang.Thread.run(Thread.java:748)
>
> Environment :
>
> Flink version : 1.13.1
>
> Flink CDC version: 1.4.0
>
> Database and version: Mysql 7.0


Abnormal data returned by the Flink REST API and shown in the UI

2021-09-09 Thread yidan zhao
As the title says: so far I have mainly looked at the watermark. My windows produce output normally, but the watermark shown by the REST API and in the UI is clearly lagging far behind.

The actual watermark must be fine, because the windows fire normally.

This code has not been changed and used to work before. Also, judging from the browser's network monitoring, the returned data seems to never change.


flink cdc SQL2ES, GC overhead limit exceeded

2021-09-09 Thread LI YUAN
java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.rocksdb.RocksIterator.key0(Native Method)
at org.rocksdb.RocksIterator.key(RocksIterator.java:37)
at 
org.apache.flink.contrib.streaming.state.RocksIteratorWrapper.key(RocksIteratorWrapper.java:99)
at 
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapIterator.loadCache(RocksDBMapState.java:670)
at 
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapIterator.hasNext(RocksDBMapState.java:585)
at 
org.apache.flink.table.runtime.operators.join.stream.state.OuterJoinRecordStateViews$InputSideHasNoUniqueKey$1.hasNext(OuterJoinRecordStateViews.java:285)
at 
org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator$AssociatedRecords.of(AbstractStreamingJoinOperator.java:199)
at 
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:211)
at 
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement2(StreamingJoinOperator.java:129)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord2(StreamTwoInputProcessorFactory.java:221)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$1(StreamTwoInputProcessorFactory.java:190)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$$Lambda$363/366743523.accept(Unknown
 Source)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:291)
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:98)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$208/999379751.runDefaultAction(Unknown
 Source)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$625/601779266.run(Unknown
 Source)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)

Environment :

Flink version : 1.13.1

Flink CDC version: 1.4.0

Database and version: Mysql 7.0

Re: Documentation for deep diving into flink (data-streaming) job restart process

2021-09-09 Thread Puneet Duggal
Hi,

Please find the attached log file regarding the job not getting restarted on
another task manager once the existing task manager was restarted.

Just FYI - we are using fixed-delay restart (5 attempts, 10 s delay); a sketch
of that configuration is included below for reference.
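For reference, a minimal sketch of how that restart strategy can be configured programmatically (the same can be set through the restart-strategy options in flink-conf.yaml); the pipeline itself is a placeholder:

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FixedDelayRestartSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 5 restart attempts with a 10 second delay between attempts.
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(5, Time.of(10, TimeUnit.SECONDS)));

        env.fromElements(1, 2, 3).print(); // placeholder pipeline
        env.execute("fixed-delay-restart-sketch");
    }
}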

On Thu, Sep 9, 2021 at 4:29 PM Robert Metzger  wrote:

> Hi Puneet,
>
> Can you provide us with the JobManager logs of this incident? Jobs should
> not disappear, they should restart on other Task Managers.
>
> On Wed, Sep 8, 2021 at 3:06 PM Puneet Duggal 
> wrote:
>
>> Hi,
>>
>> So for the past 2-3 days I have been looking for documentation that
>> elaborates how Flink takes care of restarting a data streaming job. I
>> know all the restart and failover strategies, but wanted to know how the
>> different components (Job Manager, Task Manager, etc.) play a role while
>> restarting a Flink data streaming job.
>>
>> I am asking this because recently in production, when I restarted a task
>> manager, all the jobs running on it, instead of getting restarted,
>> disappeared. Within the Flink UI, I couldn't track those jobs under
>> completed jobs either. The logs also couldn't provide me with enough information.
>>
>> Also, can anyone tell me what the role of the /tmp/executionGraphStore
>> folder on the Job Manager machine is?
>>
>> Thanks
>>
>>
>>


logfile
Description: Binary data


Re: Job manager crash

2021-09-09 Thread mejri houssem
Thanks for the response.

With respect to the API server, I don't think I can do much about it because
I am just using a specific namespace in the Kubernetes cluster; it's not me
who administers the cluster.

Otherwise, I will try the GC log option to see if I can find something useful
in order to debug this problem.



On Thu, Sep 9, 2021 at 22:25, houssem wrote:

> Hello ,
>
> with respect to the api-server I don't re
>
> On 2021/09/09 11:37:49, Yang Wang  wrote:
> > I think @Robert Metzger  is right. You need to
> check
> > whether your Kubernetes APIServer is working properly or not(e.g.
> > overloaded).
> >
> > Another hint is about the fullGC. Please use the following config option
> to
> > enable the GC logs and check the full gc time.
> > env.java.opts.jobmanager: -verbose:gc -XX:+PrintGCDetails
> > -XX:+PrintGCDateStamps -Xloggc:/opt/flink/log/jobmanager-gc.log
> >
> > Simply increasing the renew-deadline might help. But it could not solve
> the
> > problem completely.
> > high-availability.kubernetes.leader-election.lease-duration: 120 s
> > high-availability.kubernetes.leader-election.renew-deadline: 120 s
> >
> >
> > Best,
> > Yang
> >
> > Robert Metzger wrote on Thu, Sep 9, 2021 at 6:52 PM:
> >
> > > Is the kubernetes server you are using particularly busy? Maybe these
> > > issues occur because the server is overloaded?
> > >
> > > "Triggering checkpoint 2193 (type=CHECKPOINT) @ 1630681482667 for job
> > > ."
> > > "Completed checkpoint 2193 for job 
> (474
> > > bytes in 195 ms)."
> > > "Triggering checkpoint 2194 (type=CHECKPOINT) @ 1630681492667 for job
> > > ."
> > > "Completed checkpoint 2194 for job 
> (474
> > > bytes in 161 ms)."
> > > "Renew deadline reached after 60 seconds while renewing lock
> > > ConfigMapLock: myNs - myJob-dispatcher-leader
> > > (1bcda6b0-8a5a-4969-b9e4-2257c4478572)"
> > > "Stopping SessionDispatcherLeaderProcess."
> > >
> > > At some point, the leader election mechanism in fabric8 seems to give
> up.
> > >
> > >
> > > On Tue, Sep 7, 2021 at 10:05 AM mejri houssem <
> mejrihousse...@gmail.com>
> > > wrote:
> > >
> > >> hello,
> > >>
> > >> Here's other logs of the latest jm crash.
> > >>
> > >>
> > >> On Mon, Sep 6, 2021 at 14:18, houssem wrote:
> > >>
> > >>> hello,
> > >>>
> > >>> I have three jobs running on my kubernetes cluster and each job has
> his
> > >>> own cluster id.
> > >>>
> > >>> On 2021/09/06 03:28:10, Yangze Guo  wrote:
> > >>> > Hi,
> > >>> >
> > >>> > The root cause is not "java.lang.NoClassDefFound". The job has been
> > >>> > running but could not edit the config map
> > >>> > "myJob--jobmanager-leader" and it
> > >>> > seems finally disconnected with the API server. Is there another
> job
> > >>> > with the same cluster id (myJob) ?
> > >>> >
> > >>> > I would also pull Yang Wang.
> > >>> >
> > >>> > Best,
> > >>> > Yangze Guo
> > >>> >
> > >>> > On Mon, Sep 6, 2021 at 10:10 AM Caizhi Weng 
> > >>> wrote:
> > >>> > >
> > >>> > > Hi!
> > >>> > >
> > >>> > > There is a message saying "java.lang.NoClassDefFound Error:
> > >>> org/apache/hadoop/hdfs/HdfsConfiguration" in your log file. Are you
> > >>> visiting HDFS in your job? If yes it seems that your Flink
> distribution or
> > >>> your cluster is lacking hadoop classes. Please make sure that there
> are
> > >>> hadoop jars in the lib directory of Flink, or your cluster has set
> the
> > >>> HADOOP_CLASSPATH environment variable.
> > >>> > >
> > >>> > > mejri houssem wrote on Sat, Sep 4, 2021 at 12:15 AM:
> > >>> > >>
> > >>> > >>
> > >>> > >> Hello ,
> > >>> > >>
> > >>> > >> I am facing a JM crash lately. I am deploying a flink
> application
> > >>> cluster on kubernetes.
> > >>> > >>
> > >>> > >> When i install my chart using helm everything works fine but
> after
> > >>> some time ,the Jm starts to crash
> > >>> > >>
> > >>> > >> and then it gets deleted eventually after 5 restarts.
> > >>> > >>
> > >>> > >> flink version: 1.12.5 (upgraded recently from 1.12.2)
> > >>> > >> HA mode : k8s
> > >>> > >>
> > >>> > >> Here's the full log of the JM attached file.
> > >>> >
> > >>>
> > >>
> >
>


Re: Job manager crash

2021-09-09 Thread houssem
Hello ,

with respect to the api-server I don't re

On 2021/09/09 11:37:49, Yang Wang  wrote: 
> I think @Robert Metzger  is right. You need to check
> whether your Kubernetes APIServer is working properly or not(e.g.
> overloaded).
> 
> Another hint is about the fullGC. Please use the following config option to
> enable the GC logs and check the full gc time.
> env.java.opts.jobmanager: -verbose:gc -XX:+PrintGCDetails
> -XX:+PrintGCDateStamps -Xloggc:/opt/flink/log/jobmanager-gc.log
> 
> Simply increasing the renew-deadline might help. But it could not solve the
> problem completely.
> high-availability.kubernetes.leader-election.lease-duration: 120 s
> high-availability.kubernetes.leader-election.renew-deadline: 120 s
> 
> 
> Best,
> Yang
> 
> Robert Metzger wrote on Thu, Sep 9, 2021 at 6:52 PM:
> 
> > Is the kubernetes server you are using particularly busy? Maybe these
> > issues occur because the server is overloaded?
> >
> > "Triggering checkpoint 2193 (type=CHECKPOINT) @ 1630681482667 for job
> > ."
> > "Completed checkpoint 2193 for job  (474
> > bytes in 195 ms)."
> > "Triggering checkpoint 2194 (type=CHECKPOINT) @ 1630681492667 for job
> > ."
> > "Completed checkpoint 2194 for job  (474
> > bytes in 161 ms)."
> > "Renew deadline reached after 60 seconds while renewing lock
> > ConfigMapLock: myNs - myJob-dispatcher-leader
> > (1bcda6b0-8a5a-4969-b9e4-2257c4478572)"
> > "Stopping SessionDispatcherLeaderProcess."
> >
> > At some point, the leader election mechanism in fabric8 seems to give up.
> >
> >
> > On Tue, Sep 7, 2021 at 10:05 AM mejri houssem 
> > wrote:
> >
> >> hello,
> >>
> >> Here's other logs of the latest jm crash.
> >>
> >>
> >> On Mon, Sep 6, 2021 at 14:18, houssem wrote:
> >>
> >>> hello,
> >>>
> >>> I have three jobs running on my kubernetes cluster and each job has his
> >>> own cluster id.
> >>>
> >>> On 2021/09/06 03:28:10, Yangze Guo  wrote:
> >>> > Hi,
> >>> >
> >>> > The root cause is not "java.lang.NoClassDefFound". The job has been
> >>> > running but could not edit the config map
> >>> > "myJob--jobmanager-leader" and it
> >>> > seems finally disconnected with the API server. Is there another job
> >>> > with the same cluster id (myJob) ?
> >>> >
> >>> > I would also pull Yang Wang.
> >>> >
> >>> > Best,
> >>> > Yangze Guo
> >>> >
> >>> > On Mon, Sep 6, 2021 at 10:10 AM Caizhi Weng 
> >>> wrote:
> >>> > >
> >>> > > Hi!
> >>> > >
> >>> > > There is a message saying "java.lang.NoClassDefFound Error:
> >>> org/apache/hadoop/hdfs/HdfsConfiguration" in your log file. Are you
> >>> visiting HDFS in your job? If yes it seems that your Flink distribution or
> >>> your cluster is lacking hadoop classes. Please make sure that there are
> >>> hadoop jars in the lib directory of Flink, or your cluster has set the
> >>> HADOOP_CLASSPATH environment variable.
> >>> > >
> >>> > > mejri houssem wrote on Sat, Sep 4, 2021 at 12:15 AM:
> >>> > >>
> >>> > >>
> >>> > >> Hello ,
> >>> > >>
> >>> > >> I am facing a JM crash lately. I am deploying a flink application
> >>> cluster on kubernetes.
> >>> > >>
> >>> > >> When i install my chart using helm everything works fine but after
> >>> some time ,the Jm starts to crash
> >>> > >>
> >>> > >> and then it gets deleted eventually after 5 restarts.
> >>> > >>
> >>> > >> flink version: 1.12.5 (upgraded recently from 1.12.2)
> >>> > >> HA mode : k8s
> >>> > >>
> >>> > >> Here's the full log of the JM attached file.
> >>> >
> >>>
> >>
> 


Re: Issue while creating Hive table from Kafka topic

2021-09-09 Thread Robert Metzger
Does the jar file you are trying to submit contain
the org/apache/kafka/common/serialization/ByteArrayDeserializer class?

On Thu, Sep 9, 2021 at 2:10 PM Harshvardhan Shinde <
harshvardhan.shi...@oyorooms.com> wrote:

> Here's the complete stack trace:
>
> Server Response:org.apache.flink.runtime.rest.handler.RestHandlerException:
> Could not execute application. at
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:108)
> at
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
> at
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748) Caused by:
> java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError:
> org/apache/kafka/common/serialization/ByteArrayDeserializer at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
> ... 7 more Caused by: java.lang.NoClassDefFoundError:
> org/apache/kafka/common/serialization/ByteArrayDeserializer at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:322)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:223)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:154)
> at
> org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createKafkaConsumer(KafkaDynamicSource.java:383)
> at
> org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.getScanRuntimeProvider(KafkaDynamicSource.java:205)
> at
> org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:453)
> at
> org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161)
> at
> org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:119)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:85)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
> at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:169)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:161)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:989)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:958)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:283)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
> at com.harsh.test.StreamingJob.main(StreamingJob.java:106) at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498) at

Re: Job crashing with RowSerializer EOF exception

2021-09-09 Thread Robert Metzger
Hi Yuval,

EOF exceptions during (de)serialization are usually an indication that some
serializer in the serializer chain is broken.
What data type are you serializing? Does it include some type handled by a
custom serializer, or by Kryo, ...? (A small generic debugging sketch follows
below.)
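Not a fix, but one generic way to narrow down which serializer is involved is to fail fast on Kryo fallback. This is only a hedged debugging sketch and is not specific to RowSerializer; MyRow is a placeholder type:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SerializerDebugSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Throw as soon as a type silently falls back to Kryo; this often points
        // at the field whose serializer chain is broken.
        env.getConfig().disableGenericTypes();

        // To inspect which serializer Flink picks for a suspicious type, something like:
        //   TypeInformation<MyRow> info = TypeInformation.of(MyRow.class);
        //   System.out.println(info.createSerializer(env.getConfig()));

        env.fromElements("a", "b").print(); // placeholder pipeline
        env.execute("serializer-debug-sketch");
    }
}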

On Thu, Sep 9, 2021 at 4:35 PM Yuval Itzchakov  wrote:

> Hi,
>
> Flink 1.13.2
> Scala 2.12.7
>
> Running an app in production, I'm running into the following exception
> that frequently fails the job:
>
> switched from RUNNING to FAILED with failure cause: java.io.IOException:
> Can't get next record for channel InputChannelInfo{gateIdx=0,
> inputChannelIdx=2}\n\tat
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:98)\n\tat
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)\n\tat
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:96)\n\tat
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)\n\tat
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)\n\tat
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)\n\tat
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)\n\tat
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)\n\tat
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)\n\tat
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)\n\tat
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)\n\tat
> java.base/java.lang.Thread.run(Thread.java:829)\nCaused by:
> java.io.EOFException\n\tat
> org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)\n\tat
> org.apache.flink.types.StringValue.readString(StringValue.java:781)\n\tat
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)\n\tat
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)\n\tat
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:345)\n\tat
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:72)\n\tat
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)\n\tat
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)\n\tat
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)\n\tat
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)\n\tat
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93)\n\tat
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:95)\n\t...
> 11
>
> Deserialization logic for the rows seems to be failing with an EOF
> exception. Any help on the best way to debug this or try to get more info
> would be great.
>
> Thanks.
> --
> Best Regards,
> Yuval Itzchakov.
>


Re: State processor API very slow reading a keyed state with RocksDB

2021-09-09 Thread Seth Wiesman
Hi David,

I was also able to reproduce the behavior, but was able to get
significant performance improvements by reducing the number of slots on
each TM to 1.

My suspicion, as Piotr alluded to, has to do with the different runtime
execution of DataSet over DataStream. In particular, Flink's DataStream
operators are aware of the resource requirements of the state backend and
include RocksDB in its internal memory configurations. In the state
processor api, the underlying input format is a blackbox.

Another thing to know is that when running multiple RocksDB instances
within the same JVM, you are actually running a single native process with
multiple logical instances. I _think_ we are seeing contention amongst the
logical RocksDB instances.

Even with one slot, it is not as fast as I would like and will need to
continue investigating. If my suspicion for the slowness is correct, we
will need to migrate to the new Source API and improve this as part of
DataStream integration. This migration is something we'd like to do
regardless, but I don't have a timeline to share.

*Aside: Why is writing still relatively fast? *

Even with these factors accounted for, I do still expect writing to be
faster than reading. This is due to both how RocksDB internal data
structures work, along with some peculiarities of how the state processor
API has to perform reads.

1. RocksDB internally uses a data structure called a log structured merge
tree (or LSM). This means writes are always implemented as appends, so
there is no seek required. Additionally, writes go into an in-memory data
structure called a MemTable that is flushed to disk asynchronously.
Because there may be multiple entries for a given key, RocksDB needs to
search for the most recent value and potentially read from disk. This may
be alleviated by enabling bloom filters but does have memory costs.

2. RocksDB is a key value store, so Flink represents each registered state
(ValueState, ListState, etc) as its own column family (table). A key only
exists in a table if it has a non-null value. This means not all keys exist
in all column families for a given operator. The state-proc-api wants to
make it appear as if each operator is composed of a single table with
multiple columns. To do this, we perform a full table scan on one column
family and then do point lookups of that key on the others. However, we
still need to find the keys that may only exist in other tables. The trick
we perform is to delete keys from rocksDB after each read, so we can do
full table scans on all column families but never see any duplicates. This
means the reader is performing multiple reads and writes on every call to
`readKey` and is more expensive than it may appear.
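For context, here is a minimal sketch of the reader path being discussed, using the (DataSet-based) state processor API as of Flink 1.13; the savepoint path, operator uid, state name and output location are placeholder assumptions:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReadKeyedStateSketch {

    // Reads a string -> long keyed ValueState; each readKey() call below is where
    // the extra point lookups (and the delete trick) described above happen internally.
    static class CountReader extends KeyedStateReaderFunction<String, Tuple2<String, Long>> {
        private ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.LONG));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
            out.collect(Tuple2.of(key, count.value()));
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Path, operator uid and state name are placeholders.
        ExistingSavepoint savepoint =
                Savepoint.load(env, "file:///tmp/savepoint-dir", new EmbeddedRocksDBStateBackend());
        savepoint.readKeyedState("my-keyed-operator-uid", new CountReader())
                 .writeAsCsv("file:///tmp/state-dump");
        env.execute("read-keyed-state-sketch");
    }
}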

Seth


On Thu, Sep 9, 2021 at 1:48 AM Piotr Nowojski  wrote:

> Hi David,
>
> I can confirm that I'm able to reproduce this behaviour. I've tried
> profiling/flame graphs and I was not able to make much sense out of those
> results. There are no IO/Memory bottlenecks that I could notice, it looks
> indeed like the Job is stuck inside RocksDB itself. This might be an issue
> with for example memory configuration. Streaming jobs and State Processor
> API are running in very different environments as the latter one is using
> DataSet API under the hood, so maybe that can explain this? However I'm no
> expert in neither DataSet API nor the RocksDB, so it's hard for me to make
> progress here.
>
> Maybe someone else can help here?
>
> Piotrek
>
>
On Wed, Sep 8, 2021 at 14:45 David Causse wrote:
>
>> Hi,
>>
>> I'm investigating why a job we use to inspect a flink state is a lot
>> slower than the bootstrap job used to generate it.
>>
>> I use RocksdbDB with a simple keyed value state mapping a string key to a
>> long value. Generating the bootstrap state from a CSV file with 100M
>> entries takes a couple minutes over 12 slots spread over 3 TM (4Gb
>> allowed). But another job that does the opposite (converts this state into
>> a CSV file) takes several hours. I would have expected these two job
>> runtimes to be in the same ballpark.
>>
>> I wrote a simple test case[1] to reproduce the problem. This program has
>> 3 jobs:
>> - CreateState: generate a keyed state (string->long) using the state
>> processor api
>> - StreamJob: reads all the keys using a StreamingExecutionEnvironment
>> - ReadState: reads all the keys using the state processor api
>>
>> Running with 30M keys and (12 slots/3TM with 4Gb each) CreateState &
>> StreamJob are done in less than a minute.
>> ReadState is much slower (> 30minutes) on my system. The RocksDB state
>> appears to be restored relatively quickly but after that the slots are
>> performing at very different speeds. Some slots finish quickly but some
>> others struggle to advance.
>> Looking at the thread dumps I always see threads in
>> org.rocksdb.RocksDB.get:
>>
>> "DataSource (at readKeyedState(ExistingSavepoint.java:314)
>> (org.apache.flink.state.api.input.KeyedStateInputFormat)) 

Re: Re: Re: Re: Temporal Joins reports "Currently the join key in Temporal Table Join can not be empty"

2021-09-09 Thread Wayne
Hello 


Sorry to bother you again: I tried this again recently and still get the same error.
My Flink version is flink-1.12.2-bin-scala_2.12.
I am executing with the SQL client.
My SQL is as follows:


CREATE TABLE stub_trans (
`uuid` STRING,
`columnInfos` MAP<STRING, ROW<newValue STRING, name STRING NOT NULL, isKeyColumn BOOLEAN NOT NULL, type STRING NOT NULL> NOT NULL> NOT NULL,
procTime TIMESTAMP(3) METADATA FROM 'timestamp' , 
WATERMARK FOR procTime AS procTime   
) WITH (
'connector' = 'kafka',
..
'format' = 'avro'
);


CREATE TABLE kapeng_test (


`event_id` STRING,
`genre_id` STRING,
`user_guid` STRING,
procTime TIMESTAMP(3) METADATA FROM 'timestamp'  ,  
WATERMARK FOR procTime AS procTime   
) WITH (
'connector' = 'kafka',

'format' = 'avro'
);




CREATE TABLE purchase_hist (
rowkey STRING,
d ROW < cost STRING, crdate STRING, currency STRING, eventid STRING, 
genreid STRING, quantity STRING >,
PRIMARY KEY ( rowkey ) NOT ENFORCED 
) WITH ( 
'connector' = 'hbase-1.4', 
.);




INSERT INTO purchase_hist SELECT
rowkey,
ROW ( cost, crdate, currency, eventid, genreid, quantity ) 
FROM
( SELECT
CONCAT_WS(
'_',
user_guid,
CAST( CAST( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS' ].newValue, 
'yyyy-MM-dd:HH:mm:ss.S' ) AS BIGINT ) AS STRING )) AS rowkey,
columnInfos [ 'TICKET_COST' ].newValue AS cost,
DATE_FORMAT( TO_TIMESTAMP ( columnInfos [ 'CREATED_TS' ].newValue, 
'yyyy-MM-dd:HH:mm:ss.S' ), 'yyyy-MM-dd' ) AS crdate,
columnInfos [ 'EVENT_ID' ].newValue AS eventid,
columnInfos [ 'CURRENCY_CODE' ].newValue AS currency,
columnInfos [ 'QUANTITY' ].newValue AS quantity,
genre_id AS genreid,
user_guid AS userGuid 
FROM
stub_trans
   LEFT  JOIN kapeng_test FOR SYSTEM_TIME AS OF stub_trans.procTime ON 
stub_trans.columnInfos [ 'EVENT_ID' ].newValue = kapeng_test.event_id )m




The error is as follows:
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Currently the join key in Temporal Table Join can not be empty.
at 
org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:272)
at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:60)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:55)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.immutable.Range.foreach(Range.scala:158)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at 

DataStreamAPI and Stateful functions

2021-09-09 Thread Barry Higgins
Hi, 
I'm looking at using the DataStream API from a Flink application against a
remote Python stateful function deployed on another machine. I would like to
investigate how feasible it is to have all of the state management handled on
the calling side, meaning that we don't need another installation of Flink to
manage the stateful functions.

Unfortunately the example referenced in the documentation: 
https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/sdk/flink-datastream/

is no longer in existence:
https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java

There is an older version that is available here:
https://github.com/apache/flink-statefun/tree/release-2.2/statefun-examples/statefun-flink-datastream-example

and I have tried to work with this without much success

The calling element of the code looks as follows:

   StatefulFunctionEgressStreams out =
       StatefulFunctionDataStreamBuilder.builder("example")
           .withDataStreamAsIngress(names)
           .withFunctionProvider(GREET, unused -> new MyFunction())
           .withRequestReplyRemoteFunction(
               requestReplyFunctionBuilder(
                       REMOTE_GREET,
                       URI.create("http://localhost:5000/statefun"))
                   .withPersistedState("seen_count")
                   .withMaxRequestDuration(Duration.ofSeconds(15))
                   .withMaxNumBatchRequests(500))
           .withEgressId(GREETINGS)
           .withConfiguration(statefunConfig)
           .build(env);

with a reference to a FunctionProvider that exists as an inner class in the 
same class. We would like this to be a remote call, where I guess I would 
replace http://localhost:5000/statefun with the remote address of the SF.
However, when I make such a change, the code still refers to the inner
function, and any changes to the local MyFunction class are returned regardless
of what is deployed remotely.

If anyone has a working example of how to interact via DataStreams with a 
remotely deployed SF, I would be very grateful. I would be very happy to update 
the documentation if I can get this working.
Cheers,
Barry



Job crashing with RowSerializer EOF exception

2021-09-09 Thread Yuval Itzchakov
Hi,

Flink 1.13.2
Scala 2.12.7

Running an app in production, I'm running into the following exception that
frequently fails the job:

switched from RUNNING to FAILED with failure cause: java.io.IOException:
Can't get next record for channel InputChannelInfo{gateIdx=0,
inputChannelIdx=2}\n\tat
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:98)\n\tat
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)\n\tat
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:96)\n\tat
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)\n\tat
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)\n\tat
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)\n\tat
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)\n\tat
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)\n\tat
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)\n\tat
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)\n\tat
org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)\n\tat
java.base/java.lang.Thread.run(Thread.java:829)\nCaused by:
java.io.EOFException\n\tat
org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)\n\tat
org.apache.flink.types.StringValue.readString(StringValue.java:781)\n\tat
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)\n\tat
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)\n\tat
org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:345)\n\tat
org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:72)\n\tat
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)\n\tat
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)\n\tat
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)\n\tat
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)\n\tat
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93)\n\tat
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:95)\n\t...
11

Deserialization logic for the rows seems to be failing with an EOF
exception. Any help on the best way to debug this or try to get more info
would be great.

Thanks.
-- 
Best Regards,
Yuval Itzchakov.


Re: Duplicate copies of job in Flink UI/API

2021-09-09 Thread Chesnay Schepler

I'm afraid there's no real workaround.

If the information for completed jobs isn't important to you then 
setting jobstore.expiration-time to a low value can reduce the impact, 
or setting jobstore.max-capacity to 0 would prevent any completed job 
from being displayed.
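For example, in flink-conf.yaml these could look as follows (values are illustrative only; jobstore.expiration-time is given in seconds and defaults to 3600):

jobstore.expiration-time: 600
jobstore.max-capacity: 0

The first keeps completed jobs for only ten minutes; the second keeps no completed jobs at all.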


Beyond that I can't think of anything.

We will track the issue in this ticket: 
https://issues.apache.org/jira/browse/FLINK-20195


I will also file a related ticket that the job archiving fails because, 
again, we archive multiple jobs with the same job ID.



On 09/09/2021 15:15, Peter Westermann wrote:


Thanks Chesnay. You are understanding this correctly. Your explanation 
makes sense to me.


Is there anything we can do to prevent this? At least for us, most 
times a leader election happens, the leader doesn’t actually change 
because the jobmanager is still healthy.


Thanks,

Peter


From: Chesnay Schepler
Date: Thursday, September 9, 2021 at 9:11 AM
To: Peter Westermann, Piotr Nowojski, user@flink.apache.org
Subject: Re: Duplicate copies of job in Flink UI/API

Just to double-check that I'm understanding things correctly:

You have a job with HA, then Zookeeper breaks down, the job gets 
suspended, ZK comes back online, and the _same_ JobManager becomes the 
leader?


If so, then I can explain why this happens and hopefully reproduce it.

In short, when a job is suspended (or terminates in any other way) 
then information about the job is stored in a data-structure.


This is used by the REST API (and thus, UI) to query completed jobs.

For _running_ jobs we query the JobMaster (a component within the 
JobManager responsible for that job) instead.


When listing all jobs we query all jobs from the data-structure for 
finished jobs, _and_ all JobMasters for running jobs. The core 
assumption here is that for a given ID only one of these can return 
something.


So what ends up happening is that when the job is suspended it is 
written to the data-structure, and then another JobMaster is started 
for the same job, and when listing all jobs we can now end up asking 
for the same job from multiple sources.


This is a somewhat unusual scenario because usually when a job is 
suspended another JobManager becomes the leader (where this wouldn't 
occur because the data-structure isn't shared across JobManagers).


On 09/09/2021 13:37, Peter Westermann wrote:

Hi Piotr,

Jobmanager logs are attached to this email. The only thing that
jumps out to me is this:

09/08/2021 09:02:26.240 -0400 ERROR
org.apache.flink.runtime.history.FsJobArchivist Failed to archive job.

  java.io.IOException: File already
exists:s3p://flink-s3-bucket/history/2db4ee6397151a1109d1ca05188a4cbb

This happened days after the Flink update – and not just once.
Across all our Flink clusters I’ve seen this 3 times. The cause
for the jobmanager leadership loss in this case was a deployment
of our zookeeper cluster that led to a brief connection loss. The
new leader election is expected.

Thanks,

Peter

From: Piotr Nowojski
Date: Thursday, September 9, 2021 at 12:39 AM
To: Peter Westermann
Cc: user@flink.apache.org
Subject: Re: Duplicate copies of job in Flink UI/API

Hi Peter,

Can you provide relevant JobManager logs? And can you write down
what steps have you taken before the failure happened? Did this
failure occur during upgrading Flink, or after the upgrade etc.

Best,

Piotrek

On Wed, Sep 8, 2021 at 16:11 Peter Westermann <no.westerm...@genesys.com> wrote:

We recently upgraded from Flink 1.12.4 to 1.12.5 and are
seeing some weird behavior after a change in jobmanager
leadership: We’re seeing two copies of the same job, one of
those is in SUSPENDED state and has a start time of zero.
Here’s the output from the /jobs/overview endpoint:

{

  "jobs": [{

    "jid": "2db4ee6397151a1109d1ca05188a4cbb",

    "name": "analytics-flink-v1",

    "state": "RUNNING",

    "start-time": 1631106146284,

    "end-time": -1,

    "duration": 2954642,

    "last-modification": 1631106152322,

    "tasks": {

  "total": 112,

  "created": 0,

  "scheduled": 0,

  "deploying": 0,

  "running": 112,

  "finished": 0,

  "canceling": 0,

  "canceled": 0,

  "failed": 0,

  "reconciling": 0

    }

  }, {

    "jid": "2db4ee6397151a1109d1ca05188a4cbb",

    "name": "analytics-flink-v1",

    "state": "SUSPENDED",

    "start-time": 0,

    "end-time": -1,

    "duration": 1631105900760,

    

Re: Duplicate copies of job in Flink UI/API

2021-09-09 Thread Peter Westermann
Thanks Chesnay. You are understanding this correctly. Your explanation makes 
sense to me.
Is there anything we can do to prevent this? At least for us, most of the time 
when a leader election happens, the leader doesn’t actually change because the 
jobmanager is still healthy.

Thanks,
Peter




From: Chesnay Schepler 
Date: Thursday, September 9, 2021 at 9:11 AM
To: Peter Westermann , Piotr Nowojski 
, user@flink.apache.org 
Subject: Re: Duplicate copies of job in Flink UI/API
Just to double-check that I'm understanding things correctly:

You have a job with HA, then Zookeeper breaks down, the job gets suspended, ZK 
comes back online, and the _same_ JobManager becomes the leader?

If so, then I can explain why this happens and hopefully reproduce it.

In short, when a job is suspended (or terminates in any other way) then 
information about the job is stored in a data-structure.
This is used by the REST API (and thus, UI) to query completed jobs.
For _running_ jobs we query the JobMaster (a component within the JobManager 
responsible for that job) instead.

When listing all jobs we query all jobs from the data-structure for finished 
jobs, _and_ all JobMasters for running jobs. The core assumption here is that 
for a given ID only one of these can return something.

So what ends up happening is that when the job is suspended it is written to 
the data-structure, and then another JobMaster is started for the same job, and 
when listing all jobs we can now end up asking for the same job from multiple 
sources.

This is a somewhat unusual scenario because usually when a job is suspended 
another JobManager becomes the leader (where this wouldn't occur because the 
data-structure isn't shared across JobManagers).



On 09/09/2021 13:37, Peter Westermann wrote:
Hi Piotr,

Jobmanager logs are attached to this email. The only thing that jumps out to me 
is this:

09/08/2021 09:02:26.240 -0400 ERROR 
org.apache.flink.runtime.history.FsJobArchivist Failed to archive job.
  java.io.IOException: File already 
exists:s3p://flink-s3-bucket/history/2db4ee6397151a1109d1ca05188a4cbb

This happened days after the Flink update – and not just once. Across all our 
Flink clusters I’ve seen this 3 times. The cause for the jobmanager leadership 
loss in this case was a deployment of our zookeeper cluster that led to a 
brief connection loss. The new leader election is expected.

Thanks,
Peter


From: Piotr Nowojski 
Date: Thursday, September 9, 2021 at 12:39 AM
To: Peter Westermann 

Cc: user@flink.apache.org 

Subject: Re: Duplicate copies of job in Flink UI/API
Hi Peter,

Can you provide relevant JobManager logs? And can you write down what steps 
have you taken before the failure happened? Did this failure occur during 
upgrading Flink, or after the upgrade etc.

Best,
Piotrek

Wed, 8 Sep 2021 at 16:11 Peter Westermann 
mailto:no.westerm...@genesys.com>> wrote:
We recently upgraded from Flink 1.12.4 to 1.12.5 and are seeing some weird 
behavior after a change in jobmanager leadership: We’re seeing two copies of 
the same job, one of those is in SUSPENDED state and has a start time of zero. 
Here’s the output from the /jobs/overview endpoint:
{
  "jobs": [{
"jid": "2db4ee6397151a1109d1ca05188a4cbb",
"name": "analytics-flink-v1",
"state": "RUNNING",
"start-time": 1631106146284,
"end-time": -1,
"duration": 2954642,
"last-modification": 1631106152322,
"tasks": {
  "total": 112,
  "created": 0,
  "scheduled": 0,
  "deploying": 0,
  "running": 112,
  "finished": 0,
  "canceling": 0,
  "canceled": 0,
  "failed": 0,
  "reconciling": 0
}
  }, {
"jid": "2db4ee6397151a1109d1ca05188a4cbb",
"name": "analytics-flink-v1",
"state": "SUSPENDED",
"start-time": 0,
"end-time": -1,
"duration": 1631105900760,
"last-modification": 0,
"tasks": {
  "total": 0,
  "created": 0,
  "scheduled": 0,
  "deploying": 0,
  "running": 0,
  "finished": 0,
  "canceling": 0,
  "canceled": 0,
  "failed": 0,
  "reconciling": 0
}
  }]
}

Has anyone seen this behavior before?

Thanks,
Peter




Re: Duplicate copies of job in Flink UI/API

2021-09-09 Thread Chesnay Schepler

Just to double-check that I'm understanding things correctly:

You have a job with HA, then Zookeeper breaks down, the job gets 
suspended, ZK comes back online, and the _same_ JobManager becomes the 
leader?


If so, then I can explain why this happens and hopefully reproduce it.

In short, when a job is suspended (or terminates in any other way) then 
information about the job is stored in a data-structure.

This is used by the REST API (and thus, UI) to query completed jobs.
For _running_ jobs we query the JobMaster (a component within the 
JobManager responsible for that job) instead.


When listing all jobs we query all jobs from the data-structure for 
finished jobs, _and_ all JobMasters for running jobs. The core 
assumption here is that for a given ID only one of these can return 
something.


So what ends up happening is that when the job is suspended it is 
written to the data-structure, and then another JobMaster is started for 
the same job, and when listing all jobs we can now end up asking for the 
same job from multiple sources.


This is a somewhat unusual scenario because usually when a job is 
suspended another JobManager becomes the leader (where this wouldn't 
occur because the data-structure isn't shared across JobManagers).




On 09/09/2021 13:37, Peter Westermann wrote:


Hi Piotr,

Jobmanager logs are attached to this email. The only thing that jumps 
out to me is this:


09/08/2021 09:02:26.240 -0400 ERROR 
org.apache.flink.runtime.history.FsJobArchivist Failed to archive job.


  java.io.IOException: File already 
exists:s3p://flink-s3-bucket/history/2db4ee6397151a1109d1ca05188a4cbb


This happened days after the Flink update – and not just once. Across 
all our Flink clusters I’ve seen this 3 times. The cause for the 
jobmanager leadership loss in this case was a deployment of our 
zookeeper cluster that led to a brief connection loss. The new leader 
election is expected.


Thanks,

Peter

*From: *Piotr Nowojski 
*Date: *Thursday, September 9, 2021 at 12:39 AM
*To: *Peter Westermann 
*Cc: *user@flink.apache.org 
*Subject: *Re: Duplicate copies of job in Flink UI/API

Hi Peter,

Can you provide relevant JobManager logs? And can you write down what 
steps have you taken before the failure happened? Did this 
failure occur during upgrading Flink, or after the upgrade etc.


Best,

Piotrek

Wed, 8 Sep 2021 at 16:11 Peter Westermann > wrote:


We recently upgraded from Flink 1.12.4 to 1.12.5 and are seeing
some weird behavior after a change in jobmanager leadership: We’re
seeing two copies of the same job, one of those is in SUSPENDED
state and has a start time of zero. Here’s the output from the
/jobs/overview endpoint:

{

  "jobs": [{

    "jid": "2db4ee6397151a1109d1ca05188a4cbb",

    "name": "analytics-flink-v1",

    "state": "RUNNING",

    "start-time": 1631106146284,

    "end-time": -1,

    "duration": 2954642,

    "last-modification": 1631106152322,

    "tasks": {

  "total": 112,

  "created": 0,

  "scheduled": 0,

  "deploying": 0,

  "running": 112,

  "finished": 0,

  "canceling": 0,

  "canceled": 0,

  "failed": 0,

  "reconciling": 0

    }

  }, {

    "jid": "2db4ee6397151a1109d1ca05188a4cbb",

    "name": "analytics-flink-v1",

    "state": "SUSPENDED",

    "start-time": 0,

    "end-time": -1,

    "duration": 1631105900760,

    "last-modification": 0,

    "tasks": {

  "total": 0,

  "created": 0,

  "scheduled": 0,

  "deploying": 0,

  "running": 0,

  "finished": 0,

  "canceling": 0,

  "canceled": 0,

  "failed": 0,

  "reconciling": 0

    }

  }]

}

Has anyone seen this behavior before?

Thanks,

Peter





Re: Issue while creating Hive table from Kafka topic

2021-09-09 Thread Harshvardhan Shinde
Here's the complete stack trace:

Server Response:org.apache.flink.runtime.rest.handler.RestHandlerException:
Could not execute application. at
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:108)
at
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
at
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748) Caused by:
java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError:
org/apache/kafka/common/serialization/ByteArrayDeserializer at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
... 7 more Caused by: java.lang.NoClassDefFoundError:
org/apache/kafka/common/serialization/ByteArrayDeserializer at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:322)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:223)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:154)
at
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createKafkaConsumer(KafkaDynamicSource.java:383)
at
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.getScanRuntimeProvider(KafkaDynamicSource.java:205)
at
org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:453)
at
org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161)
at
org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:119)
at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:85)
at
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:169)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:161)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:989)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:958)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:283)
at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
at com.harsh.test.StreamingJob.main(StreamingJob.java:106) at
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498) at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at

Re: Issue while creating Hive table from Kafka topic

2021-09-09 Thread Robert Metzger
Can you share the full stack trace, not just a part of it?

On Thu, Sep 9, 2021 at 1:43 PM Harshvardhan Shinde <
harshvardhan.shi...@oyorooms.com> wrote:

> Hi,
>
> I added the dependencies while trying to resolve the same issue, thought I
> was missing them.
>
> Thanks
>
> On Thu, Sep 9, 2021 at 4:26 PM Robert Metzger  wrote:
>
>> Hey,
>>
>> Why do you have these dependencies in your pom?
>>
>> 
>> 
>> org.apache.kafka
>> kafka-clients
>> 2.8.0
>> 
>>
>> 
>> org.apache.kafka
>> kafka_2.12
>> 2.8.0
>> 
>>
>>
>> They are not needed for using the Kafka connector of Flink (the Flink
>> Kafka connector dependency pulls in the required dependencies)
>>
>>
>> On Thu, Sep 9, 2021 at 12:02 PM Harshvardhan Shinde <
>> harshvardhan.shi...@oyorooms.com> wrote:
>>
>>> Hi,
>>>
>>> I'm trying a simple flink job that reads data from a kafka topic and
>>> creates a Hive table.
>>>
>>> I'm following the steps from here
>>> 
>>> .
>>>
>>> Here's my code:
>>>
>>> import org.apache.flink.table.api.EnvironmentSettings;
>>> import org.apache.flink.table.api.Table;
>>> import org.apache.flink.table.api.TableEnvironment;
>>> import org.apache.flink.table.catalog.hive.HiveCatalog;
>>>
>>> EnvironmentSettings settings = 
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>> TableEnvironment tableEnv = TableEnvironment.create(settings);
>>>
>>> String name= "myhive";
>>> String defaultDatabase = "harsh_test";
>>> String hiveConfDir = "/etc/hive/conf";
>>>
>>> HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
>>> tableEnv.registerCatalog(name, hive);
>>>
>>> // set the HiveCatalog as the current catalog of the session
>>> tableEnv.useCatalog(name);
>>>
>>> tableEnv.executeSql("CREATE TABLE IF NOT EXISTS transactions (\n" +
>>>   "  `created_at` TIMESTAMP(3) METADATA FROM 'timestamp',\n" +
>>>   "  `partition` BIGINT METADATA VIRTUAL,\n" +
>>>   "  `offset` BIGINT METADATA VIRTUAL,\n" +
>>>   "account_id  BIGINT,\n" +
>>>   "amount  BIGINT,\n" +
>>>   "transaction_time TIMESTAMP(3),\n" +
>>>   "WATERMARK FOR transaction_time AS transaction_time - INTERVAL 
>>> '5' SECOND\n" +
>>>   ") WITH (\n" +
>>>   "'connector' = 'kafka',\n" +
>>>   "'topic' = 'flink-stream-table',\n" +
>>>   "'properties.bootstrap.servers' = ':9092',\n" +
>>>   "   'scan.startup.mode' = 'earliest-offset',\n" +
>>>   "'format'= 'csv'\n" +
>>>   ")");
>>>
>>> Table table = tableEnv.sqlQuery("Select * from transactions");
>>> table.execute().print();
>>>
>>> The code builds successfully, but I'm getting the following runtime
>>> error:
>>>
>>> Caused by: java.util.concurrent.CompletionException:
>>> java.lang.NoClassDefFoundError:
>>> org/apache/kafka/common/serialization/ByteArrayDeserializer at
>>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>>> at
>>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>>> at
>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
>>> ..
>>>
>>> Here are my pom.xml file contents:
>>>
>>> 
>>> http://maven.apache.org/POM/4.0.0; 
>>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
>>>  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
>>> http://maven.apache.org/xsd/maven-4.0.0.xsd;>
>>> 4.0.0
>>>
>>> com.harsh.test
>>> harsh-flink-test
>>> 1.0-SNAPSHOT
>>> jar
>>>
>>> Flink Quickstart Job
>>> http://www.myorganization.org
>>>
>>> 
>>> UTF-8
>>> 1.13.2
>>> 1.8
>>> 2.3.6
>>> 2.12
>>> ${java.version}
>>> ${java.version}
>>> 
>>>
>>> 
>>> 
>>> apache.snapshots
>>> Apache Development Snapshot Repository
>>> 
>>> https://repository.apache.org/content/repositories/snapshots/
>>> 
>>> false
>>> 
>>> 
>>> true
>>> 
>>> 
>>> 
>>>
>>> 
>>> 
>>> 
>>> 
>>> org.apache.flink
>>> flink-java
>>> ${flink.version}
>>> 
>>> 
>>> org.apache.flink
>>> 
>>> flink-streaming-java_${scala.binary.version}
>>> ${flink.version}
>>> 
>>>
>>> 
>>>
>>> 
>>>
>>> 
>>> 
>>> org.apache.flink
>>> 
>>> flink-connector-kafka_${scala.binary.version}
>>> ${flink.version}
>>> 
>>>
>>> 
>>> org.apache.flink
>>> 
>>> flink-table-api-java-bridge_${scala.binary.version}
>>> 

Re: Allocation-preserving scheduling and task-local recovery

2021-09-09 Thread Robert Metzger
Hi,
from my understanding of the code [1], the task scheduling first considers
the state location, and then uses the evenly-spread-out scheduling strategy
as a fallback. So, as I read the code, local recovery should take
preference over the evenly-spread-out strategy.

If you can easily test it, I would still recommend removing the
"cluster.evenly-spread-out-slots" strategy, just to make sure my
understanding is really correct.
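
To make that test concrete, the configuration I have in mind looks roughly like
this (a sketch; the localdir path is just a placeholder for whatever you use):

# keep task-local recovery enabled
state.backend.local-recovery: true
state.backend.rocksdb.localdir: /data/flink/rocksdb-local
# temporarily remove / comment out this line for the test
# cluster.evenly-spread-out-slots: true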

I don't think that's the case, but just to make sure: You are only
restarting a single task manager, right? The other task managers keep
running? (Afaik the state information is lost if a TaskManager restarts)

Sorry that I don't have a real answer here (yet). Is there anything
suspicious in the JobManager or TaskManager logs?


[1]
https://github.com/apache/flink/blob/release-1.10/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java

On Wed, Sep 8, 2021 at 9:44 PM Xiang Zhang  wrote:

> We also have this configuration set in case it makes any difference when
> allocating tasks: cluster.evenly-spread-out-slots.
>
> On 2021/09/08 18:09:52, Xiang Zhang  wrote:
> > Hello,
> >
> > We have an app running on Flink 1.10.2 deployed in standalone mode. We
> > enabled task-local recovery by setting both
> *state.backend.local-recovery *and
> > *state.backend.rocksdb.localdir*. The app has over 100 task managers and
> 2
> > job managers (active and passive).
> >
> > This is what we have observed. When we restarted a task manager, all
> tasks
> > got canceled (due to the default failover configuration
> > <
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/task_failure_recovery/#failover-strategies
> >).
> > Then these tasks were re-distributed among the task managers (e.g. some
> > task managers have more slots used than before the restart). This caused all
> > task managers to download state from remote storage all over again.
> >
> > The same thing happened when we restarted a job manager. The job manager
> > failed over to the passive one successfully, however all tasks were
> > canceled and reallocated among the task managers again.
> >
> > My understanding is that if task-local recovery is enabled, Flink will
> try
> > to enable sticky assignment of tasks to previous task managers they run
> on.
> > This doesn't seem to be the case. My question is how we can enable
> > this allocation-preserving
> > scheduling
> > <
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/large_state_tuning/#allocation-preserving-scheduling
> >
> > when Flink handles failures.
> >
> > Thanks,
> > Xiang
> >
>


Re: Issue while creating Hive table from Kafka topic

2021-09-09 Thread Harshvardhan Shinde
Hi,

I added the dependencies while trying to resolve the same issue, thought I
was missing them.

Thanks

On Thu, Sep 9, 2021 at 4:26 PM Robert Metzger  wrote:

> Hey,
>
> Why do you have these dependencies in your pom?
>
> 
> 
> org.apache.kafka
> kafka-clients
> 2.8.0
> 
>
> 
> org.apache.kafka
> kafka_2.12
> 2.8.0
> 
>
>
> They are not needed for using the Kafka connector of Flink (the Flink
> Kafka connector dependency pulls in the required dependencies)
>
>
> On Thu, Sep 9, 2021 at 12:02 PM Harshvardhan Shinde <
> harshvardhan.shi...@oyorooms.com> wrote:
>
>> Hi,
>>
>> I'm trying a simple flink job that reads data from a kafka topic and
>> creates a Hive table.
>>
>> I'm following the steps from here
>> 
>> .
>>
>> Here's my code:
>>
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.Table;
>> import org.apache.flink.table.api.TableEnvironment;
>> import org.apache.flink.table.catalog.hive.HiveCatalog;
>>
>> EnvironmentSettings settings = 
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> TableEnvironment tableEnv = TableEnvironment.create(settings);
>>
>> String name= "myhive";
>> String defaultDatabase = "harsh_test";
>> String hiveConfDir = "/etc/hive/conf";
>>
>> HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
>> tableEnv.registerCatalog(name, hive);
>>
>> // set the HiveCatalog as the current catalog of the session
>> tableEnv.useCatalog(name);
>>
>> tableEnv.executeSql("CREATE TABLE IF NOT EXISTS transactions (\n" +
>>   "  `created_at` TIMESTAMP(3) METADATA FROM 'timestamp',\n" +
>>   "  `partition` BIGINT METADATA VIRTUAL,\n" +
>>   "  `offset` BIGINT METADATA VIRTUAL,\n" +
>>   "account_id  BIGINT,\n" +
>>   "amount  BIGINT,\n" +
>>   "transaction_time TIMESTAMP(3),\n" +
>>   "WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' 
>> SECOND\n" +
>>   ") WITH (\n" +
>>   "'connector' = 'kafka',\n" +
>>   "'topic' = 'flink-stream-table',\n" +
>>   "'properties.bootstrap.servers' = ':9092',\n" +
>>   "   'scan.startup.mode' = 'earliest-offset',\n" +
>>   "'format'= 'csv'\n" +
>>   ")");
>>
>> Table table = tableEnv.sqlQuery("Select * from transactions");
>> table.execute().print();
>>
>> The code builds successfully, but I'm getting the following runtime error:
>>
>> Caused by: java.util.concurrent.CompletionException:
>> java.lang.NoClassDefFoundError:
>> org/apache/kafka/common/serialization/ByteArrayDeserializer at
>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>> at
>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>> at
>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
>> ..
>>
>> Here are my pom.xml file contents:
>>
>> 
>> http://maven.apache.org/POM/4.0.0; 
>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
>>  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
>> http://maven.apache.org/xsd/maven-4.0.0.xsd;>
>> 4.0.0
>>
>> com.harsh.test
>> harsh-flink-test
>> 1.0-SNAPSHOT
>> jar
>>
>> Flink Quickstart Job
>> http://www.myorganization.org
>>
>> 
>> UTF-8
>> 1.13.2
>> 1.8
>> 2.3.6
>> 2.12
>> ${java.version}
>> ${java.version}
>> 
>>
>> 
>> 
>> apache.snapshots
>> Apache Development Snapshot Repository
>> 
>> https://repository.apache.org/content/repositories/snapshots/
>> 
>> false
>> 
>> 
>> true
>> 
>> 
>> 
>>
>> 
>> 
>> 
>> 
>> org.apache.flink
>> flink-java
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> 
>> flink-streaming-java_${scala.binary.version}
>> ${flink.version}
>> 
>>
>> 
>>
>> 
>>
>> 
>> 
>> org.apache.flink
>> 
>> flink-connector-kafka_${scala.binary.version}
>> ${flink.version}
>> 
>>
>> 
>> org.apache.flink
>> 
>> flink-table-api-java-bridge_${scala.binary.version}
>> ${flink.version}
>> 
>>
>> 
>> org.apache.flink
>> 
>> flink-table-planner-blink_${scala.binary.version}
>> ${flink.version}
>> 
>>
>> 
>> 
>> org.apache.flink
>> flink-table-planner_2.12
>> 1.13.2
>> 
>>
>>
>> 
>>  

Re: Job manager crash

2021-09-09 Thread Yang Wang
I think @Robert Metzger  is right. You need to check
whether your Kubernetes APIServer is working properly or not (e.g.
overloaded).

Another hint is about the fullGC. Please use the following config option to
enable the GC logs and check the full gc time.
env.java.opts.jobmanager: -verbose:gc -XX:+PrintGCDetails
-XX:+PrintGCDateStamps -Xloggc:/opt/flink/log/jobmanager-gc.log

Simply increasing the renew-deadline might help, but it may not solve the
problem completely.
high-availability.kubernetes.leader-election.lease-duration: 120 s
high-availability.kubernetes.leader-election.renew-deadline: 120 s


Best,
Yang

Robert Metzger  wrote on Thu, Sep 9, 2021 at 6:52 PM:

> Is the kubernetes server you are using particularly busy? Maybe these
> issues occur because the server is overloaded?
>
> "Triggering checkpoint 2193 (type=CHECKPOINT) @ 1630681482667 for job
> ."
> "Completed checkpoint 2193 for job  (474
> bytes in 195 ms)."
> "Triggering checkpoint 2194 (type=CHECKPOINT) @ 1630681492667 for job
> ."
> "Completed checkpoint 2194 for job  (474
> bytes in 161 ms)."
> "Renew deadline reached after 60 seconds while renewing lock
> ConfigMapLock: myNs - myJob-dispatcher-leader
> (1bcda6b0-8a5a-4969-b9e4-2257c4478572)"
> "Stopping SessionDispatcherLeaderProcess."
>
> At some point, the leader election mechanism in fabric8 seems to give up.
>
>
> On Tue, Sep 7, 2021 at 10:05 AM mejri houssem 
> wrote:
>
>> hello,
>>
>> Here's other logs of the latest jm crash.
>>
>>
>> On Mon, Sep 6, 2021 at 14:18, houssem  wrote:
>>
>>> hello,
>>>
>>> I have three jobs running on my kubernetes cluster and each job has its
>>> own cluster id.
>>>
>>> On 2021/09/06 03:28:10, Yangze Guo  wrote:
>>> > Hi,
>>> >
>>> > The root cause is not "java.lang.NoClassDefFound". The job has been
>>> > running but could not edit the config map
>>> > "myJob--jobmanager-leader" and it
>>> > seems finally disconnected with the API server. Is there another job
>>> > with the same cluster id (myJob) ?
>>> >
>>> > I would also pull Yang Wang.
>>> >
>>> > Best,
>>> > Yangze Guo
>>> >
>>> > On Mon, Sep 6, 2021 at 10:10 AM Caizhi Weng 
>>> wrote:
>>> > >
>>> > > Hi!
>>> > >
>>> > > There is a message saying "java.lang.NoClassDefFoundError:
>>> org/apache/hadoop/hdfs/HdfsConfiguration" in your log file. Are you
>>> visiting HDFS in your job? If yes it seems that your Flink distribution or
>>> your cluster is lacking hadoop classes. Please make sure that there are
>>> hadoop jars in the lib directory of Flink, or your cluster has set the
>>> HADOOP_CLASSPATH environment variable.
>>> > >
>>> > >> mejri houssem  wrote on Sat, Sep 4, 2021 at 12:15 AM:
>>> > >>
>>> > >>
>>> > >> Hello ,
>>> > >>
>>> > >> I am facing a JM crash lately. I am deploying a flink application
>>> cluster on kubernetes.
>>> > >>
>>> > >> When i install my chart using helm everything works fine but after
>>> some time ,the Jm starts to crash
>>> > >>
>>> > >> and then it gets deleted eventually after 5 restarts.
>>> > >>
>>> > >> flink version: 1.12.5 (upgraded recently from 1.12.2)
>>> > >> HA mode : k8s
>>> > >>
>>> > >> Here's the full log of the JM attached file.
>>> >
>>>
>>


Re: Duplicate copies of job in Flink UI/API

2021-09-09 Thread Peter Westermann
Hi Piotr,

Jobmanager logs are attached to this email. The only thing that jumps out to me 
is this:

09/08/2021 09:02:26.240 -0400 ERROR 
org.apache.flink.runtime.history.FsJobArchivist Failed to archive job.
  java.io.IOException: File already 
exists:s3p://flink-s3-bucket/history/2db4ee6397151a1109d1ca05188a4cbb

This happened days after the Flink update – and not just once. Across all our 
Flink clusters I’ve seen this 3 times. The cause for the jobmanager leadership 
loss in this case was a deployment of our zookeeper cluster that led to a 
brief connection loss. The new leader election is expected.

Thanks,
Peter


From: Piotr Nowojski 
Date: Thursday, September 9, 2021 at 12:39 AM
To: Peter Westermann 
Cc: user@flink.apache.org 
Subject: Re: Duplicate copies of job in Flink UI/API
Hi Peter,

Can you provide relevant JobManager logs? And can you write down what steps 
have you taken before the failure happened? Did this failure occur during 
upgrading Flink, or after the upgrade etc.

Best,
Piotrek

Wed, 8 Sep 2021 at 16:11 Peter Westermann 
mailto:no.westerm...@genesys.com>> wrote:
We recently upgraded from Flink 1.12.4 to 1.12.5 and are seeing some weird 
behavior after a change in jobmanager leadership: We’re seeing two copies of 
the same job, one of those is in SUSPENDED state and has a start time of zero. 
Here’s the output from the /jobs/overview endpoint:
{
  "jobs": [{
"jid": "2db4ee6397151a1109d1ca05188a4cbb",
"name": "analytics-flink-v1",
"state": "RUNNING",
"start-time": 1631106146284,
"end-time": -1,
"duration": 2954642,
"last-modification": 1631106152322,
"tasks": {
  "total": 112,
  "created": 0,
  "scheduled": 0,
  "deploying": 0,
  "running": 112,
  "finished": 0,
  "canceling": 0,
  "canceled": 0,
  "failed": 0,
  "reconciling": 0
}
  }, {
"jid": "2db4ee6397151a1109d1ca05188a4cbb",
"name": "analytics-flink-v1",
"state": "SUSPENDED",
"start-time": 0,
"end-time": -1,
"duration": 1631105900760,
"last-modification": 0,
"tasks": {
  "total": 0,
  "created": 0,
  "scheduled": 0,
  "deploying": 0,
  "running": 0,
  "finished": 0,
  "canceling": 0,
  "canceled": 0,
  "failed": 0,
  "reconciling": 0
}
  }]
}

Has anyone seen this behavior before?

Thanks,
Peter
09/08/2021 09:02:31.015 -0400 INFO 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager Request slot 
with profile ResourceProfile{UNKNOWN} for job 2db4ee6397151a1109d1ca05188a4cbb 
with allocation id fbddb90b669081bd9907c835f1906a79.
09/08/2021 09:02:31.015 -0400 INFO 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager Request slot 
with profile ResourceProfile{UNKNOWN} for job 2db4ee6397151a1109d1ca05188a4cbb 
with allocation id ee62d44923180b0ac66e10ed170f0af3.
09/08/2021 09:02:31.015 -0400 INFO 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager Request slot 
with profile ResourceProfile{UNKNOWN} for job 2db4ee6397151a1109d1ca05188a4cbb 
with allocation id 13d0e72b41883dfb84e866645b07dc92.
09/08/2021 09:02:31.015 -0400 INFO 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager Request slot 
with profile ResourceProfile{UNKNOWN} for job 2db4ee6397151a1109d1ca05188a4cbb 
with allocation id ea4064056de27327a2037f4d71aa9e5c.
09/08/2021 09:02:31.015 -0400 INFO 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager Request slot 
with profile ResourceProfile{UNKNOWN} for job 2db4ee6397151a1109d1ca05188a4cbb 
with allocation id cfcc6014e93f09884ad5f61e4a108e8d.
09/08/2021 09:02:31.014 -0400 INFO 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager Request slot 
with profile ResourceProfile{UNKNOWN} for job 2db4ee6397151a1109d1ca05188a4cbb 
with allocation id 3dd4d17bf50232c95f178d6a235f2dc1.
09/08/2021 09:02:31.014 -0400 INFO 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager Request slot 
with profile ResourceProfile{UNKNOWN} for job 2db4ee6397151a1109d1ca05188a4cbb 
with allocation id 13a7257a9aedd2ce2ca5267f93586763.
09/08/2021 09:02:31.014 -0400 INFO 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl Requesting new slot 
[SlotRequestId{f9b28f7479a60f286846fd9d5f1f4e8e}] and profile 
ResourceProfile{UNKNOWN} with allocation id fbddb90b669081bd9907c835f1906a79 
from resource manager.
09/08/2021 09:02:31.014 -0400 INFO 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl Requesting new slot 
[SlotRequestId{1df0f0709f439fc647c995a36d8c60a7}] and profile 
ResourceProfile{UNKNOWN} with allocation id ee62d44923180b0ac66e10ed170f0af3 
from resource manager.
09/08/2021 09:02:31.014 -0400 INFO 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl Requesting new slot 
[SlotRequestId{2a441ce02b709ffacdcc319d715239d8}] and profile 
ResourceProfile{UNKNOWN} with allocation id 13d0e72b41883dfb84e866645b07dc92 
from resource manager.
09/08/2021 09:02:31.014 -0400 INFO 

Re: Documentation for deep diving into flink (data-streaming) job restart process

2021-09-09 Thread Robert Metzger
Hi Puneet,

Can you provide us with the JobManager logs of this incident? Jobs should
not disappear, they should restart on other Task Managers.

On Wed, Sep 8, 2021 at 3:06 PM Puneet Duggal 
wrote:

> Hi,
>
> So for the past 2-3 days I have been looking for documentation which
> elaborates on how Flink takes care of restarting a data streaming job. I
> know all the restart and failover strategies but wanted to know how the
> different components (Job Manager, Task Manager etc.) play a role while
> restarting a Flink data streaming job.
>
> I am asking this because recently in production, when I restarted a task
> manager, all the jobs running on it, instead of getting restarted,
> disappeared. Within the Flink UI, I couldn't track those jobs in completed
> jobs either. The logs also couldn't provide me with enough information.
>
> Also, could anyone tell me what the role of the /tmp/executionGraphStore
> folder on the Job Manager machine is?
>
> Thanks
>
>
>


Re: Issue while creating Hive table from Kafka topic

2021-09-09 Thread Robert Metzger
Hey,

Why do you have these dependencies in your pom?



<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.8.0</version>
</dependency>



They are not needed for using the Kafka connector of Flink (the Flink Kafka
connector dependency pulls in the required dependencies).
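
In other words, a single connector dependency should be enough for the Kafka
source/sink, along these lines (a sketch reusing the properties already defined
in your pom):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>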


On Thu, Sep 9, 2021 at 12:02 PM Harshvardhan Shinde <
harshvardhan.shi...@oyorooms.com> wrote:

> Hi,
>
> I'm trying a simple flink job that reads data from a kafka topic and
> creates a Hive table.
>
> I'm following the steps from here
> 
> .
>
> Here's my code:
>
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableEnvironment;
> import org.apache.flink.table.catalog.hive.HiveCatalog;
>
> EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> TableEnvironment tableEnv = TableEnvironment.create(settings);
>
> String name= "myhive";
> String defaultDatabase = "harsh_test";
> String hiveConfDir = "/etc/hive/conf";
>
> HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
> tableEnv.registerCatalog(name, hive);
>
> // set the HiveCatalog as the current catalog of the session
> tableEnv.useCatalog(name);
>
> tableEnv.executeSql("CREATE TABLE IF NOT EXISTS transactions (\n" +
>   "  `created_at` TIMESTAMP(3) METADATA FROM 'timestamp',\n" +
>   "  `partition` BIGINT METADATA VIRTUAL,\n" +
>   "  `offset` BIGINT METADATA VIRTUAL,\n" +
>   "account_id  BIGINT,\n" +
>   "amount  BIGINT,\n" +
>   "transaction_time TIMESTAMP(3),\n" +
>   "WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' 
> SECOND\n" +
>   ") WITH (\n" +
>   "'connector' = 'kafka',\n" +
>   "'topic' = 'flink-stream-table',\n" +
>   "'properties.bootstrap.servers' = ':9092',\n" +
>   "   'scan.startup.mode' = 'earliest-offset',\n" +
>   "'format'= 'csv'\n" +
>   ")");
>
> Table table = tableEnv.sqlQuery("Select * from transactions");
> table.execute().print();
>
> The code builds successfully, but I'm getting the following runtime error:
>
> Caused by: java.util.concurrent.CompletionException:
> java.lang.NoClassDefFoundError:
> org/apache/kafka/common/serialization/ByteArrayDeserializer at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
> ..
>
> Here are my pom.xml file contents:
>
> 
> http://maven.apache.org/POM/4.0.0; 
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
>  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
> http://maven.apache.org/xsd/maven-4.0.0.xsd;>
> 4.0.0
>
> com.harsh.test
> harsh-flink-test
> 1.0-SNAPSHOT
> jar
>
> Flink Quickstart Job
> http://www.myorganization.org
>
> 
> UTF-8
> 1.13.2
> 1.8
> 2.3.6
> 2.12
> ${java.version}
> ${java.version}
> 
>
> 
> 
> apache.snapshots
> Apache Development Snapshot Repository
> 
> https://repository.apache.org/content/repositories/snapshots/
> 
> false
> 
> 
> true
> 
> 
> 
>
> 
> 
> 
> 
> org.apache.flink
> flink-java
> ${flink.version}
> 
> 
> org.apache.flink
> 
> flink-streaming-java_${scala.binary.version}
> ${flink.version}
> 
>
> 
>
> 
>
> 
> 
> org.apache.flink
> 
> flink-connector-kafka_${scala.binary.version}
> ${flink.version}
> 
>
> 
> org.apache.flink
> 
> flink-table-api-java-bridge_${scala.binary.version}
> ${flink.version}
> 
>
> 
> org.apache.flink
> 
> flink-table-planner-blink_${scala.binary.version}
> ${flink.version}
> 
>
> 
> 
> org.apache.flink
> flink-table-planner_2.12
> 1.13.2
> 
>
>
> 
> 
> org.apache.flink
> 
> flink-connector-hive_${scala.binary.version}
> ${flink.version}
> 
>
> 
> 
> org.apache.hive
> hive-exec
> ${hive.version}
> 
>
> 
> 
> javax.servlet
> javax.servlet-api
> 

Re: Job manager crash

2021-09-09 Thread Robert Metzger
Is the kubernetes server you are using particularly busy? Maybe these
issues occur because the server is overloaded?

"Triggering checkpoint 2193 (type=CHECKPOINT) @ 1630681482667 for job
."
"Completed checkpoint 2193 for job  (474
bytes in 195 ms)."
"Triggering checkpoint 2194 (type=CHECKPOINT) @ 1630681492667 for job
."
"Completed checkpoint 2194 for job  (474
bytes in 161 ms)."
"Renew deadline reached after 60 seconds while renewing lock ConfigMapLock:
myNs - myJob-dispatcher-leader (1bcda6b0-8a5a-4969-b9e4-2257c4478572)"
"Stopping SessionDispatcherLeaderProcess."

At some point, the leader election mechanism in fabric8 seems to give up.


On Tue, Sep 7, 2021 at 10:05 AM mejri houssem 
wrote:

> hello,
>
> Here's other logs of the latest jm crash.
>
>
> On Mon, Sep 6, 2021 at 14:18, houssem  wrote:
>
>> hello,
>>
>> I have three jobs running on my kubernetes cluster and each job has its
>> own cluster id.
>>
>> On 2021/09/06 03:28:10, Yangze Guo  wrote:
>> > Hi,
>> >
>> > The root cause is not "java.lang.NoClassDefFound". The job has been
>> > running but could not edit the config map
>> > "myJob--jobmanager-leader" and it
>> > seems finally disconnected with the API server. Is there another job
>> > with the same cluster id (myJob) ?
>> >
>> > I would also pull Yang Wang.
>> >
>> > Best,
>> > Yangze Guo
>> >
>> > On Mon, Sep 6, 2021 at 10:10 AM Caizhi Weng 
>> wrote:
>> > >
>> > > Hi!
>> > >
>> > > There is a message saying "java.lang.NoClassDefFoundError:
>> org/apache/hadoop/hdfs/HdfsConfiguration" in your log file. Are you
>> visiting HDFS in your job? If yes it seems that your Flink distribution or
>> your cluster is lacking hadoop classes. Please make sure that there are
>> hadoop jars in the lib directory of Flink, or your cluster has set the
>> HADOOP_CLASSPATH environment variable.
>> > >
>> > >> mejri houssem  wrote on Sat, Sep 4, 2021 at 12:15 AM:
>> > >>
>> > >>
>> > >> Hello ,
>> > >>
>> > >> I am facing a JM crash lately. I am deploying a flink application
>> cluster on kubernetes.
>> > >>
>> > >> When i install my chart using helm everything works fine but after
>> some time ,the Jm starts to crash
>> > >>
>> > >> and then it gets deleted eventually after 5 restarts.
>> > >>
>> > >> flink version: 1.12.5 (upgraded recently from 1.12.2)
>> > >> HA mode : k8s
>> > >>
>> > >> Here's the full log of the JM attached file.
>> >
>>
>


Issue while creating Hive table from Kafka topic

2021-09-09 Thread Harshvardhan Shinde
Hi,

I'm trying a simple flink job that reads data from a kafka topic and
creates a Hive table.

I'm following the steps from here

.

Here's my code:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

EnvironmentSettings settings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String name= "myhive";
String defaultDatabase = "harsh_test";
String hiveConfDir = "/etc/hive/conf";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog(name, hive);

// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog(name);

tableEnv.executeSql("CREATE TABLE IF NOT EXISTS transactions (\n" +
  "  `created_at` TIMESTAMP(3) METADATA FROM 'timestamp',\n" +
  "  `partition` BIGINT METADATA VIRTUAL,\n" +
  "  `offset` BIGINT METADATA VIRTUAL,\n" +
  "account_id  BIGINT,\n" +
  "amount  BIGINT,\n" +
  "transaction_time TIMESTAMP(3),\n" +
  "WATERMARK FOR transaction_time AS transaction_time -
INTERVAL '5' SECOND\n" +
  ") WITH (\n" +
  "'connector' = 'kafka',\n" +
  "'topic' = 'flink-stream-table',\n" +
  "'properties.bootstrap.servers' = ':9092',\n" +
  "   'scan.startup.mode' = 'earliest-offset',\n" +
  "'format'= 'csv'\n" +
  ")");

Table table = tableEnv.sqlQuery("Select * from transactions");
table.execute().print();

The code builds successfully, but I'm getting the following runtime error:

Caused by: java.util.concurrent.CompletionException:
java.lang.NoClassDefFoundError:
org/apache/kafka/common/serialization/ByteArrayDeserializer at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
..

Here are my pom.xml file contents:


http://maven.apache.org/POM/4.0.0;
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
4.0.0

com.harsh.test
harsh-flink-test
1.0-SNAPSHOT
jar

Flink Quickstart Job
http://www.myorganization.org


UTF-8
1.13.2
1.8
2.3.6
2.12
${java.version}
${java.version}




apache.snapshots
Apache Development Snapshot Repository

https://repository.apache.org/content/repositories/snapshots/

false


true








org.apache.flink
flink-java
${flink.version}


org.apache.flink

flink-streaming-java_${scala.binary.version}
${flink.version}








org.apache.flink

flink-connector-kafka_${scala.binary.version}
${flink.version}



org.apache.flink

flink-table-api-java-bridge_${scala.binary.version}
${flink.version}



org.apache.flink

flink-table-planner-blink_${scala.binary.version}
${flink.version}




org.apache.flink
flink-table-planner_2.12
1.13.2





org.apache.flink

flink-connector-hive_${scala.binary.version}
${flink.version}




org.apache.hive
hive-exec
${hive.version}




javax.servlet
javax.servlet-api
3.1.0
provided




org.apache.htrace
htrace-core4
4.0.1-incubating




commons-configuration
commons-configuration
1.10




commons-logging
commons-logging
1.2




org.apache.flink
flink-shaded-hadoop-2
2.8.3-10.0





org.apache.flink
flink-hadoop-compatibility_2.12
1.13.2




org.apache.flink
flink-hadoop-fs
1.13.2




org.apache.flink
flink-csv
1.13.2


 

After an HBase column family's TTL expires, Flink can no longer write data

2021-09-09 Thread xiaohui zhang
Flink: 1.12.1
Flink-connector: 2.2
HBase: 2.1.0 + CDH 6.3.2
Symptom: if an HBase column family has a TTL configured, then once data written
for a rowkey reaches the expiration time, the cells of that column family are
marked as deleted by HBase. When data for the same key arrives afterwards, Flink
can no longer write it into HBase, and queries against HBase keep showing the
column family as empty.

The reproduction steps are roughly as follows:
Create an HBase table test with two column families: cf1 with TTL 60 and cf2
with TTL 120, i.e. data TTLs of 1 minute and 2 minutes respectively.
Write data into the table with SQL:

insert into test
select
'rowkey',
ROW('123'),
ROW('456')
from
   sometable;

After one minute, querying HBase shows that the cf1 data is gone; after two
minutes the rowkey has no data at all.
Writing data through Flink again at this point fails: nothing is written and
Flink does not report any error.

Is this a bug, or an HBase problem?
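
(For reference, the table from the reproduction above would be created roughly
like this in the HBase shell; this is a sketch, not the exact command that was
used:)

create 'test', {NAME => 'cf1', TTL => 60}, {NAME => 'cf2', TTL => 120}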


Re: aws s3 configuring error for flink image

2021-09-09 Thread Chesnay Schepler
This is a limitation of the presto version; use 
flink-s3-fs-hadoop-1.11.3.jar instead.
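
Applied to your Dockerfile, that would look roughly like this (a sketch; adjust
the paths to whatever your image uses):

FROM flink:1.11.3-scala_2.12-java11
RUN mkdir ./plugins/flink-s3-fs-hadoop
RUN cp ./opt/flink-s3-fs-hadoop-1.11.3.jar ./plugins/flink-s3-fs-hadoop/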


On 08/09/2021 20:39, Dhiru wrote:

I copied

FROM flink:1.11.3-scala_2.12-java11
RUN mkdir ./plugins/flink-s3-fs-presto
RUN cp ./opt/flink-s3-fs-presto-1.11.3.jar ./plugins/flink-s3-fs-presto/

then started getting this error while trying to run on AWS EKS and access an 
S3 bucket:

2021-09-08 14:38:10
java.lang.UnsupportedOperationException: This s3 file system 
implementation does not support recoverable writers.
at 
org.apache.flink.fs.s3.common.FlinkS3FileSystem.createRecoverableWriter(FlinkS3FileSystem.java:136)
at 
org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:134)
at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:260)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:396)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:260)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.base/java.lang.Thread.run(Unknown Source)


On Wednesday, September 8, 2021, 12:47:10 PM EDT, Chesnay Schepler 
 wrote:



you need to put the flink-s3-fs-hadoop/presto jar into a directory 
within the plugins directory, for example the final path should look 
like this:


/opt/flink/plugins/flink-s3-fs-hadoop/flink-s3-fs-hadoop-1.13.1.jar

Furthermore, you only need either the hadoop or presto jar, _not_ both 
of them.


See also:
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins 

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/filesystems/plugins/ 



On 08/09/2021 17:10, Dhiru wrote:
yes I copied to plugin folder but not sure same jar I see in  /opt as 
well by default


root@d852f125da1f:/opt/flink/plugins# ls
README.txt  flink-s3-fs-hadoop-1.13.1.jar metrics-datadog  
 metrics-influx metrics-prometheus  metrics-statsd
external-resource-gpu flink-s3-fs-presto-1.13.1.jar metrics-graphite  
metrics-jmx  metrics-slf4j


I need  help sooner on this

On Wednesday, September 8, 2021, 09:26:46 AM EDT, Dhiru 
  wrote:




yes I copied to plugin folder but not sure same jar I see in /opt as 
well by default


root@d852f125da1f:/opt/flink/plugins# ls
README.txt  flink-s3-fs-hadoop-1.13.1.jar metrics-datadog  
 metrics-influx metrics-prometheus  metrics-statsd
external-resource-gpu flink-s3-fs-presto-1.13.1.jar metrics-graphite  
metrics-jmx  metrics-slf4j



On Wednesday, September 8, 2021, 02:58:38 AM EDT, Martijn Visser 
  wrote:



Hi,

Have you copied the correct JAR [1] to the plugins directory?

Best regards,

Martijn

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/filesystems/s3.html 



On Wed, 8 Sept 2021 at 04:27, Dhiru > wrote:


Need to configure aws S3 getting this error
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Could not find a file system implementation for scheme 's3'. The
scheme is directly supported by Flink through the following
plugins: flink-s3-fs-hadoop, flink-s3-fs-presto. Please 

Re: State processor API very slow reading a keyed state with RocksDB

2021-09-09 Thread Piotr Nowojski
Hi David,

I can confirm that I'm able to reproduce this behaviour. I've tried
profiling/flame graphs and I was not able to make much sense out of those
results. There are no IO/memory bottlenecks that I could notice; it looks
indeed like the job is stuck inside RocksDB itself. This might be an issue
with, for example, memory configuration. Streaming jobs and the State Processor
API run in very different environments, as the latter uses the DataSet API
under the hood, so maybe that can explain this. However, I'm not an expert in
either the DataSet API or RocksDB, so it's hard for me to make
progress here.
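
If memory configuration is indeed the culprit, the first knobs I would compare
between the streaming setup and the State Processor job are these (a sketch;
the values below are simply the defaults I would start from):

# fraction of TaskManager memory handed out as managed memory (used by RocksDB)
taskmanager.memory.managed.fraction: 0.4
# let RocksDB use Flink's managed memory instead of its own defaults
state.backend.rocksdb.memory.managed: true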

Maybe someone else can help here?

Piotrek


Wed, 8 Sep 2021 at 14:45 David Causse  wrote:

> Hi,
>
> I'm investigating why a job we use to inspect a flink state is a lot
> slower than the bootstrap job used to generate it.
>
> I use RocksDB with a simple keyed value state mapping a string key to a
> long value. Generating the bootstrap state from a CSV file with 100M
> entries takes a couple minutes over 12 slots spread over 3 TM (4Gb
> allowed). But another job that does the opposite (converts this state into
> a CSV file) takes several hours. I would have expected these two job
> runtimes to be in the same ballpark.
>
> I wrote a simple test case[1] to reproduce the problem. This program has 3
> jobs:
> - CreateState: generate a keyed state (string->long) using the state
> processor api
> - StreamJob: reads all the keys using a StreamingExecutionEnvironment
> - ReadState: reads all the keys using the state processor api
>
> Running with 30M keys and (12 slots/3TM with 4Gb each) CreateState &
> StreamJob are done in less than a minute.
> ReadState is much slower (> 30minutes) on my system. The RocksDB state
> appears to be restored relatively quickly but after that the slots are
> performing at very different speeds. Some slots finish quickly but some
> others struggle to advance.
> Looking at the thread dumps I always see threads in
> org.rocksdb.RocksDB.get:
>
> "DataSource (at readKeyedState(ExistingSavepoint.java:314)
> (org.apache.flink.state.api.input.KeyedStateInputFormat)) (12/12)#0" Id=371
> RUNNABLE
> at org.rocksdb.RocksDB.get(Native Method)
> at org.rocksdb.RocksDB.get(RocksDB.java:2084)
> at
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:83)
> at org.wikimedia.flink.StateReader.readKey(ReadState.scala:38)
> at org.wikimedia.flink.StateReader.readKey(ReadState.scala:32)
> at
> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:76)
> at
> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:51)
> at
> org.apache.flink.state.api.input.KeyedStateInputFormat.nextRecord(KeyedStateInputFormat.java:228)
>
> It seems suspiciously slow to me and I'm wondering if I'm missing
> something in the way the state processor api works.
>
> Thanks for your help!
>
> David.
>
> 1: https://github.com/nomoa/rocksdb-state-processor-test
>