Re: Multiple select queries in single job on flink table API

2021-04-16 Thread Yuval Itzchakov
Yes. Instead of calling execute on each table, create a StatementSet using
your StreamTableEnvironment (tableEnv.createStatementSet) and use addInsert
and finally .execute when you want to run the job.




On Sat, Apr 17, 2021, 03:20 tbud  wrote:

> If I want to run two different select queries on a flink table created from
> the dataStream, the blink-planner runs them as two different jobs. Is there
> a way to combine them and run as a single job ? Example code :
>
> /StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(4);
>
> System.out.println("Running credit scores : ");
>
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
> DataStream recordsStream =
> env.readTextFile("src/main/resources/credit_trial.csv");
>
> DataStream creditStream = recordsStream
> .filter((FilterFunction) line -> !line.contains(
> "Loan ID,Customer ID,Loan Status,Current Loan
> Amount,Term,Credit Score,Annual Income,Years in current job" +
> ",Home Ownership,Purpose,Monthly Debt,Years of
> Credit History,Months since last delinquent,Number of Open Accounts," +
> "Number of Credit Problems,Current Credit
> Balance,Maximum Open Credit,Bankruptcies,Tax Liens"))
> .map(new MapFunction() {
>
> @Override
> public CreditRecord map(String s) throws Exception {
>
> String[] fields = s.split(",");
>
> return new CreditRecord(fields[0], fields[2],
> Double.parseDouble(fields[3]),
> fields[4], fields[5].trim().equals("")?0.0:
> Double.parseDouble(fields[5]),
>
> fields[6].trim().equals("")?0.0:Double.parseDouble(fields[6]),
> fields[8], Double.parseDouble(fields[15]));
> }
> });
> tableEnv.createTemporaryView("CreditDetails", creditStream);
> Table creditDetailsTable = tableEnv.from("CreditDetails");
>
> Table resultsTable = creditDetailsTable.select($("*"))
> .filter($("loanStatus").isEqual("Charged Off"));
>
> TableResult result = resultsTable.execute();
>
> result.print();
>
>
> Table resultsTable2 = creditDetailsTable.select($("*"))
> .filter($("loanStatus").isEqual("Fully Paid"));
>
> TableResult result2 = resultsTable2.execute();
>
> result2.print();/
>
> The above code creates 2 different jobs, but I don't want that, I want it
> to
> run in a single job. Is there any way out ?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


flink-kafka-connector Producer.setWriteTimestampToKafka(true) 导致的问题

2021-04-16 Thread lp
我使用flink1.12版本,采用flink-kafka-connector从kafka的topicA中读取数据,然后sink会topicB,在sink
to topicB的FlinkProducer设置如下时,程序会偶现报错,去掉后异常消失,请问是什么原因呢?


flinkKafkaProducer.setWriteTimestampToKafka(true);




--
Sent from: http://apache-flink.147419.n8.nabble.com/


(无主题)

2021-04-16 Thread maozhaolin
退订


| |
mao18698726900
|
|
邮箱:mao18698726...@163.com
|

签名由 网易邮箱大师 定制

????

2021-04-16 Thread ??????


Multiple select queries in single job on flink table API

2021-04-16 Thread tbud
If I want to run two different select queries on a flink table created from
the dataStream, the blink-planner runs them as two different jobs. Is there
a way to combine them and run as a single job ? Example code :

/StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);

System.out.println("Running credit scores : ");

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

DataStream recordsStream =
env.readTextFile("src/main/resources/credit_trial.csv");

DataStream creditStream = recordsStream
.filter((FilterFunction) line -> !line.contains(
"Loan ID,Customer ID,Loan Status,Current Loan
Amount,Term,Credit Score,Annual Income,Years in current job" +
",Home Ownership,Purpose,Monthly Debt,Years of
Credit History,Months since last delinquent,Number of Open Accounts," +
"Number of Credit Problems,Current Credit
Balance,Maximum Open Credit,Bankruptcies,Tax Liens"))
.map(new MapFunction() {

@Override
public CreditRecord map(String s) throws Exception {

String[] fields = s.split(",");

return new CreditRecord(fields[0], fields[2],
Double.parseDouble(fields[3]),
fields[4], fields[5].trim().equals("")?0.0:
Double.parseDouble(fields[5]),
   
fields[6].trim().equals("")?0.0:Double.parseDouble(fields[6]),
fields[8], Double.parseDouble(fields[15]));
}
});
tableEnv.createTemporaryView("CreditDetails", creditStream);
Table creditDetailsTable = tableEnv.from("CreditDetails");

Table resultsTable = creditDetailsTable.select($("*"))
.filter($("loanStatus").isEqual("Charged Off"));

TableResult result = resultsTable.execute();

result.print();


Table resultsTable2 = creditDetailsTable.select($("*"))
.filter($("loanStatus").isEqual("Fully Paid"));

TableResult result2 = resultsTable2.execute();

result2.print();/

The above code creates 2 different jobs, but I don't want that, I want it to
run in a single job. Is there any way out ?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [External] : Re: Conflict in the document - About native Kubernetes per job mode

2021-04-16 Thread Fuyao Li
Hello Yang,

Please take a look at the PR when you are free.
https://github.com/apache/flink/pull/15602

Should be a simple change. Thanks!

Best,
Fuyao

From: Fuyao Li 
Date: Tuesday, April 13, 2021 at 19:10
To: Yang Wang 
Cc: user 
Subject: Re: [External] : Re: Conflict in the document - About native 
Kubernetes per job mode
Hello Yang,

I also created a PR for this issue. Please take a look.
Refer to 
https://github.com/apache/flink/pull/15602

Thanks,
Fuyao

From: Fuyao Li 
Date: Tuesday, April 13, 2021 at 18:23
To: Yang Wang 
Cc: user 
Subject: Re: [External] : Re: Conflict in the document - About native 
Kubernetes per job mode
Hello Yang,

I tried to create a ticket 
https://issues.apache.org/jira/browse/FLINK-22264
I just registered as a user and I can’t find a place to assign the task to 
myself… Any idea on this jira issue?

Thanks.

Best,
Fuyao

From: Yang Wang 
Date: Tuesday, April 13, 2021 at 03:01
To: Fuyao Li 
Cc: user , Yan Wang 
Subject: Re: [External] : Re: Conflict in the document - About native 
Kubernetes per job mode
I think it makes sense to have such a simple fix.

Could you please create a ticket and attach a PR?

Best,
Yang

Fuyao Li mailto:fuyao...@oracle.com>> 于2021年4月13日周二 
下午2:24写道:
Hello Yang,

It is very kind of you to give such a detailed explanation! Thanks for 
clarification.

For the small document fix I mentioned, what do you think?

Best,
Fuyao

From: Yang Wang mailto:danrtsey...@gmail.com>>
Date: Monday, April 12, 2021 at 23:03
To: Fuyao Li mailto:fuyao...@oracle.com>>
Cc: user mailto:user@flink.apache.org>>, Yan Wang 
mailto:y.yan.w.w...@oracle.com>>
Subject: [External] : Re: Conflict in the document - About native Kubernetes 
per job mode
Hi Fuyao,

Currently, Flink only supports perjob mode for Yarn. The standalone job cluster 
has been replaced with standalone application mode
after FLIP-85[1]. Both standalone Flink on K8s and native K8s integration do 
not support per-job mode.

In your attached video, it is a PoC implementation for presentation. We have 
introduced the Kubernetes application mode in release 1.11
and agree to not have a per-job cluster support. The only reason is that it is 
not very convenient to ship the job graphs, user jars, artifacts in
Kubernetes environment.

[1]. 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-85+Flink+Application+Mode

Best,
Yang

Fuyao Li mailto:fuyao...@oracle.com>> 于2021年4月13日周二 
上午8:10写道:
Hello Community, Yang,

I noticed a conflict in the document for per-job mode support for Kubernetes.
In the doc here [1], it mentions
in a Flink Job Cluster, the available cluster manager (like YARN or Kubernetes) 
is used to spin up a cluster for each submitted job and this cluster is 
available to that job only.
It implies per job mode is supported in Kubernetes.

However, in the docs [2] and [3], it clearly points out per-job mode is not 
supported in Kubernetes.

This is a conflict statement and is kind of misleading. If needed, I can create 
an MR to delete the statement in [1] for Kubernetes.. It is a small fix.

I also noticed another thing in the video [4] at 25:08. Yang, you are executing 
a command with -e kubernetes-per-job flag. I tried and found such command is 
not supported in Flink distribution at all. I noticed the version you are using 
is 1.11-snapshot during the demo. Are you modifying the source code and 
generated an internal version of Flink….?


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/flink-architecture.html#flink-job-cluster
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#per-job-cluster-mode
[3] 

Flink Statefun Python Batch

2021-04-16 Thread Timothy Bess
Hi everyone,

Is there a good way to access the batch of leads that Statefun sends to the
Python SDK rather than processing events one by one? We're trying to run
our data scientist's machine learning model through the SDK, but the code
is very slow when we do single events and we don't get many of the benefits
of Pandas/etc.

Thanks,

Tim


Re: 2-phase commit and kafka

2021-04-16 Thread Vishal Santoshi
Thanks for the feedback and. glad I am on the right track.

> Outstanding transactions should be automatically aborted on restart by
Flink.

Let me understand this

1. Flink pipe is cancelled and has dangling kafka transactions.
2. A new Flink pipe  ( not restored from a checkpoint or sp ) is started
which is essentially the same pipe as 1 but does not restore. Would
the dangling kafka transactions be aborted ?

If yes, how does it work? As in how does the new pipe. know which
transactions to abort ? Does it ask kafka for pending transactions and know
which one belongs to the first pipe ( maybe b'coz they share some id b'coz
of name of the pipe or something else ) ?

Thanks again,

Vishal



On Fri, Apr 16, 2021 at 1:37 PM Arvid Heise  wrote:

> Hi Vishal,
>
> I think you pretty much nailed it.
>
> Outstanding transactions should be automatically aborted on restart by
> Flink. Flink (re)uses a pool of transaction ids, such that all possible
> transactions by Flink are canceled on restart.
>
> I guess the biggest downside of using a large transaction timeout is that
> other clients might leak transactions for a longer period of time or that
> they may linger for a longer time if you stop an application entirely (for
> example for an upgrade).
>
> On Fri, Apr 16, 2021 at 4:08 PM Vishal Santoshi 
> wrote:
>
>> Hello folks
>>
>> So AFAIK data loss on exactly once will happen if
>>
>>-
>>
>>start a transaction on kafka.
>>-
>>
>>pre commit done ( kafka is prepared for the commit )
>>-
>>
>>commit fails ( kafka went own or n/w issue or what ever ). kafka has
>>an uncommitted transaction
>>-
>>
>>pipe was down for say n minutes and the kafka based transaction time
>>out is m minutes, where m < n
>>-
>>
>>the pipe restarts and tries to commit an aborted transaction and
>>fails and thus data loss
>>
>> Thus it is imperative that the ransaction.max.timeout.ms out on kafka is
>> a high value ( like n hours ) which should be greater then an SLA for
>> downtime of the pipe. As in we have to ensure that the pipe is restarted
>> before the transaction.timeout.ms set on the broker.
>>
>> The impulse is to make ransaction.max.timeout.ms high ( 24 hours ). The
>> only implication is what happens if we start a brand new pipeline on the
>> same topics which has yet to be resolved transactions, mostly b’coz of
>> extended timeout of a previous pipe .. I would assume we are delayed then
>> given that kafka will stall subsequent transactions from being visible to
>> the consumer, b'coz of this one outstanding trsnasaction ?
>>
>> And if that is the case, then understandably we have to abort those
>> dangling transactions before the 24 hrs time out. While there probably a
>> way to do that, does flink help.. as in set a property that will abort a
>> transaction on kafka, b'coz we need it to, given the above..
>>
>> Again I might have totally misunderstood the whole mechanics and if yes
>> apologies and will appreciate some clarifications.
>>
>>
>> Thanks.
>>
>>
>>
>>
>>


Re: Iterate Operator Checkpoint Failure

2021-04-16 Thread Lu Niu
Hi, Fabian

Thanks for replying. I created this ticket. It contains how to reproduce it
using code in flink-example package:
https://issues.apache.org/jira/browse/FLINK-22326

Best
Lu

On Fri, Apr 16, 2021 at 1:25 AM Fabian Paul 
wrote:

> Hi Lu,
>
> Can you provide some more detailed logs of what happened during the
> checkpointing phase? If it is possible please enable debug logs enabled.
>
> It would be also great know whether you have implemented your own Iterator
> Operator or what kind of Flink program you are trying to execute.
>
> Best,
> Fabian
>
> CC user-list
>
> On 15. Apr 2021, at 22:34, Lu Niu  wrote:
>
> Hi, Flink Users
>
> When we migrate from flink 1.9.1 to flink 1.11, we notice job will always
> fail on checkpoint if job uses Iterator Operator, no matter we use
> unaligned checkpoint or not. Those jobs don't have checkpoint issues in
> 1.9. Is this a known issue? Thank you!
>
> Best
> Lu
>
>
>


Primary key preservation in table API select?

2021-04-16 Thread Brad Davis
I'm trying to write a relatively simple plan using the table API, and I'm
getting horrific performance on my joins.  I discovered after looking at
the execution plan in the web UI that a number of the joins had NoUniqueKey
on one or both sides of the join.  I couldn't understand this as all of my
tables should have had well defined unique keys at all times.

However, diving deeper I've discovered that doing any kind of select
operation strips my table of its primary key, and I can't fathom why it
would do that.

For instsance, I have a GROUPS table that has a schema like this...

root
 |-- id: BIGINT NOT NULL
 |-- uuid: STRING
 |-- name: STRING
 |-- has_children: BOOLEAN
 |-- organization_id: BIGINT
 |-- CONSTRAINT PK_3386 PRIMARY KEY (id)

When I execute  *groups = groups.select($("id"),$("organization_id")); *the
output suddenly turns into this...

root
 |-- id: BIGINT NOT NULL
 |-- organization_id: BIGINT

The original PK column is being selected, so why would the constraint not
be passed along to the new table, ensuring that a subsequent join would
have optimal performance.

In this case I could work around the problem, but in my real-world use case
I need to do a select passing multiple columns to a scalar function which
then outputs one of two columns depending on the value of a third (yes,
I've made a transistor).

If there's no way of getting a select to preserve a pre-existing PK, is
there any way i can explicitly tell a new derived table that it should
treat one or more columns as it's primary key?

regards,
Brad


Re: 2-phase commit and kafka

2021-04-16 Thread Arvid Heise
Hi Vishal,

I think you pretty much nailed it.

Outstanding transactions should be automatically aborted on restart by
Flink. Flink (re)uses a pool of transaction ids, such that all possible
transactions by Flink are canceled on restart.

I guess the biggest downside of using a large transaction timeout is that
other clients might leak transactions for a longer period of time or that
they may linger for a longer time if you stop an application entirely (for
example for an upgrade).

On Fri, Apr 16, 2021 at 4:08 PM Vishal Santoshi 
wrote:

> Hello folks
>
> So AFAIK data loss on exactly once will happen if
>
>-
>
>start a transaction on kafka.
>-
>
>pre commit done ( kafka is prepared for the commit )
>-
>
>commit fails ( kafka went own or n/w issue or what ever ). kafka has
>an uncommitted transaction
>-
>
>pipe was down for say n minutes and the kafka based transaction time
>out is m minutes, where m < n
>-
>
>the pipe restarts and tries to commit an aborted transaction and fails
>and thus data loss
>
> Thus it is imperative that the ransaction.max.timeout.ms out on kafka is
> a high value ( like n hours ) which should be greater then an SLA for
> downtime of the pipe. As in we have to ensure that the pipe is restarted
> before the transaction.timeout.ms set on the broker.
>
> The impulse is to make ransaction.max.timeout.ms high ( 24 hours ). The
> only implication is what happens if we start a brand new pipeline on the
> same topics which has yet to be resolved transactions, mostly b’coz of
> extended timeout of a previous pipe .. I would assume we are delayed then
> given that kafka will stall subsequent transactions from being visible to
> the consumer, b'coz of this one outstanding trsnasaction ?
>
> And if that is the case, then understandably we have to abort those
> dangling transactions before the 24 hrs time out. While there probably a
> way to do that, does flink help.. as in set a property that will abort a
> transaction on kafka, b'coz we need it to, given the above..
>
> Again I might have totally misunderstood the whole mechanics and if yes
> apologies and will appreciate some clarifications.
>
>
> Thanks.
>
>
>
>
>


proper way to manage watermarks with messages combining multiple timestamps

2021-04-16 Thread Mathieu D
Hello,

I'm totally new to Flink, and I'd like to make sure I understand things
properly around watermarks.

We're processing messages from iot devices.
Those messages have a timestamp, and we have a first phase of processing
based on this timestamp. So far so good.

These messages actually "pack" together several measures taken at different
times, typically going from ~15mn back in the past from the message
timestamp, to a few seconds back.

So at a point in the processing, I'll flatMap the message stream into a
stream of measures, and I'll first need to reaffect the event time. I guess
I can do it using a TimestampAssigner, correct ?

The flatmapped stream will now mix together a large range of event-times
(so, a span of 15mn). What should I do regarding the watermark ? Should I
regenerate one ? and how ?

My measures will go through windowed aggregations. Should I use the
allowedLateness param to manage that properly ?
(Note: I'm ok with windows firing several times with updated content, if
that matters. Our downstream usage is made for that.)

Thanks a lot for your insights and pointers :-)

Mathieu


2-phase commit and kafka

2021-04-16 Thread Vishal Santoshi
Hello folks

So AFAIK data loss on exactly once will happen if

   -

   start a transaction on kafka.
   -

   pre commit done ( kafka is prepared for the commit )
   -

   commit fails ( kafka went own or n/w issue or what ever ). kafka has an
   uncommitted transaction
   -

   pipe was down for say n minutes and the kafka based transaction time out
   is m minutes, where m < n
   -

   the pipe restarts and tries to commit an aborted transaction and fails
   and thus data loss

Thus it is imperative that the ransaction.max.timeout.ms out on kafka is a
high value ( like n hours ) which should be greater then an SLA for
downtime of the pipe. As in we have to ensure that the pipe is restarted
before the transaction.timeout.ms set on the broker.

The impulse is to make ransaction.max.timeout.ms high ( 24 hours ). The
only implication is what happens if we start a brand new pipeline on the
same topics which has yet to be resolved transactions, mostly b’coz of
extended timeout of a previous pipe .. I would assume we are delayed then
given that kafka will stall subsequent transactions from being visible to
the consumer, b'coz of this one outstanding trsnasaction ?

And if that is the case, then understandably we have to abort those
dangling transactions before the 24 hrs time out. While there probably a
way to do that, does flink help.. as in set a property that will abort a
transaction on kafka, b'coz we need it to, given the above..

Again I might have totally misunderstood the whole mechanics and if yes
apologies and will appreciate some clarifications.


Thanks.


Re: Nondeterministic results with SQL job when parallelism is > 1

2021-04-16 Thread Dylan Forciea
Jark,

Thanks for the heads up! I didn’t see this behavior when running in batch mode 
with parallelism turned on. Is it safe to do this kind of join in batch mode 
right now, or am I just getting lucky?

Dylan

From: Jark Wu 
Date: Friday, April 16, 2021 at 5:10 AM
To: Dylan Forciea 
Cc: Timo Walther , Piotr Nowojski , 
"user@flink.apache.org" 
Subject: Re: Nondeterministic results with SQL job when parallelism is > 1

HI Dylan,

I think this has the same reason as 
https://issues.apache.org/jira/browse/FLINK-20374.
The root cause is that changelogs are shuffled by `attr` at second join,
and thus records with the same `id` will be shuffled to different join tasks 
(also different sink tasks).
So the data arrived at sinks are not ordered on the sink primary key.

We may need something like primary key ordering mechanism in the whole planner 
to fix this.

Best,
Jark

On Thu, 15 Apr 2021 at 01:33, Dylan Forciea 
mailto:dy...@oseberg.io>> wrote:
On a side note - I changed to use the batch mode per your suggestion Timo, and 
my job ran much faster and with deterministic counts with parallelism turned 
on. So I'll probably utilize that for now. However, it would still be nice to 
dig down into why streaming isn't working in case I need that in the future.

Dylan

On 4/14/21, 10:27 AM, "Dylan Forciea" 
mailto:dy...@oseberg.io>> wrote:

Timo,

Here is the plan (hopefully I properly cleansed it of company proprietary 
info without garbling it)

Dylan

== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.sink], fields=[id, 
attr, attr_mapped])
+- LogicalProject(id=[CASE(IS NOT NULL($0), $0, $2)], attr=[CASE(IS NOT 
NULL($3), $3, $1)], attr_mapped=[CASE(IS NOT NULL($6), $6, IS NOT NULL($3), $3, 
$1)])
   +- LogicalJoin(condition=[=($4, $5)], joinType=[left])
  :- LogicalProject(id1=[$0], attr=[$1], id2=[$2], attr0=[$3], 
$f4=[CASE(IS NOT NULL($3), $3, $1)])
  :  +- LogicalJoin(condition=[=($0, $2)], joinType=[full])
  : :- LogicalTableScan(table=[[default_catalog, default_database, 
table1]])
  : +- LogicalAggregate(group=[{0}], attr=[MAX($1)])
  :+- LogicalProject(id2=[$1], attr=[$0])
  :   +- LogicalTableScan(table=[[default_catalog, 
default_database, table2]])
  +- LogicalTableScan(table=[[default_catalog, default_database, 
table3]])

== Optimized Logical Plan ==
Sink(table=[default_catalog.default_database.sink], fields=[id, attr, 
attr_mapped], changelogMode=[NONE])
+- Calc(select=[CASE(IS NOT NULL(id1), id1, id2) AS id, CASE(IS NOT 
NULL(attr0), attr0, attr) AS attr, CASE(IS NOT NULL(attr_mapped), attr_mapped, 
IS NOT NULL(attr0), attr0, attr) AS attr_mapped], changelogMode=[I,UB,UA,D])
   +- Join(joinType=[LeftOuterJoin], where=[=($f4, attr)], select=[id1, 
attr, id2, attr0, $f4, attr, attr_mapped], leftInputSpec=[HasUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
  :- Exchange(distribution=[hash[$f4]], changelogMode=[I,UB,UA,D])
  :  +- Calc(select=[id1, attr, id2, attr0, CASE(IS NOT NULL(attr0), 
attr0, attr) AS $f4], changelogMode=[I,UB,UA,D])
  : +- Join(joinType=[FullOuterJoin], where=[=(id1, id2)], 
select=[id1, attr, id2, attr0], leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
  ::- Exchange(distribution=[hash[id1]], changelogMode=[I])
  ::  +- TableSourceScan(table=[[default_catalog, 
default_database, table1]], fields=[id1, attr], changelogMode=[I])
  :+- Exchange(distribution=[hash[id2]], 
changelogMode=[I,UB,UA])
  :   +- GroupAggregate(groupBy=[id2], select=[id2, MAX(attr) 
AS attr], changelogMode=[I,UB,UA])
  :  +- Exchange(distribution=[hash[id2]], 
changelogMode=[I])
  : +- TableSourceScan(table=[[default_catalog, 
default_database, table2]], fields=[attr, id2], changelogMode=[I])
  +- Exchange(distribution=[hash[attr]], changelogMode=[I])
 +- TableSourceScan(table=[[default_catalog, default_database, 
table3]], fields=[attr, attr_mapped], changelogMode=[I])

== Physical Execution Plan ==
Stage 1 : Data Source
content : Source: TableSourceScan(table=[[default_catalog, 
default_database, table1]], fields=[id1, attr])

Stage 3 : Data Source
content : Source: TableSourceScan(table=[[default_catalog, 
default_database, table2]], fields=[attr, id2])

Stage 5 : Attr
content : GroupAggregate(groupBy=[id2], select=[id2, MAX(attr) 
AS attr])
ship_strategy : HASH

Stage 7 : Attr
content : Join(joinType=[FullOuterJoin], where=[(id1 = 
id2)], select=[id1, attr, id2, attr0], 
leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey])
ship_strategy : 

Re: FlatMap 报错Invalid timestamp: -1.

2021-04-16 Thread lp
查了些资料,好像说是因为FlinkKafkaProducer.setWriteTimestampToKafka(true);导致的,我使用的是flink1.12.1,
相关代码片段如下,请教是什么原因导致的呢?

//sink
Properties producerPro = new Properties();
producerPro.setProperty("bootstrap.servers",kafkaAddr);
producerPro.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
"60");
   
FlinkKafkaProducer flinkKafkaProducer = new
FlinkKafkaProducer(dwdOsqueryDetailTopic, new SimpleStringSchema(),
producerPro, null, FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5);

flinkKafkaProducer.setWriteTimestampToKafka(true);
beanStr.addSink(flinkKafkaProducer);



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink1.10 flinksql 单表流汇总计算数据不对

2021-04-16 Thread Smile
你好, 
可以参考这个[1]看下是不是回撤导致的,如果是的话可以开一个比较小的 mini-batch 看能不能解决。

[1].
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-streaming-sql-group-by-td34412.html



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.12.2 StreamingFileSink 问题

2021-04-16 Thread LiangbinZhang
Hi,张锴
Flink1.12支持sql直接写hive表,可以做到分钟级的数据查询,不知道符不符合你的业务需求。



张锴 wrote
> flink用的1.12.2,要sink到hdfs,选用了StreamingFileSink,导入依赖的时候maven仓库并没有1.12.2的flink-connector-filesystem的jar包,我应该选用哪个版本合适





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Flink1.10 flinksql 单表流汇总计算数据不对

2021-04-16 Thread LiangbinZhang
最近在观察Flink SQL 1.10任务,发现有时候聚合计算结果不对。

-- 01 因为有update数据,取这个用户当天最新的数据
-- id是表student_class_summary 的主键
create view tmp_student_class_summary as
select
uid,
class_date,
class_duration
from (select *,row_number() over(partition by id order by updated_time desc)
as rn from student_class_summary) t
where rn = 1;

-- 02 聚合计算出每个用户每天的总量

select 
  student_id,
  class_date,
  sum(class_duration) as class_duration
from tmp_student_class_summary
group by student_id,class_date;

 
每天大概有几条数据计算结果不对,不知道问题出在哪。转态是24h过期,但是用户在一天内的数据产生间隔是严格小余24小时的,排除是状态失效导致的。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink Savepoint fault tolerance

2021-04-16 Thread dhanesh arole
Hello all,

I had 2 questions regarding savepoint fault tolerance.

Job manager restart:

   - Currently, we are triggering savepoints using REST apis. And query the
   status of savepoint by the returned handle. In case there is a network
   issue because of which we couldn't receive response then in that case how
   to find out if the savepoint in the previous request was triggered or not?
   Is there a way to add "idempotency-key" to each API request so that we can
   safely retry triggering savepoint? By doing this, we want to avoid multiple
   triggers of consecutive savepoints during job upgrades.
   - Our workflow for capturing savepoint looks like this - call POST
   /savepoint endpoint. Use the returned trigger handle to periodically poll
   the status of savepoint. Once the savepoint is completed then restore the
   job from that savepoint. We are running our flink clusters in k8s. Since
   pod IPs can get restarted / migrated quite often in k8s, it's possible that
   the JM pod that was used to capture the savepoint happens to be recycled
   before completion of savepoint. In that case, we can't query the status of
   triggered savepoint from the previously returned handle. As neither the
   newly created JM pod or any other standby JMs have information about this
   savepoint. I couldn't find any config that makes Flink persist state of
   ongoing savepoints to an external store which will allow users to query the
   status of savepoint via any available JM instance in HA setup.


Task manager restart:

   - If one of the TMs crashes during ongoing checkpoint then I believe
   that checkpoint is marked as failed and on the next checkpoint interval
   Flink triggers a new checkpoint by looking at the previously completed
   checkpoint counter. The next checkpoint attempt might get acknowledged by
   all operators and marked as completed. Is that correct? In case of
   savepoints this is not possible. So how does flink resume the savepoint
   capturing process in case of job restarts or TM failures?
   - I am sure this must be already handled but just wanted to confirm and
   get help in finding relevant code references for this so I can dig deeper
   for understanding it in depth from an educational point of view.


-
Dhanesh Arole ( Sent from mobile device. Pardon me for typos )


Re: flink1.12.2 StreamingFileSink 问题

2021-04-16 Thread Peihui He
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/streamfile_sink.html

这个参考过没呢?

张锴  于2021年4月16日周五 下午1:24写道:

> maven 仓库flink-connector-filesystem 最高1.11.3,也能用吗?
>
> guoyb <861277...@qq.com> 于2021年4月15日周四 下午10:01写道:
>
> > 1.12.0的也可以,大版本一样就行了
> >
> >
> >
> > ---原始邮件---
> > 发件人: "张锴" > 发送时间: 2021年4月15日(周四) 下午5:16
> > 收件人: "user-zh" > 主题: flink1.12.2 StreamingFileSink 问题
> >
> >
> >
> >
> flink用的1.12.2,要sink到hdfs,选用了StreamingFileSink,导入依赖的时候maven仓库并没有1.12.2的flink-connector-filesystem的jar包,我应该选用哪个版本合适
>


Re: Question about state processor data outputs

2021-04-16 Thread Chen-Che Huang
Hi Robert,

Due to some concerns, we planned to use state processor to achieve our goal. 
Now we will consider to reevaluate using datastream to do the job while 
exploring the possibility of implementing a custom FileOutputFormat. Thanks for 
your comments!

Best wishes,
Chen-Che Huang 

On 2021/04/16 06:53:37, Robert Metzger  wrote: 
> Hi,
> I assumed you are using the DataStream API, because you mentioned the
> streaming sink. But you also mentioned the state processor API (which I
> ignored a bit).
> 
> I wonder why you are using the state processor API. Can't you use the
> streaming job that created the state also for writing it to files using the
> StreamingFileSink?
> 
> If you want to stick to the DataSet API, then I guess you have to implement
> a custom (File)OutputFormat.
> 
> 
> On Fri, Apr 16, 2021 at 5:37 AM Chen-Che Huang  wrote:
> 
> > Hi Robert,
> >
> > Thanks for your code. It's really helpful!
> >
> > However, with the readKeyedState api of state processor, we get dataset
> > for our data instead of datastream and it seems the dataset doesn't support
> > streamfilesink (not addSink method like datastream). If not, I need to
> > transform the dataset to a datastream. I'm not sure it's doable based on
> > https://www.alibabacloud.com/blog/deep-insights-into-flink-sql-flink-advanced-tutorials_596628.
> > If it's doable, then I'll be able to solve our problem with applying
> > streamfilesink to the transformed dataset.
> >
> > Best wishes,
> > Chen-Che Huang
> >
> > On 2021/04/15 19:23:43, Robert Metzger  wrote:
> > > Hey Chen-Che Huang,
> > >
> > > I guess the StreamingFileSink is what you are looking for. It is
> > documented
> > > here:
> > >
> > https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
> > > I drafted a short example (that is not production ready), which does
> > > roughly what you are asking for:
> > > https://gist.github.com/rmetzger/7d5dbdaa118c63f5875c8c9520cc311d
> > >
> > > Hope this helps!
> > >
> > > Best,
> > > Robert
> > >
> > >
> > > On Thu, Apr 15, 2021 at 11:33 AM Chen-Che Huang 
> > wrote:
> > >
> > > > Hi all,
> > > >
> > > > We're going to use state processor to make our keyedstate data to be
> > > > written to different files based on the keys. More specifically, we
> > want
> > > > our data to be written to files key1.txt, key2.txt, ..., and keyn.txt
> > where
> > > > the value with the same key is stored in the same file. In each file,
> > the
> > > > data may be stored as follows. As far as I know, I need to implement
> > my own
> > > > Sink (org.apache.flink.streaming.api.functions.sink.RichSinkFunction)
> > to
> > > > meet the requirement. However, I wonder is there a native way to
> > achieve
> > > > this without implementing my own Sink because using official solution
> > is
> > > > usually more efficient and reliable than doing it by myself.  Many
> > thanks
> > > > for any comment.
> > > >
> > > > key1.txt
> > > > key1 value11
> > > > key1 value21
> > > > key1 value31
> > > >
> > > > key2.txt
> > > > key2 value21
> > > > key2 value22
> > > > key2 value23
> > > >
> > > > Best wishes,
> > > > Chen-Che Huang
> > > >
> > >
> >
> 


Re: Nondeterministic results with SQL job when parallelism is > 1

2021-04-16 Thread Jark Wu
HI Dylan,

I think this has the same reason as
https://issues.apache.org/jira/browse/FLINK-20374.
The root cause is that changelogs are shuffled by `attr` at second join,
and thus records with the same `id` will be shuffled to different join
tasks (also different sink tasks).
So the data arrived at sinks are not ordered on the sink primary key.

We may need something like primary key ordering mechanism in the whole
planner to fix this.

Best,
Jark

On Thu, 15 Apr 2021 at 01:33, Dylan Forciea  wrote:

> On a side note - I changed to use the batch mode per your suggestion Timo,
> and my job ran much faster and with deterministic counts with parallelism
> turned on. So I'll probably utilize that for now. However, it would still
> be nice to dig down into why streaming isn't working in case I need that in
> the future.
>
> Dylan
>
> On 4/14/21, 10:27 AM, "Dylan Forciea"  wrote:
>
> Timo,
>
> Here is the plan (hopefully I properly cleansed it of company
> proprietary info without garbling it)
>
> Dylan
>
> == Abstract Syntax Tree ==
> LogicalSink(table=[default_catalog.default_database.sink], fields=[id,
> attr, attr_mapped])
> +- LogicalProject(id=[CASE(IS NOT NULL($0), $0, $2)], attr=[CASE(IS
> NOT NULL($3), $3, $1)], attr_mapped=[CASE(IS NOT NULL($6), $6, IS NOT
> NULL($3), $3, $1)])
>+- LogicalJoin(condition=[=($4, $5)], joinType=[left])
>   :- LogicalProject(id1=[$0], attr=[$1], id2=[$2], attr0=[$3],
> $f4=[CASE(IS NOT NULL($3), $3, $1)])
>   :  +- LogicalJoin(condition=[=($0, $2)], joinType=[full])
>   : :- LogicalTableScan(table=[[default_catalog,
> default_database, table1]])
>   : +- LogicalAggregate(group=[{0}], attr=[MAX($1)])
>   :+- LogicalProject(id2=[$1], attr=[$0])
>   :   +- LogicalTableScan(table=[[default_catalog,
> default_database, table2]])
>   +- LogicalTableScan(table=[[default_catalog, default_database,
> table3]])
>
> == Optimized Logical Plan ==
> Sink(table=[default_catalog.default_database.sink], fields=[id, attr,
> attr_mapped], changelogMode=[NONE])
> +- Calc(select=[CASE(IS NOT NULL(id1), id1, id2) AS id, CASE(IS NOT
> NULL(attr0), attr0, attr) AS attr, CASE(IS NOT NULL(attr_mapped),
> attr_mapped, IS NOT NULL(attr0), attr0, attr) AS attr_mapped],
> changelogMode=[I,UB,UA,D])
>+- Join(joinType=[LeftOuterJoin], where=[=($f4, attr)],
> select=[id1, attr, id2, attr0, $f4, attr, attr_mapped],
> leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey],
> changelogMode=[I,UB,UA,D])
>   :- Exchange(distribution=[hash[$f4]], changelogMode=[I,UB,UA,D])
>   :  +- Calc(select=[id1, attr, id2, attr0, CASE(IS NOT
> NULL(attr0), attr0, attr) AS $f4], changelogMode=[I,UB,UA,D])
>   : +- Join(joinType=[FullOuterJoin], where=[=(id1, id2)],
> select=[id1, attr, id2, attr0], leftInputSpec=[JoinKeyContainsUniqueKey],
> rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
>   ::- Exchange(distribution=[hash[id1]], changelogMode=[I])
>   ::  +- TableSourceScan(table=[[default_catalog,
> default_database, table1]], fields=[id1, attr], changelogMode=[I])
>   :+- Exchange(distribution=[hash[id2]],
> changelogMode=[I,UB,UA])
>   :   +- GroupAggregate(groupBy=[id2], select=[id2,
> MAX(attr) AS attr], changelogMode=[I,UB,UA])
>   :  +- Exchange(distribution=[hash[id2]],
> changelogMode=[I])
>   : +- TableSourceScan(table=[[default_catalog,
> default_database, table2]], fields=[attr, id2], changelogMode=[I])
>   +- Exchange(distribution=[hash[attr]], changelogMode=[I])
>  +- TableSourceScan(table=[[default_catalog, default_database,
> table3]], fields=[attr, attr_mapped], changelogMode=[I])
>
> == Physical Execution Plan ==
> Stage 1 : Data Source
> content : Source: TableSourceScan(table=[[default_catalog,
> default_database, table1]], fields=[id1, attr])
>
> Stage 3 : Data Source
> content : Source: TableSourceScan(table=[[default_catalog,
> default_database, table2]], fields=[attr, id2])
>
> Stage 5 : Attr
> content : GroupAggregate(groupBy=[id2], select=[id2,
> MAX(attr) AS attr])
> ship_strategy : HASH
>
> Stage 7 : Attr
> content : Join(joinType=[FullOuterJoin],
> where=[(id1 = id2)], select=[id1, attr, id2, attr0],
> leftInputSpec=[JoinKeyContainsUniqueKey],
> rightInputSpec=[JoinKeyContainsUniqueKey])
> ship_strategy : HASH
>
> Stage 8 : Attr
> content : Calc(select=[id1, attr, id2,
> attr0, (attr0 IS NOT NULL CASE attr0 CASE attr) AS $f4])
> ship_strategy : FORWARD
>
> Stage 10 : Data Source
> content : Source: TableSourceScan(table=[[default_catalog,
> 

FlatMap 报错Invalid timestamp: -1.

2021-04-16 Thread lp
程序一直正常运行,后来突然偶尔报错如下,显示flatMap的Collect时出错:
我的flatMap transform操作代码片段如下,收集的数据是来自kafka的topic
--
SingleOutputStreamOperator text2Bean =
consumerRecordDataStreamSource.flatMap(new FlatMapFunction() {
@Override
public void flatMap(String jsonStr, Collector out)
throws Exception {
OsqueryBean osqueryBean =
JSON.parseObject(jsonStr,OsqueryBean.class);
if (StringUtils.isNotEmpty((String)
Utils.mapGet(osqueryBean, "columns", "cmdline"))) {
out.collect(osqueryBean);
}
}
});
--
13:59:53,885 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  
[] - Source: Custom Source -> Flat Map -> Map -> Sink: Unnamed (5/8)
(171fc1965c9c20a35cb48588cd88b35f) switched from RUNNING to FAILED on
d0468f82-70e8-4b65-99f2-315466cd15cd @ 127.0.0.1 (dataPort=-1).
java.lang.IllegalArgumentException: Invalid timestamp: -1. Timestamp should
always be non-negative or null.
at
org.apache.kafka.clients.producer.ProducerRecord.(ProducerRecord.java:74)
~[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.producer.ProducerRecord.(ProducerRecord.java:97)
~[kafka-clients-2.4.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaSerializationSchemaWrapper.serialize(KafkaSerializationSchemaWrapper.java:86)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:907)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.myorg.quickstart.osqueryDemo.analyze.ODS_ExtractAndEtlFromKafka$1.flatMap(ODS_ExtractAndEtlFromKafka.java:83)
~[classes/:?]
at
org.myorg.quickstart.osqueryDemo.analyze.ODS_ExtractAndEtlFromKafka$1.flatMap(ODS_ExtractAndEtlFromKafka.java:78)
~[classes/:?]
at
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at

PyFlink UDF: When to use vectorized vs scalar

2021-04-16 Thread Yik San Chan
The question is cross-posted on Stack Overflow
https://stackoverflow.com/questions/67122265/pyflink-udf-when-to-use-vectorized-vs-scalar

Is there a simple set of rules to follow when deciding between vectorized
vs scalar PyFlink UDF?

According to [docs](
https://ci.apache.org/projects/flink/flink-docs-stable/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html),
vectorized UDF has advantages of: (1) smaller ser-de and invocation
overhead (2) Vector calculation are highly optimized thanks to libs such as
Numpy.

> Vectorized Python user-defined functions are functions which are executed
by transferring a batch of elements between JVM and Python VM in Arrow
columnar format. The performance of vectorized Python user-defined
functions are usually much higher than non-vectorized Python user-defined
functions as the serialization/deserialization overhead and invocation
overhead are much reduced. Besides, users could leverage the popular Python
libraries such as Pandas, Numpy, etc for the vectorized Python user-defined
functions implementation. These Python libraries are highly optimized and
provide high-performance data structures and functions.

**QUESTION 1**: Is vectorized UDF ALWAYS preferred?

Let's say, in my use case, I want to simply extract some fields from a JSON
column, that is not supported by Flink [built-in functions](
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/functions/systemFunctions.html)
yet, therefore I need to define my udf like:

```python
@udf(...)
def extract_field_from_json(json_value, field_name):
import json
return json.loads(json_value)[field_name]
```

**QUESTION 2**: Will I also benefit from vectorized UDF in this case?

Best,
Yik San


Re: java.io.StreamCorruptedException: unexpected block data

2021-04-16 Thread Alokh P
The flink version is 1.12.1

On Fri, Apr 16, 2021 at 1:59 PM Alokh P  wrote:

> Hi Community,
> Facing this error when trying to query Parquet data using flink SQL Client
>
> Create Table command
>
> CREATE TABLE test(
>   `username` STRING,
>   `userid` INT) WITH ('connector' = 'filesystem',  'path' =
> '/home/centos/test/0016_part_00.parquet',  'format' = 'parquet' );
>
> Select command :
>
> select * from test limit 10;
>
> Getting the following exception
>
> [ERROR] Could not execute SQL statement. 
> Reason:org.apache.flink.runtime.client.JobInitializationException:
> Could not instantiate JobManager.
>
>   at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:494)
>   at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.JobException: Cannot instantiate the 
> coordinator for operator Source: TableSourceScan(table=[[default_catalog, 
> default_database, test]], fields=[username, userid]) -> 
> SinkConversionToTuple2 -> Sink: SQL Client Stream Collect Sink
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:231)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:866)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:257)
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:318)
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:272)
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:245)
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:133)
>   at 
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:111)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:345)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:330)
>   at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:95)
>   at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:39)
>   at 
> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.(JobManagerRunnerImpl.java:162)
>   at 
> org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:86)
>   at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:478)
>   ... 4 more
> Caused by: java.io.StreamCorruptedException: unexpected block data
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1586)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
>   at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
>   at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
>   at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
>   at 
> 

java.io.StreamCorruptedException: unexpected block data

2021-04-16 Thread Alokh P
Hi Community,
Facing this error when trying to query Parquet data using flink SQL Client

Create Table command

CREATE TABLE test(
  `username` STRING,
  `userid` INT) WITH ('connector' = 'filesystem',  'path' =
'/home/centos/test/0016_part_00.parquet',  'format' = 'parquet' );

Select command :

select * from test limit 10;

Getting the following exception

[ERROR] Could not execute SQL statement.
Reason:org.apache.flink.runtime.client.JobInitializationException:
Could not instantiate JobManager.

at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:494)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.JobException: Cannot instantiate
the coordinator for operator Source:
TableSourceScan(table=[[default_catalog, default_database, test]],
fields=[username, userid]) -> SinkConversionToTuple2 -> Sink: SQL
Client Stream Collect Sink
at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:231)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:866)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:257)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:318)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:272)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:245)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:133)
at 
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:111)
at 
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:345)
at 
org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:330)
at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:95)
at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:39)
at 
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.(JobManagerRunnerImpl.java:162)
at 
org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:86)
at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:478)
... 4 more
Caused by: java.io.StreamCorruptedException: unexpected block data
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1586)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2166)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
at 
org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:62)
at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:337)
at 

Re: Iterate Operator Checkpoint Failure

2021-04-16 Thread Fabian Paul
Hi Lu,

Can you provide some more detailed logs of what happened during the 
checkpointing phase? If it is possible please enable debug logs enabled.

It would be also great know whether you have implemented your own Iterator 
Operator or what kind of Flink program you are trying to execute.

Best,
Fabian

CC user-list

> On 15. Apr 2021, at 22:34, Lu Niu  wrote:
> 
> Hi, Flink Users
> 
> When we migrate from flink 1.9.1 to flink 1.11, we notice job will always 
> fail on checkpoint if job uses Iterator Operator, no matter we use unaligned 
> checkpoint or not. Those jobs don't have checkpoint issues in 1.9. Is this a 
> known issue? Thank you!
> 
> Best
> Lu



Re:Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit

2021-04-16 Thread 马阳阳
The Flink version we used is 1.12.0.


| |
马阳阳
|
|
ma_yang_y...@163.com
|
签名由网易邮箱大师定制


On 04/16/2021 16:07,马阳阳 wrote:
Hi, community,
When running a Flink streaming job with big state size, one task manager 
process was killed by the yarn node manager. The following log is from the yarn 
node manager:


2021-04-16 11:51:23,013 WARN 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Container 
[pid=521232,containerID=container_e157_1618223445363_16943_01_10] is 
running 19562496B beyond the 'PHYSICAL' memory limit. Current usage: 12.0 GB of 
12 GB physical memory used; 15.2 GB of 25.2 GB virtual memory used. Killing 
container.


When searching solution for this problem, I found that there is a option for 
this that worked for bounded shuffle. So is there a way to get rid of this in 
streaming mode?


PS:
memory related options:
taskmanager.memory.process.size:12288m
taskmanager.memory.managed.fraction:0.7



Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit

2021-04-16 Thread 马阳阳
Hi, community,
When running a Flink streaming job with big state size, one task manager 
process was killed by the yarn node manager. The following log is from the yarn 
node manager:


2021-04-16 11:51:23,013 WARN 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Container 
[pid=521232,containerID=container_e157_1618223445363_16943_01_10] is 
running 19562496B beyond the 'PHYSICAL' memory limit. Current usage: 12.0 GB of 
12 GB physical memory used; 15.2 GB of 25.2 GB virtual memory used. Killing 
container.


When searching solution for this problem, I found that there is a option for 
this that worked for bounded shuffle. So is there a way to get rid of this in 
streaming mode?


PS:
memory related options:
taskmanager.memory.process.size:12288m
taskmanager.memory.managed.fraction:0.7



Re: flinksql 批写并行度疑问

2021-04-16 Thread chenlei677
版本:flink 1.12
通过dynamicTableSink定义的connector



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flinksql 批写并行度疑问

2021-04-16 Thread chenlei677
在常见的wordcount场景中,flinksql 批写redis , 并行度如果设置大于1,是否会导致数据覆盖的问题。谢谢



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: PyFlink: called already closed and NullPointerException

2021-04-16 Thread Yik San Chan
Hi Dian,

Regarding outcome 2, sure I will ignore them for now.
Regarding outcome 3, you have eagle eyes! Good finding!

Thank you so much, I can't imagine trying PyFlink without your help. 感谢!

Best,
Yik San

On Fri, Apr 16, 2021 at 1:54 PM Dian Fu  wrote:

> 1) Regarding to Outcome 2: The logs are just warnings and currently it has
> chances to appear during the job shutdown. It doesn’t affect the
> functionality and so you can just ignore them.
>
> 2) Regarding to Outcome 3: It should be caused by the following input:
> 3708233,4,2,100,九江,3,0,1,"iPhone9,1",中国,江西,2,1
>
> You need to remove the double quote of “iPhone9,1".
>
> Definitely, we should improve the error message. I guess this is caused of
> the same reason as the previous NPE issue and it should be addressed in
> https://issues.apache.org/jira/browse/FLINK-22297.
>
> Regards,
> Dian
>
> 2021年4月16日 上午11:37,Yik San Chan  写道:
>
> The question is cross-posted on Stack Overflow
> https://stackoverflow.com/questions/67118743/pyflink-called-already-closed-and-nullpointerexception
> .
>
> Hi community,
>
> I run into an issue where a PyFlink job may end up with 3 very different
> outcomes, given very slight difference in input, and luck :(
>
> The PyFlink job is simple. It first reads from a csv file, then process
> the data a bit with a Python UDF that leverages
> `sklearn.preprocessing.LabelEncoder`. I have included all necessary files
> for reproduction in the [GitHub repo](
> https://github.com/YikSanChan/pyflink-issue-call-already-closed).
>
> To reproduce:
> - `conda env create -f environment.yaml`
> - `conda activate pyflink-issue-call-already-closed-env`
> - `pytest` to verify the udf defined in `ml_udf` works fine
> - `python main.py` a few times, and you will see multiple outcomes
>
> There are 3 possible outcomes.
>
> ## Outcome 1: success!
>
> It prints 90 expected rows, in a different order from outcome 2 (see
> below).
>
> ## Outcome 2: call already closed
>
> It prints 88 expected rows first, then throws exceptions complaining
> `java.lang.IllegalStateException: call already closed`.
>
> ```
> $ python main.py
> 6> +I(1403227,2,1,5,52,0,25,0,3,2,20,0,0)
> 7> +I(2278927,5,2,7,236,2,9,1,1347,2,62,0,1)
> 5> +I(143469,0,2,7,366,2,0,1,1346,2,132,0,1)
> 1> +I(2689667,5,1,9,329,1,1,0,49,2,86,0,1)
> 2> +I(3164378,5,2,14,348,2,0,0,1508,2,99,0,0)
> 5> +I(228014,0,2,0,329,2,0,0,393,2,86,0,1)
> 1> +I(2722900,5,0,0,200,2,0,0,584,2,63,1,0)
> 2> +I(3213491,5,1,11,1,2,0,0,656,2,98,0,1)
> 8> +I(2900644,5,1,7,307,0,1,1,1353,2,138,0,0)
> 2> +I(3222862,5,2,11,353,0,6,1,1346,2,62,0,1)
> 5> +I(646044,2,2,4,343,0,14,1,1409,2,48,1,0)
> 8> +I(2962545,5,2,0,142,2,0,0,501,2,62,1,0)
> 2> +I(3225216,5,1,8,193,2,0,1,1371,2,96,0,1)
> 8> +I(3010327,5,1,13,52,2,2,0,26,2,20,0,1)
> 6> +I(1433504,5,1,0,274,2,0,0,740,2,85,1,0)
> 8> +I(3013677,5,1,0,56,2,0,0,808,2,82,1,0)
> 6> +I(1492249,5,2,32,238,2,0,1,1407,2,96,0,1)
> 7> +I(2357917,5,2,0,365,0,1,0,33,2,54,0,0)
> 6> +I(1576752,5,2,0,307,2,0,1,1347,2,138,1,0)
> 8> +I(3015812,5,2,5,335,0,14,0,1287,2,96,0,0)
> 2> +I(3288417,5,2,6,293,2,13,0,624,2,98,0,1)
> 6> +I(1588680,5,2,11,144,2,0,1,1346,2,85,0,1)
> 8> +I(3032974,5,1,0,224,2,0,0,216,2,54,1,0)
> 2> +I(3289587,5,2,0,296,2,3,0,416,2,54,0,0)
> 8> +I(3036222,5,2,0,161,2,0,0,1003,2,34,0,0)
> 5> +I(657365,2,2,0,36,2,0,1,1422,2,62,1,0)
> 8> +I(3038267,1,1,14,236,2,2,1,1357,2,62,0,1)
> 1> +I(2729639,5,2,0,380,2,1,0,319,2,129,1,0)
> 8> +I(3127877,5,0,0,384,2,2,1,1415,2,108,1,0)
> 2> +I(3306929,5,1,13,232,2,0,0,367,2,54,0,1)
> 2> +I(3319428,5,2,9,383,0,1,0,481,2,147,0,0)
> 2> +I(3348282,5,1,0,152,2,0,0,1298,2,82,1,0)
> 1> +I(2730975,5,2,7,307,2,1,1,1412,2,138,0,1)
> 6> +I(1663817,5,2,0,193,2,0,0,856,2,96,0,0)
> 7> +I(2403815,5,1,0,247,2,0,0,567,2,108,1,0)
> 6> +I(1691686,2,2,0,52,2,0,1,1346,2,20,0,1)
> 6> +I(1744025,5,2,0,353,2,0,1,1410,2,62,0,0)
> 1> +I(2757438,5,2,6,346,0,0,0,1124,2,82,0,0)
> 6> +I(1779238,5,1,32,348,0,0,1,1412,2,99,0,1)
> 1> +I(2757877,5,1,9,105,2,1,0,1324,2,44,0,1)
> 4> +I(1951579,5,2,7,250,0,0,0,30,2,62,0,1)
> 1> +I(2791951,5,2,0,86,2,0,0,812,2,147,0,0)
> 4> +I(2033542,5,1,0,348,2,0,0,591,2,99,0,1)
> 2> +I(3404386,5,1,8,375,2,0,1,1409,2,98,0,0)
> 1> +I(2802070,5,2,0,236,2,0,1,1414,2,62,0,0)
> 8> +I(3133463,5,2,9,310,2,0,0,68,2,129,0,1)
> 2> +I(3419962,5,2,0,236,2,2,0,567,2,62,0,0)
> 1> +I(2824123,5,2,0,365,0,18,1,1354,2,54,1,0)
> 8> +I(3141633,5,2,13,101,0,22,0,989,2,147,0,0)
> 5> +I(779727,1,2,10,148,0,1,0,828,2,85,0,0)
> 1> +I(2863220,5,1,12,383,0,0,0,175,2,147,0,0)
> 4> +I(2097867,5,1,10,307,0,0,0,399,2,138,0,1)
> 6> +I(1779859,2,2,0,101,2,1,1,1365,2,147,0,1)
> 4> +I(2104055,4,2,6,74,2,2,0,1223,2,83,0,1)
> 6> +I(1918655,4,1,0,304,2,0,0,963,2,98,0,1)
> 4> +I(2118337,5,2,13,147,2,1,1,1394,2,86,0,1)
> 4> +I(2176902,5,1,8,215,0,0,0,92,2,132,0,1)
> 7> +I(2404608,5,2,11,7,2,0,1,1353,2,2,0,1)
> 4> +I(2207216,5,2,0,161,2,1,1,1421,2,34,0,0)
> 7> +I(2418491,5,2,11,161,0,1,1,1415,2,34,0,0)
> 7> +I(2419129,5,1,6,52,0,7,1,1358,2,20,0,0)
> 4> 

Re: Question about state processor data outputs

2021-04-16 Thread Robert Metzger
Hi,
I assumed you are using the DataStream API, because you mentioned the
streaming sink. But you also mentioned the state processor API (which I
ignored a bit).

I wonder why you are using the state processor API. Can't you use the
streaming job that created the state also for writing it to files using the
StreamingFileSink?

If you want to stick to the DataSet API, then I guess you have to implement
a custom (File)OutputFormat.


On Fri, Apr 16, 2021 at 5:37 AM Chen-Che Huang  wrote:

> Hi Robert,
>
> Thanks for your code. It's really helpful!
>
> However, with the readKeyedState api of state processor, we get dataset
> for our data instead of datastream and it seems the dataset doesn't support
> streamfilesink (not addSink method like datastream). If not, I need to
> transform the dataset to a datastream. I'm not sure it's doable based on
> https://www.alibabacloud.com/blog/deep-insights-into-flink-sql-flink-advanced-tutorials_596628.
> If it's doable, then I'll be able to solve our problem with applying
> streamfilesink to the transformed dataset.
>
> Best wishes,
> Chen-Che Huang
>
> On 2021/04/15 19:23:43, Robert Metzger  wrote:
> > Hey Chen-Che Huang,
> >
> > I guess the StreamingFileSink is what you are looking for. It is
> documented
> > here:
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
> > I drafted a short example (that is not production ready), which does
> > roughly what you are asking for:
> > https://gist.github.com/rmetzger/7d5dbdaa118c63f5875c8c9520cc311d
> >
> > Hope this helps!
> >
> > Best,
> > Robert
> >
> >
> > On Thu, Apr 15, 2021 at 11:33 AM Chen-Che Huang 
> wrote:
> >
> > > Hi all,
> > >
> > > We're going to use state processor to make our keyedstate data to be
> > > written to different files based on the keys. More specifically, we
> want
> > > our data to be written to files key1.txt, key2.txt, ..., and keyn.txt
> where
> > > the value with the same key is stored in the same file. In each file,
> the
> > > data may be stored as follows. As far as I know, I need to implement
> my own
> > > Sink (org.apache.flink.streaming.api.functions.sink.RichSinkFunction)
> to
> > > meet the requirement. However, I wonder is there a native way to
> achieve
> > > this without implementing my own Sink because using official solution
> is
> > > usually more efficient and reliable than doing it by myself.  Many
> thanks
> > > for any comment.
> > >
> > > key1.txt
> > > key1 value11
> > > key1 value21
> > > key1 value31
> > >
> > > key2.txt
> > > key2 value21
> > > key2 value22
> > > key2 value23
> > >
> > > Best wishes,
> > > Chen-Che Huang
> > >
> >
>


Re: PyFlink Vectorized UDF throws NullPointerException

2021-04-16 Thread Dian Fu
Sure. I have replied. Let’s discuss it in that thread.

> 2021年4月16日 上午11:40,Yik San Chan  写道:
> 
> Hi Dian,
> 
> Thank you so much for tracking the issue!
> 
> I run into another NullPointerException when running pandas UDF, but this 
> time I add an unit test to ensure the input and output type already ... And 
> the new issue looks even more odd ... Do you mind taking a look? 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/PyFlink-called-already-closed-and-NullPointerException-td42997.html
>  
> 
> 
> Thank you!
> 
> Best,
> Yik San
> 
> On Fri, Apr 16, 2021 at 11:05 AM Dian Fu  > wrote:
> Definitely agree with you. Have created 
> https://issues.apache.org/jira/browse/FLINK-22297 
>  as a following up.
> 
>> 2021年4月16日 上午7:10,Yik San Chan > > 写道:
>> 
>> Hi Dian,
>> 
>> I wonder if we can improve the error tracing and message so that it becomes 
>> more obvious where the problem is? To me, a NPE really says very little.
>> 
>> Best,
>> Yik San
>> 
>> On Thu, Apr 15, 2021 at 11:07 AM Dian Fu > > wrote:
>> Great! Thanks for letting me know~
>> 
>>> 2021年4月15日 上午11:01,Yik San Chan >> > 写道:
>>> 
>>> Hi Dian,
>>> 
>>> Thanks for the reminder. Yes, the original udf implementation does not 
>>> qualify the input and output type requirement. After adding a unit test, I 
>>> was able to find what's wrong, and fix my UDF implementation. Here is the 
>>> new implementation FYI.
>>> 
>>> @udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
>>> def predict(users, items):
>>> n_users, n_items = 943, 1682
>>> model = MatrixFactorization(n_users, n_items)
>>> model.load_state_dict(torch.load("model.pth"))
>>> return pd.Series(model(users, items).detach().numpy())
>>> 
>>> And here is the unit test.
>>> 
>>> def test_predict():
>>> f = predict._func
>>> users = pd.Series([1, 2, 3])
>>> items = pd.Series([1, 4, 9])
>>> preds = f(users, items)
>>> assert isinstance(preds, pd.Series)
>>> assert len(preds) == 3
>>> 
>>> Thank you so much!
>>> 
>>> Best,
>>> Yik San
>>> 
>>> On Wed, Apr 14, 2021 at 11:03 PM Dian Fu >> > wrote:
>>> Hi Yik San,
>>> 
>>> 1) There are two kinds of Python UDFs in PyFlink: 
>>> - General Python UDFs which process input elements at row basis. That is, 
>>> it will process one row at a time. 
>>> -  Pandas UDFs which process input elements at batch basis.
>>> So you are correct that you need to use Pandas UDF for your requirements.
>>> 
>>> 2) For Pandas UDF, the input type for each input argument is Pandas.Series 
>>> and the result type should also be a Pandas.Series. Besides, the length of 
>>> the result should be the same as the inputs. Could you check if this is the 
>>> case for your Pandas UDF implementation? 
>>> 
>>> Regards,
>>> Dian
>>> 
>>> 
>>> On Wed, Apr 14, 2021 at 9:44 PM Yik San Chan >> > wrote:
>>> The question is cross-posted on Stack Overflow 
>>> https://stackoverflow.com/questions/67092978/pyflink-vectorized-udf-throws-nullpointerexception
>>>  
>>> .
>>> 
>>> I have a ML model that takes two numpy.ndarray - `users` and `items` - and 
>>> returns an numpy.ndarray `predictions`. In normal Python code, I would do:
>>> 
>>> ```python
>>> model = load_model()
>>> 
>>> df = load_data() # the DataFrame includes 4 columns, namely, user_id, 
>>> movie_id, rating, and timestamp
>>> 
>>> users = df.user_id.values
>>> items = df.movie_id.values
>>> 
>>> predictions = model(users, items)
>>> ```
>>> 
>>> I am looking into porting this code into Flink to leverage its distributed 
>>> nature. My assumption is: by distributing the prediction workload on 
>>> multiple Flink nodes, I should be able to run the whole prediction faster.
>>> 
>>> So I compose a PyFlink job. Note I implement an UDF called `predict` to run 
>>> the prediction.
>>> 
>>> ```python
>>> # batch_prediction.py
>>> 
>>> model = load_model()
>>> 
>>> settings = EnvironmentSettings.new_instance().use_blink_planner().build()
>>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>>> t_env = StreamTableEnvironment.create(exec_env, 
>>> environment_settings=settings)
>>> 
>>> SOURCE_DDL = """
>>> CREATE TABLE source (
>>> user_id INT,
>>> movie_id INT,
>>> rating TINYINT,
>>> event_ms BIGINT
>>> ) WITH (
>>> 'connector' = 'filesystem',
>>> 'format' = 'csv',
>>> 'csv.field-delimiter' = '\t',
>>> 'path' = 'ml-100k/u1.test'
>>> )
>>> """
>>> 
>>> SINK_DDL = """
>>> CREATE TABLE sink (
>>> prediction DOUBLE
>>> ) WITH (
>>> 'connector' =