Re: Flink Web UI displays nothing in k8s when using ingress

2020-03-03 Thread ouywl






Hi Lake,

OK. Please share the jobmanager pod logs. Can you confirm the JM pod is running correctly? Also try kube-proxy or a NodePort Service: can you reach the web UI that way?


Best,
Ouywl


On 03/4/2020 14:08, LakeShen wrote:


Hi community,
We now plan to move all Flink jobs to a k8s cluster. For one Flink job, we want to see its web UI. First, we create a k8s Service to expose port 8081 of the jobmanager, then we use an ingress controller so that we can reach it from outside. But the Flink web UI looks like this: the images and other info are not displayed. What can I do to display the Flink web info?
Thanks for your reply.






Re: Flink Web UI displays nothing in k8s when using ingress

2020-03-03 Thread LakeShen
On second thought, I think I should configure the correct Flink jobserver for the
Flink job.

LakeShen wrote on Wednesday, March 4, 2020 at 2:07 PM:

> Hi community,
> We now plan to move all Flink jobs to a k8s cluster. For one Flink
> job, we want to see its web UI. First, we create a k8s
> Service to expose port 8081 of the jobmanager, then we use an ingress controller
> so that we can reach it from outside. But the Flink web UI looks like this:
>
> [image: image.png]
>
> The Flink web UI images and other info are not displayed; what can I do to
> display the Flink web info?
> Thanks for your reply.
>



Flink Web UI displays nothing in k8s when using ingress

2020-03-03 Thread LakeShen
Hi community,
We now plan to move all Flink jobs to a k8s cluster. For one Flink
job, we want to see its web UI. First, we create a k8s
Service to expose port 8081 of the jobmanager, then we use an ingress controller
so that we can reach it from outside. But the Flink web UI looks like this:

[image: image.png]

The Flink web UI images and other info are not displayed; what can I do to
display the Flink web info?
Thanks for your reply.




Re: Source parallelism problem when reading Hive with Flink 1.10.0

2020-03-03 Thread Jun Zhang

On March 4, 2020, 13:25, JingsongLee wrote:
https://issues.apache.org/jira/browse/FLINK-16413
FYI


Best,
Jingsong Lee


--
From:JingsongLee 

Re: Source parallelism problem when reading Hive with Flink 1.10.0

2020-03-03 Thread JingsongLee
Hi Jun,

Jira: https://issues.apache.org/jira/browse/FLINK-16413
FYI

Best,
Jingsong Lee


--
From:JingsongLee 
Send Time: Tuesday, March 3, 2020, 19:06
To:Jun Zhang <825875...@qq.com>; user-zh@flink.apache.org 

Cc:user-zh@flink.apache.org ; like 
Subject: Re: Source parallelism problem when reading Hive with Flink 1.10.0

Hi Jun,

Good suggestion; that is a worthwhile optimization. Feel free to open a JIRA for it.

Best,
Jingsong Lee


--
From:Jun Zhang <825875...@qq.com>
Send Time: Tuesday, March 3, 2020, 18:45
To:user-zh@flink.apache.org ; JingsongLee 

Cc:user-zh@flink.apache.org ; like 
Subject: Re: Source parallelism problem when reading Hive with Flink 1.10.0



Hi Jingsong,
One thing I want to raise: I enabled automatic inference and set the maximum inferred parallelism to 10.
I have a SQL query like: select * from mytable limit 1;
The Hive table mytable has more than 10 files, so isn't starting 10 parallel source tasks a bit wasteful in this case?
On March 2, 2020, 16:38, JingsongLee wrote:
I recommend reading the Hive table in batch mode.

Best,
Jingsong Lee


--
From:like 
Send Time: Monday, March 2, 2020, 16:35
To:lzljs3620...@aliyun.com 
Subject: Re: Source parallelism problem when reading Hive with Flink 1.10.0


I am using StreamTableEnvironment, and I did run into this problem.
On March 2, 2020, 16:16, JingsongLee wrote:
 Automatic inference may mean the job cannot start because of insufficient resources

In theory that should not happen; a batch job can run with only part of its tasks at a time.

Best,
Jingsong Lee

--
From:like 
Send Time: Monday, March 2, 2020, 15:35
To:user-zh@flink.apache.org ; lzljs3620...@aliyun.com 

Subject: Re: Source parallelism problem when reading Hive with Flink 1.10.0


Thanks a lot! After disabling automatic inference I can now control the source parallelism. With automatic inference the job may fail to start because of insufficient resources.


On March 2, 2020, 15:18, JingsongLee wrote:   Hi,

In 1.10 the Hive source infers its parallelism automatically. You can control this with the following options in flink-conf.yaml:
- table.exec.hive.infer-source-parallelism=true (automatic inference is the default)
- table.exec.hive.infer-source-parallelism.max=1000 (maximum inferred parallelism)

The sink parallelism defaults to that of its upstream operator; if there is a shuffle, the configured global parallelism is used.

Best,
Jingsong Lee


--
From:like 
Send Time: Monday, March 2, 2020, 14:58
To:user-zh@flink.apache.org 
Subject: Source parallelism problem when reading Hive with Flink 1.10.0

Hi all,

I am reading a Hive table with Flink 1.10.0. I set a global parallelism at startup, and while the sink parallelism respects that setting, the source parallelism does not: it changes with the size of the source and can go as high as 1000. How can I control this parallelism?
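
For readers who would rather set the two options above per job than in flink-conf.yaml, a minimal sketch using the TableConfig of a Flink 1.10 TableEnvironment is shown below. The environment setup is illustrative; only the two option keys come from the thread.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HiveSourceParallelismSketch {
    public static void main(String[] args) {
        // Blink planner in batch mode, as recommended earlier in the thread for reading Hive tables.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Option keys from the thread: turn automatic source-parallelism inference off,
        // or cap how high the inferred parallelism may go.
        tableEnv.getConfig().getConfiguration()
                .setBoolean("table.exec.hive.infer-source-parallelism", false);
        tableEnv.getConfig().getConfiguration()
                .setInteger("table.exec.hive.infer-source-parallelism.max", 10);

        // ... register the HiveCatalog and run queries as usual.
    }
}

With inference disabled, the source parallelism follows the configured job parallelism, which is what "like" reports above.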

Re: JobMaster does not register with ResourceManager in high availability setup

2020-03-03 Thread Xintong Song
Hi Abhinav,

The JobMaster log "Connecting to ResourceManager ..." is printed after the
JobMaster retrieves the ResourceManager address from ZooKeeper. In your case, I
assume there is some ZK problem such that the JM cannot resolve the RM address.


Have you confirmed whether the ZK pods are recovered after the second
disruption? And did the address change?


You can also try to enable debug logs for the following components, to see
if there's any useful information.

org.apache.flink.runtime.jobmaster

org.apache.flink.runtime.resourcemanager

org.apache.flink.runtime.highavailability

org.apache.flink.runtime.leaderretrieval

org.apache.zookeeper


Thank you~

Xintong Song



On Wed, Mar 4, 2020 at 5:42 AM Bajaj, Abhinav 
wrote:

> Hi,
>
>
>
> We recently came across an issue where JobMaster does not register with
> ResourceManager in Fink high availability setup.
>
> Let me share the details below.
>
>
>
> *Setup*
>
>- Flink 1.7.1
>- K8s
>- High availability mode with a *single* Jobmanager and 3 zookeeper
>nodes in quorum.
>
>
>
> *Scenario*
>
>- Zookeeper pods are disrupted by K8s that leads to resetting of
>leadership of JobMaster & ResourceManager and restart of the Flink job.
>
>
>
> *Observations*
>
>- After the first disruption of Zookeeper, JobMaster and
>ResourceManager were reset & were able to register with each other. Sharing
>few logs that confirm that. Flink job restarted successfully.
>
> org.apache.flink.runtime.jobmaster.JobMaster  - Connecting to
> ResourceManager
>
> o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Registering
> job manager
>
> o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Registered
> job manager
>
> org.apache.flink.runtime.jobmaster.JobMaster  - JobManager successfully
> registered at ResourceManager...
>
>-  After another disruption later on the same Flink cluster, JobMaster
>& ResourceManager were not connected and below logs can be noticed and
>eventually scheduler times out.
>
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Cannot serve slot
> request, no ResourceManager connected.
>
>………
>
> 
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> Could not allocate all requires slots within timeout of 30 ms……
>
>
>- I can confirm from the logs that both JobMaster & ResourceManager
>were running. JobMaster was trying to recover the job and ResourceManager
>registered the taskmanagers.
>- The odd thing is that the log for JobMaster trying to connect to
>ResourceManager is missing. So I assume JobMaster didn’t try to connect to
>ResourceManager.
>
>
>
> I can share more logs if required.
>
>
>
> Has anyone noticed similar behavior or is this a known issue with Flink
> 1.7.1?
>
> Any recommendations or suggestions on fix or workaround?
>
>
>
> Appreciate your time and help here.
>
>
>
> ~ Abhinav Bajaj
>
>
>
>
>


Re: Should I use a Sink or Connector? Or Both?

2020-03-03 Thread Jark Wu
John is right.

Could you provide more detailed code? So that we can help to investigate.

Best,
Jark

On Wed, 4 Mar 2020 at 06:20, John Smith  wrote:

> The sink is for the Streaming API; it looks like you are using SQL and tables.
> So you can use the connector to output the table result to Elastic. Unless
> you want to convert from table to stream first.
>
> On Tue, 3 Mar 2020 at 16:25, Castro, Fernando C. <
> fernando.cas...@leidos.com> wrote:
>
>> Hello folks! I’m new to Flink and data streaming in general, just initial
>> FYI ;)
>>
>>
>>
>> I’m currently doing this successfully:
>>
>> 1 - streaming data from Kafka in Flink
>>
>> 2 - aggregating the data with Flink’s sqlQuery API
>>
>> 3 - outputting the result of #2 into STDOUT via toRetractStream()
>>
>>
>>
>> My objective is to change #3 so I’m upserting into an Elasticsearch index
>> (see
>> https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors
>> for my complete code)
>>
>>
>>
>> I’ve been using the template for the Elasticsearch connector
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector
>>
>> tableEnvironment
>>
>>   .connect(...)
>>
>>   .withFormat(...)
>>
>>   .withSchema(...)
>>
>>   .inAppendMode()
>>
>>   .createTemporaryTable("MyTable")
>>
>>
>>
>> But I’m confused from seeing some old examples online. Should I be using
>> the Elasticsearch Sink (
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/elasticsearch.html#elasticsearch-sink)
>> instead? Or both?
>>
>>
>>
>> I’m having trouble with the current implementation where no data is
>> outputting to Elasticsearch, but no error is being displayed in Flink (job
>> status is RUNNING).
>>
>>
>>
>> Hoping somebody could clarify what I’m missing? Thank you in advance!
>>
>>
>>
>> Note: Running on Scala 2.11, Elasticsearch 7, Flink 1.10
>>
>
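
To make the advice above concrete, here is a minimal sketch of declaring an Elasticsearch sink table with the Flink 1.10 descriptor API in upsert mode, which suits an aggregating query. The host, index, and field names are assumptions, not taken from the original code.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Elasticsearch;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Schema;

public class EsConnectorSketch {
    // Registers an Elasticsearch-backed table that accepts upserts from an aggregating query.
    public static void registerEsSink(StreamTableEnvironment tableEnv) {
        tableEnv.connect(
                new Elasticsearch()
                        .version("7")
                        .host("localhost", 9200, "http")
                        .index("my-index")
                        .documentType("_doc"))
                .withFormat(new Json())   // the Elasticsearch connector writes JSON documents
                .withSchema(new Schema()
                        .field("word", DataTypes.STRING())
                        .field("cnt", DataTypes.BIGINT()))
                .inUpsertMode()           // an aggregation produces updates, not appends
                .createTemporaryTable("EsSink");

        // The aggregating query can then be written into it:
        // tableEnv.sqlUpdate("INSERT INTO EsSink SELECT word, COUNT(*) AS cnt FROM MyTable GROUP BY word");
    }
}

In upsert mode the sink expects the query to define a key (for example the GROUP BY fields), so the aggregated result can be written without converting the table to a retract stream first.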


Re: CliFrontend does not load classes from the user jar first

2020-03-03 Thread tison
That works as a hack too, though the community certainly can't do it that way on master.

Best,
tison.


aven.wu wrote on Tuesday, March 3, 2020 at 4:44 PM:

> Thanks for the answer.
> I later checked the classpath setup of the flink run script and modified the script to put my jar at the very front of the Flink classpath, which solved the problem.
>
> Best
> Aven
>
> From: tison
> Sent: March 3, 2020, 14:16
> To: user-zh
> Subject: Re: CliFrontend does not load classes from the user jar first
>
>
> https://github.com/apache/flink/commit/0f30c263eebd2fc3ecbeae69a4ce9477e1d5d774
>
> Best,
> tison.
>
>
> tison wrote on Tuesday, March 3, 2020 at 2:13 PM:
>
> > This has been fixed in 1.9.2 and 1.10; for the change, see
> >
> > https://issues.apache.org/jira/browse/FLINK-13749
> >
> > Best,
> > tison.
> >
> >
> > aven.wu wrote on Tuesday, March 3, 2020 at 2:04 PM:
> >
> >> Component versions: Hadoop 2.7.3, Flink 1.9.1, Elasticsearch 6.5.
> >> The problem arose because my user program uses Jackson and depends on the Elasticsearch REST client.
> >> When submitting the job to the YARN cluster, the following exception occurred:
> >> java.lang.NoSuchFieldError: FAIL_ON_SYMBOL_HASH_OVERFLOW
> >> at
> >>
> org.elasticsearch.common.xcontent.json.JsonXContent.(JsonXContent.java:57)
> >> After searching online I suspected a Jackson version conflict, so I printed the class-loading path:
> >> --main class jackson class load before
> >> run--
> >> file:/usr/**/hadoop/lib/jackson-databind-2.2.3.jar
> >> Sure enough, version 2.2.3 was loaded from the Hadoop classpath.
> >>
> >> I then looked at the entry point of the flink run command:
> >> CliFrontend#buildProgram line 799
> >> PackagedProgram#PackagedProgram line 221
> >> JobWithJars#BuildUserCodeClassLoad line 142
> >> return FlinkUserCodeClassLoaders.parentFirst(urls, parent);
> >> parent-first is used by default. According to the class-loading order described in the official documentation, classes are then loaded from the classpath first rather than from the user jar.
> >> How can I change the class-loading order here so that classes are loaded from the user jar first?
> >>
> >> Best
> >> Aven
> >>
> >>
>
>


Very large _metadata file

2020-03-03 Thread Jacob Sevart
Per the documentation:

"The meta data file of a Savepoint contains (primarily) pointers to all
files on stable storage that are part of the Savepoint, in form of absolute
paths."

I somehow have a _metadata file that's 1.9GB. Running *strings *on it I
find 962 strings, most of which look like HDFS paths, which leaves a lot of
that file-size unexplained. What else is in there, and how exactly could
this be happening?

We're running 1.6.

Jacob


Re: Should I use a Sink or Connector? Or Both?

2020-03-03 Thread John Smith
The sink is for the Streaming API; it looks like you are using SQL and tables.
So you can use the connector to output the table result to Elastic. Unless
you want to convert from table to stream first.

On Tue, 3 Mar 2020 at 16:25, Castro, Fernando C. 
wrote:

> Hello folks! I’m new to Flink and data streaming in general, just initial
> FYI ;)
>
>
>
> I’m currently doing this successfully:
>
> 1 - streaming data from Kafka in Flink
>
> 2 - aggregating the data with Flink’s sqlQuery API
>
> 3 - outputting the result of #2 into STDOUT via toRetractStream()
>
>
>
> My objective is to change #3 so I’m upserting into an Elasticsearch index
> (see
> https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors
> for my complete code)
>
>
>
> I’ve been using the template for the Elasticsearch connector
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector
>
> tableEnvironment
>
>   .connect(...)
>
>   .withFormat(...)
>
>   .withSchema(...)
>
>   .inAppendMode()
>
>   .createTemporaryTable("MyTable")
>
>
>
> But I’m confused from seeing some old examples online. Should I be using
> the Elasticsearch Sink (
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/elasticsearch.html#elasticsearch-sink)
> instead? Or both?
>
>
>
> I’m having trouble with the current implementation where no data is
> outputting to Elasticsearch, but no error is being displayed in Flink (job
> status is RUNNING).
>
>
>
> Hoping somebody could clarify what I’m missing? Thank you in advance!
>
>
>
> Note: Running on Scala 2.11, Elasticsearch 7, Flink 1.10
>


Re: StreamingFileSink Not Flushing All Data

2020-03-03 Thread Austin Cawley-Edwards
Hi all,

Thanks for the docs pointer/ FLIP Rafi, and the workaround strategy Kostas
-- strange though, as I wasn't using a bounded source when I first ran into
this issue. I have updated the example repo to use an unbounded source[1],
and the same file corruption problems remain.

Anything else I could be doing wrong with the compression stream?

Thanks again,
Austin

[1]:
https://github.com/austince/flink-streaming-file-sink-compression/tree/unbounded

On Tue, Mar 3, 2020 at 3:50 AM Kostas Kloudas  wrote:

> Hi Austin and Rafi,
>
> @Rafi Thanks for providing the pointers!
> Unfortunately there is no progress on the FLIP (or the issue).
>
> @ Austin In the meantime, what you could do --assuming that your input is
> bounded --  you could simply not stop the job after the whole input is
> processed, then wait until the output is committed, and then cancel the
> job. I know and I agree that this is not an elegant solution but it is a
> temporary workaround.
>
> Hopefully the FLIP and related issue is going to be prioritised soon.
>
> Cheers,
> Kostas
>
> On Tue, Mar 3, 2020 at 8:04 AM Rafi Aroch  wrote:
>
>> Hi,
>>
>> This happens because StreamingFileSink does not support a finite input
>> stream.
>> In the docs it's mentioned under "Important Considerations":
>>
>> [image: image.png]
>>
>> This behaviour often surprises users...
>>
>> There's a FLIP and an issue about fixing this. I'm not sure what the
>> status is, though; maybe Kostas can share.
>>
>> Thanks,
>> Rafi
>>
>>
>> On Mon, Mar 2, 2020 at 5:05 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hi Dawid and Kostas,
>>>
>>> Sorry for the late reply + thank you for the troubleshooting. I put
>>> together an example repo that reproduces the issue[1], because I did have
>>> checkpointing enabled in my previous case -- still must be doing something
>>> wrong with that config though.
>>>
>>> Thanks!
>>> Austin
>>>
>>> [1]: https://github.com/austince/flink-streaming-file-sink-compression
>>>
>>>
>>> On Mon, Feb 24, 2020 at 5:28 AM Kostas Kloudas 
>>> wrote:
>>>
 Hi Austin,

 Dawid is correct in that you need to enable checkpointing for the
 StreamingFileSink to work.

 I hope this solves the problem,
 Kostas

 On Mon, Feb 24, 2020 at 11:08 AM Dawid Wysakowicz
  wrote:
 >
 > Hi Austing,
 >
 > If I am not mistaken the StreamingFileSink by default flushes on
 checkpoints. If you don't have checkpoints enabled it might happen that not
 all data is flushed.
 >
 > I think you can also adjust that behavior with:
 >
 > forBulkFormat(...)
 >
 > .withRollingPolicy(/* your custom logic */)
 >
 > I also cc Kostas who should be able to correct me if I am wrong.
 >
 > Best,
 >
 > Dawid
 >
 > On 22/02/2020 01:59, Austin Cawley-Edwards wrote:
 >
 > Hi there,
 >
 > Using Flink 1.9.1, trying to write .tgz files with the
 StreamingFileSink#BulkWriter. It seems like flushing the output stream
 doesn't flush all the data written. I've verified I can create valid files
 using the same APIs and data on there own, so thinking it must be something
 I'm doing wrong with the bulk format. I'm writing to the local filesystem,
 with the `file://` protocol.
 >
 > For Tar/ Gzipping, I'm using the Apache Commons Compression library,
 version 1.20.
 >
 > Here's a runnable example of the issue:
 >
 > import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 > import
 org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
 > import
 org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
 > import org.apache.flink.api.common.serialization.BulkWriter;
 > import org.apache.flink.core.fs.FSDataOutputStream;
 > import org.apache.flink.core.fs.Path;
 > import
 org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 > import
 org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 >
 > import java.io.FileOutputStream;
 > import java.io.IOException;
 > import java.io.Serializable;
 > import java.nio.charset.StandardCharsets;
 >
 > class Scratch {
 >   public static class Record implements Serializable {
 > private static final long serialVersionUID = 1L;
 >
 > String id;
 >
 > public Record() {}
 >
 > public Record(String id) {
 >   this.id = id;
 > }
 >
 > public String getId() {
 >   return id;
 > }
 >
 > public void setId(String id) {
 >   this.id = id;
 > }
 >   }
 >
 >   public static void main(String[] args) throws Exception {
 > final 
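
Since both Dawid and Kostas point at checkpointing as what triggers the StreamingFileSink to finalize its part files, here is a minimal, generic sketch of enabling it. The row format, local path, source, and interval are placeholders, not taken from Austin's repo; bulk formats such as the compressed writer above roll on every checkpoint, so without checkpoints the data stays in in-progress files.

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class CheckpointedFileSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Without checkpointing the sink never moves part files out of the in-progress/pending state.
        env.enableCheckpointing(60_000); // completed parts are committed on each checkpoint

        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("file:///tmp/output"), new SimpleStringEncoder<String>("UTF-8"))
                .build();

        env.socketTextStream("localhost", 9999) // any unbounded source
                .addSink(sink);

        env.execute("streaming-file-sink-sketch");
    }
}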

JobMaster does not register with ResourceManager in high availability setup

2020-03-03 Thread Bajaj, Abhinav
Hi,

We recently came across an issue where JobMaster does not register with 
ResourceManager in Fink high availability setup.
Let me share the details below.

Setup

  *   Flink 1.7.1
  *   K8s
  *   High availability mode with a single Jobmanager and 3 zookeeper nodes in 
quorum.

Scenario

  *   Zookeeper pods are disrupted by K8s that leads to resetting of leadership 
of JobMaster & ResourceManager and restart of the Flink job.

Observations

  *   After the first disruption of Zookeeper, JobMaster and ResourceManager 
were reset & were able to register with each other. Sharing few logs that 
confirm that. Flink job restarted successfully.

org.apache.flink.runtime.jobmaster.JobMaster  - Connecting to 
ResourceManager

o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Registering job 
manager

o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Registered job 
manager

org.apache.flink.runtime.jobmaster.JobMaster  - JobManager successfully 
registered at ResourceManager...

  *   After another disruption later on the same Flink cluster, JobMaster & 
ResourceManager were not connected and below logs can be noticed and eventually 
scheduler times out.
org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Cannot serve slot 
request, no ResourceManager connected.

   ………


org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Could not allocate all requires slots within timeout of 30 ms……

  *   I can confirm from the logs that both JobMaster & ResourceManager were 
running. JobMaster was trying to recover the job and ResourceManager registered 
the taskmanagers.
  *   The odd thing is that the log for JobMaster trying to connect to 
ResourceManager is missing. So I assume JobMaster didn’t try to connect to 
ResourceManager.

I can share more logs if required.

Has anyone noticed similar behavior or is this a known issue with Flink 1.7.1?
Any recommendations or suggestions on fix or workaround?

Appreciate your time and help here.

~ Abhinav Bajaj




Should I use a Sink or Connector? Or Both?

2020-03-03 Thread Castro, Fernando C.
Hello folks! I’m new to Flink and data streaming in general, just initial FYI ;)

I’m currently doing this successfully:
1 - streaming data from Kafka in Flink
2 - aggregating the data with Flink’s sqlQuery API
3 - outputting the result of #2 into STDOUT via toRetractStream()

My objective is to change #3 so I’m upserting into an Elasticsearch index (see 
https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors
 for my complete code)

I’ve been using the template for the Elasticsearch connector 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector
tableEnvironment
  .connect(...)
  .withFormat(...)
  .withSchema(...)
  .inAppendMode()
  .createTemporaryTable("MyTable")

But I’m confused from seeing some old examples online. Should I be using the 
Elasticsearch Sink 
(https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/elasticsearch.html#elasticsearch-sink)
 instead? Or both?

I’m having trouble with the current implementation where no data is outputting 
to Elasticsearch, but no error is being displayed in Flink (job status is 
RUNNING).

Hoping somebody could clarify what I’m missing? Thank you in advance!

Note: Running on Scala 2.11, Elasticsearch 7, Flink 1.10


How to print the aggregated state everytime it is updated?

2020-03-03 Thread kant kodali
Hi All,

I have a custom aggregated state that is represented by a Set, and I have
a stream of values coming in from Kafka where I inspect them, compute the custom
aggregation, and store it in the Set. Now, I am trying to figure out how to
print the updated value every time this state is updated.

Imagine I have a DataStream of Sets.

I tried a few things already but keep running into the following exception.
Not sure why? Do I need to call assignTimestampsAndWatermarks? I thought
watermarks are not mandatory in Flink, especially when I want to keep this
aggregated state forever. Any simple code sample on how to print the
streaming aggregated state represented by a DataStream of Sets would be
great! You can imagine my Set has a toString() method that takes
care of printing, and I just want to see those values in stdout.

Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp
(= no timestamp marker). Is the time characteristic set to
'ProcessingTime', or did you forget to call
'DataStream.assignTimestampsAndWatermarks(...)'?
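
No answer is archived for this thread, but the exception text itself points at two possible remedies. A hedged sketch of both follows; the event type and timestamp field are made up for illustration and are not from the original job.

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TimestampSketch {

    // Placeholder event type; the thread does not show the real types.
    public static class MyEvent {
        public long eventTimeMillis;
    }

    public static void sketch(StreamExecutionEnvironment env, DataStream<MyEvent> events) {
        // Option 1: if event time is not actually needed, use processing time.
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // Option 2: stay on event time, but assign timestamps and watermarks to the source stream
        // (use the returned stream downstream).
        events.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(5)) {
                    @Override
                    public long extractTimestamp(MyEvent e) {
                        return e.eventTimeMillis; // assumed field carrying the event's timestamp
                    }
                });
    }
}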


Re: Flink Session Windows State TTL

2020-03-03 Thread karl.pullicino
Added flink_oom_exception.txt, as I originally forgot to attach it.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Flink Session Windows State TTL

2020-03-03 Thread karl.pullicino
Hi all,

We have an Apache Flink application which generates player sessions based on
player events keyed by playerId. Sessions are based on event time. A session is
created on the first event for a player and closes after 30 minutes of
inactivity. Events are merged in our custom PlayerSessionAggregator (implements
AggregateFunction). We deployed this application on a Flink dev cluster (with
checkpoints enabled); however, we noticed that the state keeps growing until we
end up with an out-of-memory error, as shown in the attached file
flink_oom_exception.txt.

We tried using a PurgingTrigger together with a CountTrigger; however, since it
uses FIRE_AND_PURGE we ended up with a session per event, i.e. events were not
being merged. Using an Evictor we ended up in the same situation because events
were being removed from the window. Hence we resorted to using state TTL:

We created a StateTtlConfig with an expiry of 120 minutes to periodically
remove expired sessions from the state.
This stateTtlConfig is passed to the flatMap PlayerSessionEventMapper (extends
RichFlatMapFunction).
The PlayerSessionEventMapper has a ValueStateDescriptor to provide access to
state per player. This ValueStateDescriptor uses the previously mentioned
stateTtlConfig.
The state per player is updated on each player event. We also enforce a state
access (using ValueState.value()) since, as per the documentation, "expired
values are only removed when they are read out explicitly, e.g. by calling
ValueState.value()".

This idea was based on the examples provided in:
 https://flink.apache.org/2019/05/19/state-ttl.html
 https://www.ververica.com/blog/state-ttl-for-apache-flink-how-to-limit-the-lifetime-of-state
 https://www.slideshare.net/FlinkForward/time-tolive-how-to-perform-automatic-state-cleanup-in-apache-flink-andrey-zagrebin-ververica
 https://cwiki.apache.org/confluence/display/FLINK/FLIP-25%3A+Support+User+State+TTL+Natively

*Code:* PlayerSessionApp.java, PlayerSessionEventMapper.java
(Some custom classes have been removed for simplicity reasons.)

*Our questions are:*
Are expired session windows automatically removed from state? If not, what is
the best way to do it?
How can we query the state size?
How can we query the number of windows in state?
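
For reference, a minimal sketch of the TTL wiring described above; the class and field names are placeholders and the state type is simplified to String, so this is not the original PlayerSessionEventMapper. Note that state TTL in Flink 1.x is based on processing time and only covers the keyed state declared in this operator; it does not by itself clean up window state, which is one of the open questions above.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class TtlFlatMapSketch extends RichFlatMapFunction<String, String> {

    private transient ValueState<String> sessionState;

    @Override
    public void open(Configuration parameters) {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.minutes(120))                    // expiry as described in the thread
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .cleanupFullSnapshot()                            // also drop expired entries from full snapshots
                .build();

        ValueStateDescriptor<String> descriptor =
                new ValueStateDescriptor<>("player-session", String.class);
        descriptor.enableTimeToLive(ttlConfig);

        sessionState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(String event, Collector<String> out) throws Exception {
        // Reading the value also lets the TTL mechanism drop it if it has expired.
        String current = sessionState.value();
        sessionState.update(current == null ? event : current + "|" + event);
    }
}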




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Single stream, two sinks

2020-03-03 Thread John Smith
If I understand correctly he wants the HTTP result in the DB. So I do not
think side output works here. The DB would have to be the sink. Also sinks
in Flink are the final destination.

So it would have to be RabbitMQ -> some cool business-logic operators
here -> Async I/O HTTP operator -> JDBC sink.

Take a look here as well:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
The example there shows a database client, but you can easily replace that with
an HTTP client.

But basically...
1- Get input from the RabbitMQ source.
2- Do whatever stream computations/business logic you need.
3- Use the Async I/O operator to send the HTTP request.
    - If HTTP 200 OK, create a Flink record tagged as SUCCESS plus whatever
other info you want, maybe the response body.
    - If not HTTP 200 OK, create a Flink record tagged as FAILED plus other
info.
4- Sink the output record from #3 to JDBC.
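
A compact sketch of steps 3 and 4 follows. It assumes Java 11's built-in java.net.http client; the endpoint URL, the plain-String record type, and the JDBC audit sink are placeholders rather than anything from the thread.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class HttpAuditSketch extends RichAsyncFunction<String, String> {

    private transient HttpClient httpClient;

    @Override
    public void open(Configuration parameters) {
        httpClient = HttpClient.newHttpClient();
    }

    @Override
    public void asyncInvoke(String payload, ResultFuture<String> resultFuture) {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("https://third-party.example.com/ingest")) // assumed endpoint
                .POST(HttpRequest.BodyPublishers.ofString(payload))
                .build();

        httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
                .whenComplete((response, error) -> {
                    // Tag each record with the HTTP outcome so the JDBC sink can persist it for auditing.
                    String tag = (error == null && response.statusCode() == 200) ? "SUCCESS" : "FAILED";
                    resultFuture.complete(Collections.singleton(tag + "|" + payload));
                });
    }

    public static DataStream<String> wire(DataStream<String> transformed) {
        DataStream<String> audited = AsyncDataStream.unorderedWait(
                transformed, new HttpAuditSketch(), 30, TimeUnit.SECONDS, 100);
        // audited.addSink(new MyJdbcAuditSink()); // step 4: hypothetical JDBC sink writing the audit row
        return audited;
    }
}

The timeout and capacity passed to unorderedWait control how long Flink waits for each HTTP call and how many calls may be in flight at once.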

On Sun, 1 Mar 2020 at 10:28, miki haiat  wrote:

> So you have rabitmq source and http sink?
> If so you can use side output in order to dump your data to db.
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>
> On Sat, Feb 29, 2020, 23:01 Gadi Katsovich 
> wrote:
>
>> Hi,
>> I'm new to flink and am evaluating it to replace our existing streaming
>> application.
>> The use case I'm working on is reading messages from RabbitMQ queue,
>> applying some transformation and filtering logic and sending it via HTTP to
>> a 3rd party.
>> A must-have requirement of this flow is to write the data that was
>> sent to an SQL db, for audit and troubleshooting purposes.
>> I'm currently basing my HTTP solution on a PR with needed adjustments:
>> https://github.com/apache/flink/pull/5866/files
>> How can I add an insertion to a DB after a successful HTTP request?
>> Thank you.
>>
>


Re: java.util.concurrent.ExecutionException

2020-03-03 Thread Gary Yao
Hi,

Thanks for getting back, and I am glad that you were able to resolve the
issue. The
root cause in the stacktrace you posted also indicates a problem related
to Kafka:

Caused by:
org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException:
Timeout of 6ms expired before the position for partition edges-0 could
be determined

Best,
Gary

On Tue, Mar 3, 2020 at 5:46 PM kant kodali  wrote:

> Hi Gary,
>
> This has to do with my Kafka. After restarting Kafka it seems to work
> fine!
>
> Thanks!
>
> On Tue, Mar 3, 2020 at 8:18 AM kant kodali  wrote:
>
>> The program finished with the following exception:
>>
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error:
>> org.apache.flink.client.program.ProgramInvocationException: Job failed
>> (JobID: f57b682f5867a8bf6ff6e1ddce93a1ab)
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>>
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>>
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>>
>> at
>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>>
>> Caused by: java.util.concurrent.ExecutionException:
>> org.apache.flink.client.program.ProgramInvocationException: Job failed
>> (JobID: f57b682f5867a8bf6ff6e1ddce93a1ab)
>>
>> at
>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>
>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>>
>> at
>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
>>
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>>
>> at Test.main(Test.java:71)
>>
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>
>> at java.lang.reflect.Method.invoke(Method.java:498)
>>
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>>
>> ... 8 more
>>
>> Caused by: org.apache.flink.client.program.ProgramInvocationException:
>> Job failed (JobID: f57b682f5867a8bf6ff6e1ddce93a1ab)
>>
>> at
>> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
>>
>> at
>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>>
>> at
>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>>
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>>
>> at
>> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>>
>> at
>> org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)
>>
>> at
>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>>
>> at
>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>>
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>>
>> at
>> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>>
>> at
>> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)
>>
>> at
>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>>
>> at
>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>>
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>>
>> at
>> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
>>
>> at
>> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
>>
>> at
>> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>
>> at java.lang.Thread.run(Thread.java:748)
>>
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>> execution failed.
>>
>> at
>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)

RE: Building with Hadoop 3

2020-03-03 Thread LINZ, Arnaud
Hello,
Have you shared it somewhere on the web already?
Best,
Arnaud

From: vino yang 
Sent: Wednesday, December 4, 2019 11:55
To: Márton Balassi 
Cc: Chesnay Schepler ; Foster, Craig 
; user@flink.apache.org; d...@flink.apache.org
Objet : Re: Building with Hadoop 3

Hi Marton,

Thanks for your explanation. Personally, I look forward to your contribution!

Best,
Vino

Márton Balassi  wrote on Wednesday, December 4, 2019 at 5:15 PM:
Wearing my Cloudera hat I can tell you that we have done this exercise for our 
distros of the  3.0 and 3.1 Hadoop versions. We have not contributed these back 
just yet, but we are open to do so. If the community is interested we can 
contribute those changes back to flink-shaded and suggest the necessay changes 
to flink too. The task was not overly complex, but it certainly involved a bit 
of dependency hell. :-)

Right now we are focused on internal timelines, but we could invest into 
contributing this back in the end of January timeframe if the community deems 
this a worthwhile effort.

Best,
Marton

On Wed, Dec 4, 2019 at 10:00 AM Chesnay Schepler  wrote:
There's no JIRA and no one actively working on it. I'm not aware of any 
investigations on the matter; hence the first step would be to just try it out.

A flink-shaded artifact isn't a hard requirement; Flink will work with any 2.X 
hadoop distribution (provided that there aren't any dependency clashes).

On 03/12/2019 18:22, Foster, Craig wrote:
Hi:
I don’t see a JIRA for Hadoop 3 support. I see a comment on a JIRA here from a 
year ago that no one is looking into Hadoop 3 support [1]. Is there a document 
or JIRA that now exists which would point to what needs to be done to support 
Hadoop 3? Right now builds with Hadoop 3 don’t work obviously because there’s 
no flink-shaded-hadoop-3 artifacts.

Thanks!
Craig

[1] https://issues.apache.org/jira/browse/FLINK-11086







The integrity of this message cannot be guaranteed on the Internet. The company 
that sent this message cannot therefore be held liable for its content nor 
attachments. Any unauthorized use or dissemination is prohibited. If you are 
not the intended recipient of this message, then please delete it and notify 
the sender.


Re: java.util.concurrent.ExecutionException

2020-03-03 Thread kant kodali
Hi Gary,

This has to do with my Kafka. After restarting Kafka it seems to work fine!

Thanks!

On Tue, Mar 3, 2020 at 8:18 AM kant kodali  wrote:

> The program finished with the following exception:
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: f57b682f5867a8bf6ff6e1ddce93a1ab)
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>
> at
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: f57b682f5867a8bf6ff6e1ddce93a1ab)
>
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>
> at
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
>
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>
> at Test.main(Test.java:71)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>
> ... 8 more
>
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
> failed (JobID: f57b682f5867a8bf6ff6e1ddce93a1ab)
>
> at
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
>
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)
>
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)
>
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>
> at
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
>
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
>
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
> at java.lang.Thread.run(Thread.java:748)
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
>
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>
> at
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
>
> ... 19 more
>
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed
> by NoRestartBackoffTimeStrategy
>
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
>
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
>
> at
> 

Re: Flink Stream Sink ParquetWriter Failing with ClassCastException

2020-03-03 Thread Till Rohrmann
Hi Anuj,

if you use the exact same schema with which the data has been written for
reading and if there is no bug in the parquet Avro support, then it should
indeed not fail. Hence, I suspect that the producer of your data might
produce slightly different Avro records compared to what Parquet is
expecting. But this is just guessing here.

The reason why you don't see the program fail when only printing the
records is that you don't transform the GenericRecords into the Parquet
format which expects a certain format given the schema.

Maybe it could help to figure out which Avro version your writer uses and
then to compare it to Parquet's AvroWriteSupport. Additionally, the used
schema could be helpful as well.

It could also be that you are running into this Parquet issue [1]. In this
case, you could try to solve the problem by bumping the Parquet version.

[1] https://issues.apache.org/jira/browse/PARQUET-1303

Cheers,
Till


On Sat, Feb 29, 2020 at 5:56 PM aj  wrote:

> Hi Till,
>
> Thanks for the reply.
> I doubt that the input has a problem, because:
>
> 1. If the input had a problem it should not have made it into the topic at
> all, since schema validation fails on the producer side.
> 2. I am using the same schema that was used to write the records to the topic,
> and I am able to parse the records with that schema: when I try to print
> the stream there is no error; the problem only occurs when writing
> as Parquet.
>
> This is the code that I am using to get the schema that I pass to the
> ParquetWriter.
>
> public static Schema getSchema(String subjectName) {
> try {
> List<Integer> versions = registryClient.getAllVersions(subjectName);
> SchemaMetadata schemaMeta = 
> registryClient.getSchemaMetadata(subjectName, versions.get(versions.size() - 
> 1));
> Schema schema = new Schema.Parser().parse(schemaMeta.getSchema());
> return schema;
> } catch (Exception e) {
> e.printStackTrace();
> return null;
> }
> }
>
>
> How can the input pass through and get inserted into the topic if it has some issue?
> Even if it is occurring, how can I find those records and skip them, so that
> my whole processing does not fail because of one record?
>
> Thanks,
> Anuj
>
>
>
>
>
> On Sat, Feb 29, 2020 at 9:12 PM Till Rohrmann 
> wrote:
>
>> Hi Anuj,
>>
>> it looks to me that your input GenericRecords don't conform with your
>> output schema schemaSubject. At least, the stack trace says that your
>> output schema expects some String field but the field was actually some
>> ArrayList. Consequently, I would suggest to verify that your input data has
>> the right format and if not to filter those records out which are
>> non-conformant.
>>
>> Cheers,
>> Till
>>
>> On Sat, Feb 29, 2020 at 2:13 PM aj  wrote:
>>
>>> Hi All,
>>>
>>> I have written a consumer that reads from a Kafka topic and writes the data
>>> in Parquet format using StreamSink. But I am getting the following error. It
>>> runs for some hours and then starts failing with these exceptions. I tried to
>>> restart it, but it fails with the same exceptions. After I restart from the latest
>>> offset it works fine for some hours and then fails again. I am not
>>> able to find the root cause of this issue.
>>>
>>> java.lang.Exception: 
>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>>>  Could not forward element to next operator
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>> at java.lang.Thread.run(Thread.java:748)Caused by: 
>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>>>  Could not forward element to next operator
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:651)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>> at 
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>> at 
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>> at 
>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>>> at 
>>> 
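
Till's suggestion to filter out non-conformant records before the sink could look roughly like the sketch below. This is only an illustration: GenericData.validate() checks structural conformance against the schema and may not catch every mismatch the Parquet writer would reject, and the class names here are hypothetical.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;

public class SchemaConformanceFilter extends RichFilterFunction<GenericRecord> {

    private final String writerSchemaJson; // Avro Schema may not be serializable, so ship it as JSON
    private transient Schema writerSchema;

    public SchemaConformanceFilter(String writerSchemaJson) {
        this.writerSchemaJson = writerSchemaJson;
    }

    @Override
    public void open(Configuration parameters) {
        writerSchema = new Schema.Parser().parse(writerSchemaJson);
    }

    @Override
    public boolean filter(GenericRecord record) {
        // Keep only records that validate against the schema handed to the Parquet writer.
        return GenericData.get().validate(writerSchema, record);
    }
}

It could be attached just before the sink, e.g. stream.filter(new SchemaConformanceFilter(schema.toString())), so that one bad record does not fail the whole job.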

Re: java.util.concurrent.ExecutionException

2020-03-03 Thread kant kodali
The program finished with the following exception:


org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error:
org.apache.flink.client.program.ProgramInvocationException: Job failed
(JobID: f57b682f5867a8bf6ff6e1ddce93a1ab)

at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)

at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)

at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)

at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)

at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)

at
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)

Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.client.program.ProgramInvocationException: Job failed
(JobID: f57b682f5867a8bf6ff6e1ddce93a1ab)

at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)

at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)

at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)

at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)

at Test.main(Test.java:71)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)

... 8 more

Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
failed (JobID: f57b682f5867a8bf6ff6e1ddce93a1ab)

at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)

at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)

at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)

at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)

at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)

at
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)

at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)

at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)

at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)

at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)

at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)

at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)

at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)

at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)

at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)

at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)

at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.

at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)

at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)

... 19 more

Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy

at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)

at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)

at
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)

at
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)

at
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)

at

Re: java.util.concurrent.ExecutionException

2020-03-03 Thread Gary Yao
Hi,

Can you post the complete stacktrace?

Best,
Gary

On Tue, Mar 3, 2020 at 1:08 PM kant kodali  wrote:

> Hi All,
>
> I am just trying to read edges which has the following format in Kafka
>
> 1,2
> 1,3
> 1,5
>
> using the Table API and then converting to DataStream of Edge Objects and
> printing them. However I am getting
> java.util.concurrent.ExecutionException but not sure why?
>
> Here is the sample code
>
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.graph.Edge;
> import org.apache.flink.runtime.state.StateBackend;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.*;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.descriptors.Csv;
> import org.apache.flink.table.descriptors.Kafka;
> import org.apache.flink.table.descriptors.Schema;
> import org.apache.flink.types.NullValue;
> import org.apache.flink.types.Row;
>
> import java.util.UUID;
>
> public class Test {
>
> public static void main(String... args) throws Exception {
>
> EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStateBackend((StateBackend) new 
> RocksDBStateBackend("file:///tmp/rocksdb"));
>
> StreamTableEnvironment bsTableEnv = 
> StreamTableEnvironment.create(env, bsSettings);
>
> bsTableEnv.connect(
> new Kafka()
> .property("bootstrap.servers", "localhost:9092")
> .property("zookeeper.connect", "localhost:2181")
> .property("group.id", UUID.randomUUID().toString())
> .startFromEarliest()
> .version("universal")
> .topic("edges")
> )
> .withFormat(new Csv().fieldDelimiter(','))
> .withSchema(
> new Schema()
> .field("source", DataTypes.BIGINT())
> .field("target", DataTypes.BIGINT())
> )
> .createTemporaryTable("kafka_source");
>
> Table kafkaSourceTable = bsTableEnv.sqlQuery("select * from 
> kafka_source");
>
> TypeInformation<Edge<Long, NullValue>> edgeTypeInformation = 
> TypeInformation.of(new TypeHint<Edge<Long, NullValue>>() {
> @Override
> public TypeInformation<Edge<Long, NullValue>> getTypeInfo() {
> return super.getTypeInfo();
> }
> });
>
> DataStream<Edge<Long, NullValue>> edges = 
> bsTableEnv.toAppendStream(kafkaSourceTable, Row.class)
> .map(row -> new Edge<>((Long) row.getField(0), (Long) 
> row.getField(1), NullValue.getInstance()))
> .returns(edgeTypeInformation);
>
> edges.print();
>
> bsTableEnv.execute("sample job");
> }
> }
>
>
>
>


Re: Providing hdfs name node IP for streaming file sink

2020-03-03 Thread Vishwas Siravara
Thanks Yang. I am going with setting HADOOP_CONF_DIR for the Flink
application; it integrates neatly with Flink.

Best,
Nick.

On Mon, Mar 2, 2020 at 7:42 PM Yang Wang  wrote:

> It may work. However, you need to set your own retry policy(similar as
> `ConfiguredFailoverProxyProvider` in hadoop).
> Also if you directly use namenode address and do not load HDFS
> configuration, some HDFS client configuration (e.g.
> dfs.client.*) will not take effect.
>
>
> Best,
> Yang
>
> Nick Bendtner  于2020年3月2日周一 下午11:58写道:
>
>> Thanks a lot Yang. What are your thoughts on catching the exception when
>> a name node is down and retrying with the secondary name node ?
>>
>> Best,
>> Nick.
>>
>> On Sun, Mar 1, 2020 at 9:05 PM Yang Wang  wrote:
>>
>>> Hi Nick,
>>>
>>> Certainly you could directly use "namenode:port" as the scheme of your
>>> HDFS path.
>>> Then the hadoop configs (e.g. core-site.xml, hdfs-site.xml) will not be
>>> necessary.
>>> However, that also means you could not benefit from the HDFS
>>> high-availability [1].
>>>
>>> If your HDFS cluster is HA configured, i strongly suggest you to set the
>>> "HADOOP_CONF_DIR"
>>> for your Flink application. Both the client and cluster(JM/TM) side need
>>> to be set. Then
>>> your HDFS path could be specified like this "hdfs://myhdfs/flink/test".
>>> Given that "myhdfs"
>>> is the name service configured in hdfs-site.xml.
>>>
>>>
>>> Best,
>>> Yang
>>>
>>>
>>>
>>> [1].
>>> http://hadoop.apache.org/docs/r2.8.5/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html
>>>
>>> Nick Bendtner  于2020年2月29日周六 上午6:00写道:
>>>
 To add to this question, do I need to setup env.hadoop.conf.dir to
 point to the hadoop config for instance env.hadoop.conf.dir=/etc/hadoop/
 for the jvm ? Or is it possible to write to hdfs without any external
 hadoop config like core-site.xml, hdfs-site.xml ?

 Best,
 Nick.



 On Fri, Feb 28, 2020 at 12:56 PM Nick Bendtner 
 wrote:

> Hi guys,
> I am trying to write to hdfs from streaming file sink. Where should I
> provide the IP address of the name node ? Can I provide it as a part of 
> the
> flink-config.yaml file or should I provide it like this :
>
> final StreamingFileSink<GenericRecord> sink = StreamingFileSink
>   .forBulkFormat(new Path("hdfs://namenode:8020/flink/test"), 
> ParquetAvroWriters.forGenericRecord(schema))
>
>   .build();
>
>
> Best,
> Nick
>
>
>

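Putting Yang's advice together, here is a sketch of the sink when HADOOP_CONF_DIR is set for both the client and the JM/TM processes and the path uses the HDFS nameservice rather than a hard-coded namenode. The nameservice name and path are examples only.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class HdfsSinkSketch {
    // HADOOP_CONF_DIR (e.g. /etc/hadoop/conf) must be set in the environment of the client
    // and the JM/TM processes, so that "myhdfs" below resolves through the HA-configured hdfs-site.xml.
    public static StreamingFileSink<GenericRecord> build(Schema schema) {
        return StreamingFileSink
                .forBulkFormat(
                        new Path("hdfs://myhdfs/flink/test"),
                        ParquetAvroWriters.forGenericRecord(schema))
                .build();
    }
}
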

[Survey] Default size for the new JVM Metaspace limit in 1.10

2020-03-03 Thread Andrey Zagrebin
Hi All,

Recently, FLIP-49 [1] introduced the new JVM Metaspace limit in the 1.10
release [2]. Flink scripts, which start the task manager JVM process, set
this limit by adding the corresponding JVM argument. This has been done to
properly plan resources, especially to derive the container size for
Yarn/Mesos/Kubernetes. Also, it should surface potential class loading
leaks. There is an option to change it:
'taskmanager.memory.jvm-metaspace.size' [3]. Its current default value is
96Mb.

This change led to 'OutOfMemoryError: Metaspace' in certain cases after
upgrading to 1.10 version. In some cases, a class loading leak has been
detected [4] and has to be investigated on its own. In other cases, just
increasing the option value helped because the default value was not
enough, presumably, due to the job specifics. In general, the required
Metaspace size depends on the job and there is no default value to cover
all cases. There is an issue to improve docs for this concern [5].

This survey is to come up with the most reasonable default value for this
option. If you have encountered this issue and increasing the Metaspace
size helped (there is no class loading leak), please, report any specifics
of your job, if you think it is relevant for this concern, and the option
value that resolved it. There is also a dedicated Jira issue [6] for
reporting.

Thanks,
Andrey

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html#jvm-parameters
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#taskmanager-memory-jvm-metaspace-size
[4] https://issues.apache.org/jira/browse/FLINK-16142
[5] https://issues.apache.org/jira/browse/FLINK-16278
[6] https://jira.apache.org/jira/browse/FLINK-16406


Re: zookeeper.connect is not needed but Flink requires it

2020-03-03 Thread Jark Wu
Hi Kant,

You are right. It is not needed since Kafka 0.9+. We already have an issue
to make it optional.
https://issues.apache.org/jira/browse/FLINK-16125

Best,
Jark

On Tue, 3 Mar 2020 at 20:17, kant kodali  wrote:

> Hi All,
>
> The zookeeper.connect is not needed for KafkaConsumer or KafkaAdminClient
> however Flink requires it. You can also see in the Flink TaskManager logs
> the KafkaConsumer is not recognizing this property anyways.
>
> bsTableEnv.connect(
> new Kafka()
> .property("bootstrap.servers", "localhost:9092")
> .property("zookeeper.connect", "localhost:2181")
>
>
> 2020-03-03 03:48:54,644 WARN
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.
> ConsumerConfig - The configuration 'zookeeper.connect' was supplied but
> isn't a known config.
>
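
Until FLINK-16125 is released, a workaround is to keep supplying the property
purely to satisfy the connector descriptor validation; the Kafka 0.9+ client
simply ignores it, as the WARN line above shows. A minimal sketch (the value is
a placeholder):

bsTableEnv.connect(
    new Kafka()
        .property("bootstrap.servers", "localhost:9092")
        // only needed to pass Flink's validation; ignored by the Kafka client
        .property("zookeeper.connect", "unused:2181"))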


Re: Operator latency metric not working in 1.9.1

2020-03-03 Thread orips
Thanks, that makes sense!

In addition, I've just found the reason for this in the code:

This is 1.5 (default value is 2000L):
https://github.com/apache/flink/blob/release-1.5/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java#L109

This is 1.9 (default value is 0L)
https://github.com/apache/flink/blob/272fccf44fd92fc9a2c7fa7465b2fcc430842427/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java#L115
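
If you want the pre-1.7 behaviour back, a minimal sketch (2000 ms mirrors the
old default; setting metrics.latency.interval: 2000 in flink-conf.yaml works
as well):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 0, the new default, disables latency markers entirely.
env.getConfig().setLatencyTrackingInterval(2000L);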




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Alink and Flink ML

2020-03-03 Thread Gary Yao
Hi Flavio,

I am looping in Becket (cc'ed) who might be able to answer your question.

Best,
Gary

On Tue, Mar 3, 2020 at 12:19 PM Flavio Pompermaier 
wrote:

> Hi to all,
> since Alink has been open sourced, is there any good reason to keep both
> Flink ML and Alink?
> From what I understood Alink already contains the best ML implementation
> available for Flink..am I wrong?
> Maybe it could make sense to replace the current Flink ML with that of
> Alink..or is that impossible?
>
> Cheers,
> Flavio
>


Re: Unable to recover from savepoint and checkpoint

2020-03-03 Thread Gary Yao
Hi Puneet,

Can you describe how you validated that the state is not restored properly?
Specifically, how did you introduce faults to the cluster?

Best,
Gary

On Tue, Mar 3, 2020 at 11:08 AM Puneet Kinra <
puneet.ki...@customercentria.com> wrote:

> Sorry for the missing information.
>
> On recovery the value is coming back as false instead of true. state.backend
> has been configured in flink-conf.yaml, along with
> the paths for checkpointing and savepoints.
>
> On Tue, Mar 3, 2020 at 3:34 PM Puneet Kinra <
> puneet.ki...@customercentria.com> wrote:
>
>> Hi
>>
>> Stuck with a simple program regarding checkpointing. The Flink version I
>> am using is 1.10.0.
>>
>> *Here I have created DummySource for testing*
>>
>> *DummySource*
>> package com.nudge.stateful;
>>
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>
>> public class BeaconSource implements SourceFunction<Tuple2<Long, String>> {
>>
>> /**
>> *
>> */
>> private static final long serialVersionUID = 1L;
>> private Boolean isRunning=true;
>>
>>
>> public BeaconSource() {
>> super();
>> // TODO Auto-generated constructor stub
>> }
>>
>>
>>
>> public void cancel() {
>> // TODO Auto-generated method stub
>>
>> this.isRunning=false;
>>
>> }
>>
>> public void run(SourceContext<Tuple2<Long, String>> arg0) throws Exception
>> {
>> // TODO Auto-generated method stub
>> while(isRunning) {
>> Thread.sleep(3L);
>> arg0.collect(new Tuple2(10L,"AMQSource"));
>> }
>> }
>>
>> }
>>
>>
>>
>> ---
>> *KeyedProcessFunction (to register the timer and update the status to
>> true so that only one-time trigger should)*
>>
>>
>> package com.nudge.stateful;
>>
>> import org.apache.flink.api.common.functions.IterationRuntimeContext;
>> import org.apache.flink.api.common.functions.RuntimeContext;
>> import org.apache.flink.api.common.state.ListState;
>> import org.apache.flink.api.common.state.ListStateDescriptor;
>> import org.apache.flink.api.common.state.ValueState;
>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>> import org.apache.flink.api.common.typeinfo.TypeHint;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.api.java.tuple.Tuple;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.api.java.tuple.Tuple3;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.runtime.state.FunctionInitializationContext;
>> import org.apache.flink.runtime.state.FunctionSnapshotContext;
>> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
>> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
>> import org.apache.flink.streaming.api.functions.ProcessFunction;
>> import org.apache.flink.util.Collector;
>>
>> import com.google.gson.JsonObject;
>> import com.google.gson.JsonParser;
>>
>> import scala.collection.mutable.LinkedHashMap;
>>
>>
>>
>> import java.util.HashMap;
>> import java.util.Map;
>> import java.util.Map.Entry;
>> import java.util.Set;
>>
>> public class TimeProcessTrigger extends
>> KeyedProcessFunction<Tuple, Tuple2<Long, String>, String> {
>>
>> /**
>> *
>> */
>> private static final long serialVersionUID = 1L;
>> /**
>> *
>> */
>>
>> private transient ValueState<Boolean> contacthistory;
>> private static final  Long  ONE_MINUTE=6L;
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> @Override
>> public void onTimer(long timestamp, KeyedProcessFunction<Tuple, Tuple2<Long, String>, String>.OnTimerContext ctx,
>> Collector<String> out) throws Exception {
>> // TODO Auto-generated method stub
>> super.onTimer(timestamp, ctx, out);
>> System.out.println("Timer has fired for the key"+ctx.getCurrentKey());
>> }
>>
>>
>>
>>
>>
>>
>> @Override
>> public void open(Configuration parameters) throws Exception {
>> // TODO Auto-generated method stub
>> super.open(parameters);
>>
>>
>> ValueStateDescriptor<Boolean> descriptor = new
>> ValueStateDescriptor<>(
>> "contact-history", // the state name
>> Boolean.class); // type information
>>
>> this.contacthistory=getRuntimeContext().getState(descriptor);
>> }
>>
>>
>>
>>
>>
>>
>> @Override
>> public void processElement(Tuple2<Long, String> input,
>> KeyedProcessFunction<Tuple, Tuple2<Long, String>, String>.Context ctx,
>> Collector<String> collect)
>> throws Exception {
>> // TODO Auto-generated method stub
>>
>>
>> System.out.println(this.contacthistory.value());
>> Boolean value = this.contacthistory.value();
>> if(value==null) {
>> Long currentTime = ctx.timerService().currentProcessingTime();
>> Long regTimer=currentTime+ONE_MINUTE;
>> System.out.println("Updating the flag and registering the timer
>> @:"+regTimer);
>> this.contacthistory.update(true);
>> ctx.timerService().registerProcessingTimeTimer(regTimer);
>>
>> }else {
>> System.out.println("Timer has already register for this key");
>> }
>> }
>>
>> }
>>
>>
>> -
>> *Main App*
>>
>> package com.nudge.stateful;
>>
>> import org.apache.flink.api.java.functions.KeySelector;
>> import 

[A question about Java — subject and body garbled in the archive]

2020-03-03 Thread claylin
hi all, [question about Java; the details were in the attached screenshots]


https://s2.ax1x.com/2020/03/03/34yyvT.png



https://s2.ax1x.com/2020/03/03/34y5P1.png

Re: Operator latency metric not working in 1.9.1

2020-03-03 Thread Gary Yao
Hi,

There is a release note for Flink 1.7 that could be relevant for you [1]

Granularity of latency metrics
The default granularity for latency metrics has been modified. To
restore the previous behavior users have to explicitly set the granularity
to subtask.

Best,
Gary

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/release-notes/flink-1.7.html#granularity-of-latency-metrics
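
Based on that note, something like the following in flink-conf.yaml should
restore the old per-subtask latency metrics (together with a non-zero
metrics.latency.interval):

metrics.latency.granularity: subtask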

On Tue, Mar 3, 2020 at 10:14 AM orips  wrote:

> Thanks for the response.
>
> In 1.5 the docs also state that it should be enabled [1], however, it
> always
> worked without setting latencyTrackingInterval
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#latency-tracking
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


zookeeper.connect is not needed but Flink requires it

2020-03-03 Thread kant kodali
Hi All,

The zookeeper.connect is not needed for KafkaConsumer or KafkaAdminClient
however Flink requires it. You can also see in the Flink TaskManager logs
the KafkaConsumer is not recognizing this property anyways.

bsTableEnv.connect(
new Kafka()
.property("bootstrap.servers", "localhost:9092")
.property("zookeeper.connect", "localhost:2181")


2020-03-03 03:48:54,644 WARN
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.
ConsumerConfig - The configuration 'zookeeper.connect' was supplied but isn't
a known config.


Alink and Flink ML

2020-03-03 Thread Flavio Pompermaier
Hi to all,
since Alink has been open sourced, is there any good reason to keep both
Flink ML and Alink?
From what I understood, Alink already contains the best ML implementation
available for Flink... am I wrong?
Maybe it would make sense to replace the current Flink ML with that of
Alink... or is that impossible?

Cheers,
Flavio


Re: Source parallelism when reading Hive with Flink 1.10.0

2020-03-03 Thread JingsongLee
Hi jun,

Good suggestion~ that is indeed an optimization point~ feel free to open a JIRA for it.

Best,
Jingsong Lee


--
From:Jun Zhang <825875...@qq.com>
Send Time:2020-03-03 (Tuesday) 18:45
To:user-zh@flink.apache.org ; JingsongLee 

Cc:user-zh@flink.apache.org ; like 
Subject:Reply: Source parallelism when reading Hive with Flink 1.10.0


hi, jingsong:
I'd like to raise one issue. I enabled automatic inference and, for example, set the maximum inferred parallelism to 10.
I have a SQL statement like:   select * from  mytable limit 1;
The Hive table mytable has more than 10 files, so isn't starting 10 parallel instances a bit wasteful?
On 2020-03-02 16:38, JingsongLee wrote:
I'd suggest reading the Hive table in batch mode.

Best,
Jingsong Lee


--
From:like 
Send Time:2020-03-02 (Monday) 16:35
To:lzljs3620...@aliyun.com 
Subject:Reply: Source parallelism when reading Hive with Flink 1.10.0


I'm using StreamTableEnvironment, and I did run into this problem.
On 2020-03-02 16:16, JingsongLee wrote:
 "With automatic inference the job may fail to start due to insufficient resources"

In theory that shouldn't happen, right? Batch jobs can run with only part of their resources.

Best,
Jingsong Lee

--
From:like 
Send Time:2020-03-02 (Monday) 15:35
To:user-zh@flink.apache.org ; lzljs3620...@aliyun.com 

Subject:Reply: Source parallelism when reading Hive with Flink 1.10.0


Thanks a lot! After disabling automatic inference I can now control the source parallelism. With automatic inference the job may fail to start due to insufficient resources.


On 2020-03-02 15:18, JingsongLee wrote:   Hi,

In 1.10 the Hive source infers its parallelism automatically. You can control this with the following options in flink-conf.yaml:
- table.exec.hive.infer-source-parallelism=true (automatic inference is the default)
- table.exec.hive.infer-source-parallelism.max=1000 (the maximum inferred parallelism)

The sink parallelism defaults to that of its upstream; if there is a shuffle, the configured global parallelism is used.

Best,
Jingsong Lee


--
From:like 
Send Time:2020-03-02 (Monday) 14:58
To:user-zh@flink.apache.org 
Subject:Source parallelism when reading Hive with Flink 1.10.0

Hi everyone,

I'm reading a Hive table with Flink 1.10.0. I set a global parallelism at startup, but while reading I noticed that the sink parallelism is bound by it,
while the source parallelism is not — it changes with the size of the source and can go as high as 1000 for large inputs. How can I deal with this parallelism?
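
A small sketch of setting these options per job instead of in flink-conf.yaml,
assuming a TableEnvironment named tableEnv (the values are only examples):

// Either turn automatic inference off and fall back to the configured
// parallelism, or keep it on and cap the inferred value.
tableEnv.getConfig().getConfiguration()
    .setBoolean("table.exec.hive.infer-source-parallelism", false);
tableEnv.getConfig().getConfiguration()
    .setInteger("table.exec.hive.infer-source-parallelism.max", 10);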

Re: Hive Source with Kerberos authentication issue

2020-03-03 Thread Rui Li
datanucleus is used on the HMS side. If you get errors when the datanucleus dependencies are missing, it means your code is trying to create an embedded
metastore. Is that what you intend? My understanding is that you have a remote HMS and want HiveCatalog to connect to that HMS, right?
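
For reference, a minimal sketch of that setup: a HiveCatalog pointed at a hive
conf directory whose hive-site.xml carries the remote hive.metastore.uris. The
catalog name, conf path and default database are illustrative; "2.1.1" matches
the Hive version mentioned below:

import org.apache.flink.table.catalog.hive.HiveCatalog;

// Without hive.metastore.uris in the hive-site.xml found here, the client
// would try to create an embedded metastore instead of connecting to the HMS.
HiveCatalog hive = new HiveCatalog("myhive", "default", "/etc/hive/conf", "2.1.1");
tableEnv.registerCatalog("myhive", hive);
tableEnv.useCatalog("myhive");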

On Tue, Mar 3, 2020 at 4:00 PM 叶贤勋  wrote:

> The hive conf should be correct; the earlier UserGroupInformation login succeeded.
> If the datanucleus dependencies are not added, I get class-not-found and similar exceptions:
> 1. java.lang.ClassNotFoundException:
> org.datanucleus.api.jdo.JDOPersistenceManagerFactory
> 2. Caused by: org.datanucleus.exceptions.NucleusUserException: There is no
> available StoreManager of type "rdbms". Please make sure you have specified
> "datanucleus.storeManagerType" correctly and that all relevant plugins are
> in the CLASSPATH
>
>
> 叶贤勋
> yxx_c...@163.com
>
> On 2020-03-02 11:50, Rui Li wrote:
>
> From the log you posted it looks like an embedded metastore was created. Could you check whether
> HiveCatalog is picking up an incorrect hive conf? Also, are all the maven dependencies you listed
> packaged into your Flink job jar? Dependencies like datanucleus should not be needed.
>
> On Sat, Feb 29, 2020 at 10:42 PM 叶贤勋  wrote:
>
> Hi 李锐 (Rui Li), thanks for your reply.
> The earlier problem has been solved by setting yarn.resourcemanager.principal.
> But now another problem has come up; please take a look.
>
>
> Background: the Flink job still reads from a Kerberos-secured Hive source. The same code passes Kerberos authentication when tested locally and can query and insert data into Hive, but once the job is submitted to the cluster it fails with a Kerberos authentication error.
> Flink: 1.9.1, flink-1.9.1/lib/ contains flink-dist_2.11-1.9.1.jar,
> flink-shaded-hadoop-2-uber-2.7.5-7.0.jar, log4j-1.2.17.jar,
> slf4j-log4j12-1.7.15.jar
> Hive: 2.1.1
> Main jar dependencies of the Flink job:
> [INFO] +- org.apache.flink:flink-table-api-java:jar:flink-1.9.1:compile
> [INFO] |  +- org.apache.flink:flink-table-common:jar:flink-1.9.1:compile
> [INFO] |  |  \- org.apache.flink:flink-core:jar:flink-1.9.1:compile
> [INFO] |  | +-
> org.apache.flink:flink-annotations:jar:flink-1.9.1:compile
> [INFO] |  | +-
> org.apache.flink:flink-metrics-core:jar:flink-1.9.1:compile
> [INFO] |  | \- com.esotericsoftware.kryo:kryo:jar:2.24.0:compile
> [INFO] |  |+- com.esotericsoftware.minlog:minlog:jar:1.2:compile
> [INFO] |  |\- org.objenesis:objenesis:jar:2.1:compile
> [INFO] |  +- com.google.code.findbugs:jsr305:jar:1.3.9:compile
> [INFO] |  \- org.apache.flink:force-shading:jar:1.9.1:compile
> [INFO] +-
> org.apache.flink:flink-table-planner-blink_2.11:jar:flink-1.9.1:compile
> [INFO] |  +-
> org.apache.flink:flink-table-api-scala_2.11:jar:flink-1.9.1:compile
> [INFO] |  |  +- org.scala-lang:scala-reflect:jar:2.11.12:compile
> [INFO] |  |  \- org.scala-lang:scala-compiler:jar:2.11.12:compile
> [INFO] |  +-
> org.apache.flink:flink-table-api-java-bridge_2.11:jar:flink-1.9.1:compile
> [INFO] |  |  +- org.apache.flink:flink-java:jar:flink-1.9.1:compile
> [INFO] |  |  \-
> org.apache.flink:flink-streaming-java_2.11:jar:1.9.1:compile
> [INFO] |  +-
> org.apache.flink:flink-table-api-scala-bridge_2.11:jar:flink-1.9.1:compile
> [INFO] |  |  \- org.apache.flink:flink-scala_2.11:jar:flink-1.9.1:compile
> [INFO] |  +-
> org.apache.flink:flink-table-runtime-blink_2.11:jar:flink-1.9.1:compile
> [INFO] |  |  +- org.codehaus.janino:janino:jar:3.0.9:compile
> [INFO] |  |  \- org.apache.calcite.avatica:avatica-core:jar:1.15.0:compile
> [INFO] |  \- org.reflections:reflections:jar:0.9.10:compile
> [INFO] +- org.apache.flink:flink-table-planner_2.11:jar:flink-1.9.1:compile
> [INFO] +- org.apache.commons:commons-lang3:jar:3.9:compile
> [INFO] +- com.typesafe.akka:akka-actor_2.11:jar:2.5.21:compile
> [INFO] |  +- org.scala-lang:scala-library:jar:2.11.8:compile
> [INFO] |  +- com.typesafe:config:jar:1.3.3:compile
> [INFO] |  \-
> org.scala-lang.modules:scala-java8-compat_2.11:jar:0.7.0:compile
> [INFO] +- org.apache.flink:flink-sql-client_2.11:jar:1.9.1:compile
> [INFO] |  +- org.apache.flink:flink-clients_2.11:jar:1.9.1:compile
> [INFO] |  |  \- org.apache.flink:flink-optimizer_2.11:jar:1.9.1:compile
> [INFO] |  +- org.apache.flink:flink-streaming-scala_2.11:jar:1.9.1:compile
> [INFO] |  +- log4j:log4j:jar:1.2.17:compile
> [INFO] |  \- org.apache.flink:flink-shaded-jackson:jar:2.9.8-7.0:compile
> [INFO] +- org.apache.flink:flink-json:jar:1.9.1:compile
> [INFO] +- org.apache.flink:flink-csv:jar:1.9.1:compile
> [INFO] +- org.apache.flink:flink-hbase_2.11:jar:1.9.1:compile
> [INFO] +- org.apache.hbase:hbase-server:jar:2.2.1:compile
> [INFO] |  +-
> org.apache.hbase.thirdparty:hbase-shaded-protobuf:jar:2.2.1:compile
> [INFO] |  +-
> org.apache.hbase.thirdparty:hbase-shaded-netty:jar:2.2.1:compile
> [INFO] |  +-
> org.apache.hbase.thirdparty:hbase-shaded-miscellaneous:jar:2.2.1:compile
> [INFO] |  |  \-
> com.google.errorprone:error_prone_annotations:jar:2.3.3:compile
> [INFO] |  +- org.apache.hbase:hbase-common:jar:2.2.1:compile
> [INFO] |  |  \-
> com.github.stephenc.findbugs:findbugs-annotations:jar:1.3.9-1:compile
> [INFO] |  +- org.apache.hbase:hbase-http:jar:2.2.1:compile
> [INFO] |  |  +- org.eclipse.jetty:jetty-util:jar:9.3.27.v20190418:compile
> [INFO] |  |  +-
> 

Reply: Source parallelism when reading Hive with Flink 1.10.0

2020-03-03 Thread Jun Zhang
hi, jingsong:

I'd like to raise one issue. I enabled automatic inference and, for example, set the maximum inferred parallelism to 10.
I have a SQL statement like:  select * from mytable limit 1;
The Hive table mytable has more than 10 files, so isn't starting 10 parallel instances a bit wasteful?
On 2020-03-02 16:38, JingsongLee wrote:

Unable to recover from savepoint and checkpoint

2020-03-03 Thread Puneet Kinra
Hi

Stuck with a simple program regarding checkpointing. The Flink version I
am using is 1.10.0.

*Here I have created DummySource for testing*

*DummySource*
package com.nudge.stateful;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class BeaconSource implements SourceFunction<Tuple2<Long, String>> {

/**
*
*/
private static final long serialVersionUID = 1L;
private Boolean isRunning=true;


public BeaconSource() {
super();
// TODO Auto-generated constructor stub
}



public void cancel() {
// TODO Auto-generated method stub

this.isRunning=false;

}

public void run(SourceContext<Tuple2<Long, String>> arg0) throws Exception {
// TODO Auto-generated method stub
while(isRunning) {
Thread.sleep(3L);
arg0.collect(new Tuple2(10L,"AMQSource"));
}
}

}


---
*KeyedProcessFunction (to register the timer and update the status to true
so that only one-time trigger should)*


package com.nudge.stateful;

import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import scala.collection.mutable.LinkedHashMap;



import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

public class TimeProcessTrigger extends
KeyedProcessFunction<Tuple, Tuple2<Long, String>, String> {

/**
*
*/
private static final long serialVersionUID = 1L;
/**
*
*/

private transient ValueState<Boolean> contacthistory;
private static final  Long  ONE_MINUTE=6L;










@Override
public void onTimer(long timestamp, KeyedProcessFunction<Tuple, Tuple2<Long, String>, String>.OnTimerContext ctx,
Collector<String> out) throws Exception {
// TODO Auto-generated method stub
super.onTimer(timestamp, ctx, out);
System.out.println("Timer has fired for the key"+ctx.getCurrentKey());
}






@Override
public void open(Configuration parameters) throws Exception {
// TODO Auto-generated method stub
super.open(parameters);


ValueStateDescriptor<Boolean> descriptor = new
ValueStateDescriptor<>(
"contact-history", // the state name
Boolean.class); // type information

this.contacthistory=getRuntimeContext().getState(descriptor);
}






@Override
public void processElement(Tuple2<Long, String> input,
KeyedProcessFunction<Tuple, Tuple2<Long, String>, String>.Context ctx,
Collector<String> collect)
throws Exception {
// TODO Auto-generated method stub


System.out.println(this.contacthistory.value());
Boolean value = this.contacthistory.value();
if(value==null) {
Long currentTime = ctx.timerService().currentProcessingTime();
Long regTimer=currentTime+ONE_MINUTE;
System.out.println("Updating the flag and registering the timer
@:"+regTimer);
this.contacthistory.update(true);
ctx.timerService().registerProcessingTimeTimer(regTimer);

}else {
System.out.println("Timer has already register for this key");
}
}

}


-
*Main App*

package com.nudge.stateful;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.indiabulls.nudge.stateful.*;

public class App
{
public static void main( String[] args ) throws Exception
{
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3);
env.setParallelism(1);
// // advanced 
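
For completeness, a hedged sketch of the recovery side of this setup, using the
imports already present in the Main App above. The paths, interval and state
backend are illustrative; keyed state such as the ValueState above is only
restored when the job is resumed from a retained checkpoint/savepoint path, or
when a restart strategy recovers it after a failure:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30000L);
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
// Keep the last checkpoint when the job is cancelled so it can be used for recovery.
env.getCheckpointConfig().enableExternalizedCheckpoints(
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

Resuming then has to be requested explicitly, e.g. with
bin/flink run -s <checkpoint-or-savepoint-path> ... on the command line.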

Reply: Hive Source with Kerberos authentication issue

2020-03-03 Thread 叶贤勋
This is the record of my exchange with the Flink community; you can take a look.


叶贤勋
yxx_c...@163.com


On 2020-03-03 16:00, 叶贤勋 wrote:
The hive conf should be correct; the earlier UserGroupInformation login succeeded.
If the datanucleus dependencies are not added, I get class-not-found and similar exceptions:
1. java.lang.ClassNotFoundException: 
org.datanucleus.api.jdo.JDOPersistenceManagerFactory
2. Caused by: org.datanucleus.exceptions.NucleusUserException: There is no 
available StoreManager of type "rdbms". Please make sure you have specified 
"datanucleus.storeManagerType" correctly and that all relevant plugins are in 
the CLASSPATH


叶贤勋
yxx_c...@163.com


On 2020-03-02 11:50, Rui Li wrote:
From the log you posted it looks like an embedded metastore was created. Could you check whether
HiveCatalog is picking up an incorrect hive conf? Also, are all the maven dependencies you listed
packaged into your Flink job jar? Dependencies like datanucleus should not be needed.

On Sat, Feb 29, 2020 at 10:42 PM 叶贤勋  wrote:

Hi 李锐 (Rui Li), thanks for your reply.
The earlier problem has been solved by setting yarn.resourcemanager.principal.
But now another problem has come up; please take a look.

Background: the Flink job still reads from a Kerberos-secured Hive source. The same code passes Kerberos authentication when tested locally and can query and insert data into Hive, but once the job is submitted to the cluster it fails with a Kerberos authentication error.
Flink: 1.9.1, flink-1.9.1/lib/ contains flink-dist_2.11-1.9.1.jar,
flink-shaded-hadoop-2-uber-2.7.5-7.0.jar, log4j-1.2.17.jar,
slf4j-log4j12-1.7.15.jar
Hive: 2.1.1
Main jar dependencies of the Flink job:
[INFO] +- org.apache.flink:flink-table-api-java:jar:flink-1.9.1:compile
[INFO] |  +- org.apache.flink:flink-table-common:jar:flink-1.9.1:compile
[INFO] |  |  \- org.apache.flink:flink-core:jar:flink-1.9.1:compile
[INFO] |  | +-
org.apache.flink:flink-annotations:jar:flink-1.9.1:compile
[INFO] |  | +-
org.apache.flink:flink-metrics-core:jar:flink-1.9.1:compile
[INFO] |  | \- com.esotericsoftware.kryo:kryo:jar:2.24.0:compile
[INFO] |  |+- com.esotericsoftware.minlog:minlog:jar:1.2:compile
[INFO] |  |\- org.objenesis:objenesis:jar:2.1:compile
[INFO] |  +- com.google.code.findbugs:jsr305:jar:1.3.9:compile
[INFO] |  \- org.apache.flink:force-shading:jar:1.9.1:compile
[INFO] +-
org.apache.flink:flink-table-planner-blink_2.11:jar:flink-1.9.1:compile
[INFO] |  +-
org.apache.flink:flink-table-api-scala_2.11:jar:flink-1.9.1:compile
[INFO] |  |  +- org.scala-lang:scala-reflect:jar:2.11.12:compile
[INFO] |  |  \- org.scala-lang:scala-compiler:jar:2.11.12:compile
[INFO] |  +-
org.apache.flink:flink-table-api-java-bridge_2.11:jar:flink-1.9.1:compile
[INFO] |  |  +- org.apache.flink:flink-java:jar:flink-1.9.1:compile
[INFO] |  |  \-
org.apache.flink:flink-streaming-java_2.11:jar:1.9.1:compile
[INFO] |  +-
org.apache.flink:flink-table-api-scala-bridge_2.11:jar:flink-1.9.1:compile
[INFO] |  |  \- org.apache.flink:flink-scala_2.11:jar:flink-1.9.1:compile
[INFO] |  +-
org.apache.flink:flink-table-runtime-blink_2.11:jar:flink-1.9.1:compile
[INFO] |  |  +- org.codehaus.janino:janino:jar:3.0.9:compile
[INFO] |  |  \- org.apache.calcite.avatica:avatica-core:jar:1.15.0:compile
[INFO] |  \- org.reflections:reflections:jar:0.9.10:compile
[INFO] +- org.apache.flink:flink-table-planner_2.11:jar:flink-1.9.1:compile
[INFO] +- org.apache.commons:commons-lang3:jar:3.9:compile
[INFO] +- com.typesafe.akka:akka-actor_2.11:jar:2.5.21:compile
[INFO] |  +- org.scala-lang:scala-library:jar:2.11.8:compile
[INFO] |  +- com.typesafe:config:jar:1.3.3:compile
[INFO] |  \-
org.scala-lang.modules:scala-java8-compat_2.11:jar:0.7.0:compile
[INFO] +- org.apache.flink:flink-sql-client_2.11:jar:1.9.1:compile
[INFO] |  +- org.apache.flink:flink-clients_2.11:jar:1.9.1:compile
[INFO] |  |  \- org.apache.flink:flink-optimizer_2.11:jar:1.9.1:compile
[INFO] |  +- org.apache.flink:flink-streaming-scala_2.11:jar:1.9.1:compile
[INFO] |  +- log4j:log4j:jar:1.2.17:compile
[INFO] |  \- org.apache.flink:flink-shaded-jackson:jar:2.9.8-7.0:compile
[INFO] +- org.apache.flink:flink-json:jar:1.9.1:compile
[INFO] +- org.apache.flink:flink-csv:jar:1.9.1:compile
[INFO] +- org.apache.flink:flink-hbase_2.11:jar:1.9.1:compile
[INFO] +- org.apache.hbase:hbase-server:jar:2.2.1:compile
[INFO] |  +-
org.apache.hbase.thirdparty:hbase-shaded-protobuf:jar:2.2.1:compile
[INFO] |  +-
org.apache.hbase.thirdparty:hbase-shaded-netty:jar:2.2.1:compile
[INFO] |  +-
org.apache.hbase.thirdparty:hbase-shaded-miscellaneous:jar:2.2.1:compile
[INFO] |  |  \-
com.google.errorprone:error_prone_annotations:jar:2.3.3:compile
[INFO] |  +- org.apache.hbase:hbase-common:jar:2.2.1:compile
[INFO] |  |  \-
com.github.stephenc.findbugs:findbugs-annotations:jar:1.3.9-1:compile
[INFO] |  +- org.apache.hbase:hbase-http:jar:2.2.1:compile
[INFO] |  |  +- org.eclipse.jetty:jetty-util:jar:9.3.27.v20190418:compile
[INFO] |  |  +-
org.eclipse.jetty:jetty-util-ajax:jar:9.3.27.v20190418:compile
[INFO] |  |  +- org.eclipse.jetty:jetty-http:jar:9.3.27.v20190418:compile
[INFO] |  |  +-
org.eclipse.jetty:jetty-security:jar:9.3.27.v20190418:compile
[INFO] |  |  +- org.glassfish.jersey.core:jersey-server:jar:2.25.1:compile
[INFO] |  |  |  +-
org.glassfish.jersey.core:jersey-common:jar:2.25.1:compile
[INFO] |  |  |  |  +-
org.glassfish.jersey.bundles.repackaged:jersey-guava:jar:2.25.1:compile
[INFO] |  |  |  |  \-
org.glassfish.hk2:osgi-resource-locator:jar:1.0.1:compile
[INFO] |  |  |  +-

Re: Operator latency metric not working in 1.9.1

2020-03-03 Thread orips
Thanks for the response.

In 1.5 the docs also state that it should be enabled [1], however, it always
worked without setting latencyTrackingInterval

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#latency-tracking



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Reply: CliFrontend does not load classes from the user jar first

2020-03-03 Thread aven . wu
Thanks for the answer.
I later looked at the classpath setup of the Flink run script and modified the script to put my jar at the front of the Flink classpath, which solved the problem.

Best
Aven

From: tison
Sent: 2020-03-03 14:16
To: user-zh
Subject: Re: CliFrontend does not load classes from the user jar first

https://github.com/apache/flink/commit/0f30c263eebd2fc3ecbeae69a4ce9477e1d5d774

Best,
tison.


tison wrote on Tue, Mar 3, 2020 at 2:13 PM:

> This has been fixed on 1.9.2 and 1.10; for the change see
>
> https://issues.apache.org/jira/browse/FLINK-13749
>
> Best,
> tison.
>
>
> aven.wu wrote on Tue, Mar 3, 2020 at 2:04 PM:
>
>> Component versions: Hadoop 2.7.3, Flink 1.9.1, Elasticsearch 6.5.
>> The problem started because my user program uses Jackson and depends on the Elasticsearch rest client.
>> When submitting the job to the YARN cluster, the following exception occurred:
>> java.lang.NoSuchFieldError: FAIL_ON_SYMBOL_HASH_OVERFLOW
>> at
>> org.elasticsearch.common.xcontent.json.JsonXContent.(JsonXContent.java:57)
>> After searching online I suspected a Jackson version conflict, so I printed where the class was loaded from:
>> --main class jackson class load before
>> run--
>> file:/usr/**/hadoop/lib/jackson-databind-2.2.3.jar
>> Sure enough, version 2.2.3 was loaded from the Hadoop classpath.
>>
>> I then looked at the entry point of the flink run command:
>> CliFrontend#bulidProgram line 799
>> PackagedProgram#PackagedProgram line 221
>> JobWithJars#BuildUserCodeClassLoad line 142
>> return FlinkUserCodeClassLoaders.parentFirst(urls, parent);
>> parentFirst is used by default. According to the inverted class loading described in the official docs,
>> classes are loaded from the classpath first instead of from the user jar.
>> How can I change the class loading order here so that classes from the user jar are loaded first?
>>
>> Best
>> Aven
>>
>>



Re: Flink EventTime Processing Watermark is always coming as 9223372036854725808

2020-03-03 Thread aj
Thanks, Robert, for mentioning this; I will take care of it in future posts.

I was able to figure out the issue. When I disable checkpointing, the
watermark gets updated and it works. I need to understand two things:

1. Please help me understand what is happening when I enable checkpointing,
and how to make this work with checkpointing enabled, as I need to write the
data stream with checkpointing on.

2. Basically, I want to collect all the session data and process all of the
session's events at the end of the session (using x minutes of inactivity).
I know this functionality is available in the session window, where I can
create a session window using an inactivity gap, but enrichment and
processing of events there is not recommended. So how can I use the same
kind of inactivity-based trigger, process all the events, and clear the queue?
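
One common pattern for this is to buffer events per key and push an event-time
timer forward on every element, flushing and clearing the state when the timer
finally fires. A minimal sketch, assuming the same String key and GenericRecord
events as in the code below, using ListState instead of the PriorityQueue and a
purely illustrative 30-minute gap:

import java.util.ArrayList;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SessionCollector extends KeyedProcessFunction<String, GenericRecord, List<GenericRecord>> {

    private static final long GAP_MS = 30 * 60 * 1000L; // illustrative inactivity gap

    private transient ListState<GenericRecord> buffer;
    private transient ValueState<Long> timerState;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("session-buffer", GenericRecord.class));
        timerState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("session-timer", Long.class));
    }

    @Override
    public void processElement(GenericRecord record, Context ctx,
            Collector<List<GenericRecord>> out) throws Exception {
        buffer.add(record);
        // Move the "end of session" timer forward on every new event.
        Long oldTimer = timerState.value();
        if (oldTimer != null) {
            ctx.timerService().deleteEventTimeTimer(oldTimer);
        }
        long newTimer = ctx.timestamp() + GAP_MS;
        ctx.timerService().registerEventTimeTimer(newTimer);
        timerState.update(newTimer);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
            Collector<List<GenericRecord>> out) throws Exception {
        // The watermark passed the end of the inactivity gap: emit the whole
        // session for enrichment/processing, then clear the per-key state.
        List<GenericRecord> session = new ArrayList<>();
        for (GenericRecord r : buffer.get()) {
            session.add(r);
        }
        out.collect(session);
        buffer.clear();
        timerState.clear();
    }
}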


Thanks,
Anuj


On Tue, Mar 3, 2020 at 3:40 AM Robert Metzger  wrote:

> side note: this question has been asked on SO as well:
> https://stackoverflow.com/questions/60487571/flink-eventtime-processing-watermark-is-always-coming-as-9223372036854725808
> (I'm mentioning this here so that we are not wasting support resources in
> our community on double-debugging issues)
>
> On Mon, Mar 2, 2020 at 5:36 PM aj  wrote:
>
>> Hi David,
>>
>> Currently, I am testing it with a single source and parallelism 1 only so
>> not able to understand this behavior.
>>
>> On Mon, Mar 2, 2020 at 9:02 PM Dawid Wysakowicz 
>> wrote:
>>
>>> Hi Anuj,
>>>
>>> What parallelism has your source? Do all of your source tasks produce
>>> records? Watermark is always the minimum of timestamps seen from all the
>>> upstream operators. Therefore if some of them do not produce records the
>>> watermark will not progress. You can read more about Watermarks and how
>>> they work here:
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html#watermarks-in-parallel-streams
>>>
>>> Hope that helps
>>>
>>> Best,
>>>
>>> Dawid
>>> On 02/03/2020 16:26, aj wrote:
>>>
>>> I am trying to use process function to some processing on a set of
>>> events. I am using event time and keystream. The issue I am facing is The
>>> watermark value is always coming as 9223372036854725808. I have put print
>>> statement to debug and it shows like this:
>>>
>>> timestamp--1583128014000 extractedTimestamp 1583128014000
>>> currentwatermark-9223372036854775808
>>>
>>> timestamp--1583128048000 extractedTimestamp 1583128048000
>>> currentwatermark-9223372036854775808
>>>
>>> timestamp--1583128089000 extractedTimestamp 1583128089000
>>> currentwatermark-9223372036854775808
>>>
>>> timestamp and extracted timestamp changing but watermark not getting
>>> updated. So no record is getting in the queue as context.timestamp is never
>>> less than the watermark.
>>>
>>>
>>> DataStream<GenericRecord> dataStream =
>>> env.addSource(searchConsumer).name("search_list_keyless");
>>> DataStream<GenericRecord> dataStreamWithWaterMark =
>>> dataStream.assignTimestampsAndWatermarks(new SessionAssigner());
>>>
>>>try {
>>> dataStreamWithWaterMark.keyBy((KeySelector<GenericRecord, String>) record -> {
>>> StringBuilder builder = new StringBuilder();
>>> builder.append(record.get("session_id"));
>>> builder.append(record.get("user_id"));
>>> return builder.toString();
>>> }).process(new MatchFunction()).print();
>>> }
>>> catch (Exception e){
>>> e.printStackTrace();
>>> }
>>> env.execute("start session process");
>>>
>>> }
>>>
>>> public static class SessionAssigner implements 
>>> AssignerWithPunctuatedWatermarks<GenericRecord> {
>>> @Override
>>> public long extractTimestamp(GenericRecord record, long 
>>> previousElementTimestamp) {
>>> long timestamp = (long) record.get("event_ts");
>>> System.out.println("timestamp--"+ timestamp);
>>> return timestamp;
>>> }
>>>
>>> @Override
>>> public Watermark checkAndGetNextWatermark(GenericRecord record, 
>>> long extractedTimestamp) {
>>> // simply emit a watermark with every event
>>> System.out.println("extractedTimestamp "+extractedTimestamp);
>>> return new Watermark(extractedTimestamp - 3);
>>> }
>>>  }
>>>
>>>@Override
>>> public void processElement(GenericRecord record, Context context, 
>>> Collector collector) throws Exception {
>>>
>>> TimerService timerService = context.timerService();
>>> System.out.println("currentwatermark"+ 
>>> timerService.currentWatermark());
>>> if (context.timestamp() > timerService.currentWatermark()) {
>>>
>>> Tuple2<Long, PriorityQueue<GenericRecord>> queueval =
>>> queueState.value();
>>> PriorityQueue<GenericRecord> queue = queueval.f1;
>>> long startTime = queueval.f0;
>>> System.out.println("starttime"+ startTime);

Reply: Hive Source with Kerberos authentication issue

2020-03-03 Thread 叶贤勋
The hive conf should be correct; the earlier UserGroupInformation login succeeded.
If the datanucleus dependencies are not added, I get class-not-found and similar exceptions:
1. java.lang.ClassNotFoundException: 
org.datanucleus.api.jdo.JDOPersistenceManagerFactory
2. Caused by: org.datanucleus.exceptions.NucleusUserException: There is no 
available StoreManager of type "rdbms". Please make sure you have specified 
"datanucleus.storeManagerType" correctly and that all relevant plugins are in 
the CLASSPATH



叶贤勋
yxx_c...@163.com


On 2020-03-02 11:50, Rui Li wrote:
From the log you posted it looks like an embedded metastore was created. Could you check whether
HiveCatalog is picking up an incorrect hive conf? Also, are all the maven dependencies you listed
packaged into your Flink job jar? Dependencies like datanucleus should not be needed.

On Sat, Feb 29, 2020 at 10:42 PM 叶贤勋  wrote:

Hi 李锐 (Rui Li), thanks for your reply.
The earlier problem has been solved by setting yarn.resourcemanager.principal.
But now another problem has come up; please take a look.

Background: the Flink job still reads from a Kerberos-secured Hive source. The same code passes Kerberos authentication when tested locally and can query and insert data into Hive, but once the job is submitted to the cluster it fails with a Kerberos authentication error.
Flink: 1.9.1, flink-1.9.1/lib/ contains flink-dist_2.11-1.9.1.jar,
flink-shaded-hadoop-2-uber-2.7.5-7.0.jar, log4j-1.2.17.jar,
slf4j-log4j12-1.7.15.jar
Hive: 2.1.1
Main jar dependencies of the Flink job:
[INFO] +- org.apache.flink:flink-table-api-java:jar:flink-1.9.1:compile
[INFO] |  +- org.apache.flink:flink-table-common:jar:flink-1.9.1:compile
[INFO] |  |  \- org.apache.flink:flink-core:jar:flink-1.9.1:compile
[INFO] |  | +-
org.apache.flink:flink-annotations:jar:flink-1.9.1:compile
[INFO] |  | +-
org.apache.flink:flink-metrics-core:jar:flink-1.9.1:compile
[INFO] |  | \- com.esotericsoftware.kryo:kryo:jar:2.24.0:compile
[INFO] |  |+- com.esotericsoftware.minlog:minlog:jar:1.2:compile
[INFO] |  |\- org.objenesis:objenesis:jar:2.1:compile
[INFO] |  +- com.google.code.findbugs:jsr305:jar:1.3.9:compile
[INFO] |  \- org.apache.flink:force-shading:jar:1.9.1:compile
[INFO] +-
org.apache.flink:flink-table-planner-blink_2.11:jar:flink-1.9.1:compile
[INFO] |  +-
org.apache.flink:flink-table-api-scala_2.11:jar:flink-1.9.1:compile
[INFO] |  |  +- org.scala-lang:scala-reflect:jar:2.11.12:compile
[INFO] |  |  \- org.scala-lang:scala-compiler:jar:2.11.12:compile
[INFO] |  +-
org.apache.flink:flink-table-api-java-bridge_2.11:jar:flink-1.9.1:compile
[INFO] |  |  +- org.apache.flink:flink-java:jar:flink-1.9.1:compile
[INFO] |  |  \-
org.apache.flink:flink-streaming-java_2.11:jar:1.9.1:compile
[INFO] |  +-
org.apache.flink:flink-table-api-scala-bridge_2.11:jar:flink-1.9.1:compile
[INFO] |  |  \- org.apache.flink:flink-scala_2.11:jar:flink-1.9.1:compile
[INFO] |  +-
org.apache.flink:flink-table-runtime-blink_2.11:jar:flink-1.9.1:compile
[INFO] |  |  +- org.codehaus.janino:janino:jar:3.0.9:compile
[INFO] |  |  \- org.apache.calcite.avatica:avatica-core:jar:1.15.0:compile
[INFO] |  \- org.reflections:reflections:jar:0.9.10:compile
[INFO] +- org.apache.flink:flink-table-planner_2.11:jar:flink-1.9.1:compile
[INFO] +- org.apache.commons:commons-lang3:jar:3.9:compile
[INFO] +- com.typesafe.akka:akka-actor_2.11:jar:2.5.21:compile
[INFO] |  +- org.scala-lang:scala-library:jar:2.11.8:compile
[INFO] |  +- com.typesafe:config:jar:1.3.3:compile
[INFO] |  \-
org.scala-lang.modules:scala-java8-compat_2.11:jar:0.7.0:compile
[INFO] +- org.apache.flink:flink-sql-client_2.11:jar:1.9.1:compile
[INFO] |  +- org.apache.flink:flink-clients_2.11:jar:1.9.1:compile
[INFO] |  |  \- org.apache.flink:flink-optimizer_2.11:jar:1.9.1:compile
[INFO] |  +- org.apache.flink:flink-streaming-scala_2.11:jar:1.9.1:compile
[INFO] |  +- log4j:log4j:jar:1.2.17:compile
[INFO] |  \- org.apache.flink:flink-shaded-jackson:jar:2.9.8-7.0:compile
[INFO] +- org.apache.flink:flink-json:jar:1.9.1:compile
[INFO] +- org.apache.flink:flink-csv:jar:1.9.1:compile
[INFO] +- org.apache.flink:flink-hbase_2.11:jar:1.9.1:compile
[INFO] +- org.apache.hbase:hbase-server:jar:2.2.1:compile
[INFO] |  +-
org.apache.hbase.thirdparty:hbase-shaded-protobuf:jar:2.2.1:compile
[INFO] |  +-
org.apache.hbase.thirdparty:hbase-shaded-netty:jar:2.2.1:compile
[INFO] |  +-
org.apache.hbase.thirdparty:hbase-shaded-miscellaneous:jar:2.2.1:compile
[INFO] |  |  \-
com.google.errorprone:error_prone_annotations:jar:2.3.3:compile
[INFO] |  +- org.apache.hbase:hbase-common:jar:2.2.1:compile
[INFO] |  |  \-
com.github.stephenc.findbugs:findbugs-annotations:jar:1.3.9-1:compile
[INFO] |  +- org.apache.hbase:hbase-http:jar:2.2.1:compile
[INFO] |  |  +- org.eclipse.jetty:jetty-util:jar:9.3.27.v20190418:compile
[INFO] |  |  +-
org.eclipse.jetty:jetty-util-ajax:jar:9.3.27.v20190418:compile
[INFO] |  |  +- org.eclipse.jetty:jetty-http:jar:9.3.27.v20190418:compile
[INFO] |  |  +-
org.eclipse.jetty:jetty-security:jar:9.3.27.v20190418:compile
[INFO] |  |  +- org.glassfish.jersey.core:jersey-server:jar:2.25.1:compile
[INFO] |  |  |  +-
org.glassfish.jersey.core:jersey-common:jar:2.25.1:compile
[INFO] |  |  |  |  +-
org.glassfish.jersey.bundles.repackaged:jersey-guava:jar:2.25.1:compile
[INFO] |  |  |  |  \-
org.glassfish.hk2:osgi-resource-locator:jar:1.0.1:compile
[INFO] |  |  |  +-
org.glassfish.jersey.core:jersey-client:jar:2.25.1:compile
[INFO] |  |  |  +-