Re: Flink high-availability configuration

2020-06-18 Thread chaojianok
Strange, when I open this email there is no content at all.
For Flink high-availability configuration, I recommend taking a look at these two documents:
https://ci.apache.org/projects/flink/flink-docs-master/ops/jobmanager_high_availability.html
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html
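
As a rough illustration (not from the original email; the ZooKeeper quorum, storage path, and cluster id below are placeholders you would replace with your own), a ZooKeeper-based HA setup in flink-conf.yaml typically sets keys like:

  high-availability: zookeeper
  high-availability.zookeeper.quorum: zk-host-1:2181,zk-host-2:2181,zk-host-3:2181
  high-availability.storageDir: hdfs:///flink/ha/
  high-availability.cluster-id: /my-flink-session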

At 2020-06-19 11:09:13, "Tony"  wrote:


Re:Re: [DISCUSS] Upgrade HBase connector to 2.2.x

2020-06-18 Thread chaojianok
+1 to support HBase 2.x
And I think the 1.4.x version can be retained for the time being, so that users 
who are currently using the 1.4.x version can have more time to evaluate 
whether their projects need to be upgraded and the cost of upgrading.

At 2020-06-19 12:35:36, "Jark Wu"  wrote:

+1 to support HBase 2.x

But not sure about dropping support for 1.4.x


I cc'ed to user@ and user-zh@ to hear more feedback from users. 

Best,
Jark


On Thu, 18 Jun 2020 at 21:25, Gyula Fóra  wrote:

Hi All!

I would like to revive an old ticket
 and discussion around
upgrading the HBase version of the connector.

The current HBase version is 1.4.3 which is over 2 years old at this point
and incompatible with the newer HBase versions used at many companies.

We propose to upgrade the connector to the latest version and drop support
for the old version starting from the 1.12 Flink release. This would help
us maintain and improve the HBase connector over time.

If the community agrees we are happy to contribute this upgrade as we have
already developed and tested the updated version.

Cheers,
Gyula


Flink SQL 1.10.0 window results cannot be written to the sink

2020-06-18 Thread 王超
Could someone please help take a look at the problem below?

Flink version: 1.10.0
Planner: blink

I ran into a problem while using Flink SQL; could you help take a look? I tried to find a solution, but nothing worked.
For example, I found a similar issue described in
https://www.mail-archive.com/user-zh@flink.apache.org/msg03916.html
and, following the solution in that mail, I set the timezone, but the problem was not solved.


Flink Table Env configuration:
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
EnvironmentSettings envSetting =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
envSetting);
tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));


This job defines two tables: a source table "sqlDdlAnaTable"

String sqlDdlAnaTable = "CREATE TABLE ana_Source(type INT, datatime
BIGINT, list ARRAY , ts AS
TO_TIMESTAMP(FROM_UNIXTIME(datatime)), WATERMARK FOR ts AS ts - INTERVAL
'5' SECOND)" +
" WITH (" +
"'connector.type' = 'pravega'," +
"'connector.version' = '1'," +
"'connector.connection-config.controller-uri'=
'tcp://192.168.188.130:9090'," +
"'connector.connection-config.default-scope' = 'Demo'," +
"'connector.reader.stream-info.0.stream' = 'test'," +
"'format.type' = 'json'," +
"'format.fail-on-missing-field' = 'false', " +
"'update-mode' = 'append')";

and a sink table "sqlDdlSinkTable".

String sqlDdlSinkTable = "CREATE TABLE tb_sink" +
"(id STRING, " +
"wStart TIMESTAMP(3) , " +
"v FLOAT)" +
" WITH (" +
"'connector.type' = 'pravega'," +
"'connector.version' = '1'," +
"'connector.connection-config.controller-uri'=
'tcp://192.168.188.130:9090'," +
"'connector.connection-config.default-scope' = 'Demo'," +
"'connector.writer.stream' = 'result'," +
"'connector.writer.routingkey-field-name' = 'id'," +
"'connector.writer.mode' = 'atleast_once'," +
"'format.type' = 'json'," +
"'update-mode' = 'append')";

The processing logic is simple: compute the average of the value over a 10-second tumbling window.
At first, printing the results directly, I could clearly see one result emitted every 10 seconds, and the watermark advanced normally.
String sqlAna = "SELECT ts, id, v " +
"FROM tb_JsonRecord " +
"WHERE q=1 AND type=1";
Table tableAnaRecord = tableEnv.sqlQuery(sqlAna);
tableEnv.registerTable("tb_AnaRecord", tableAnaRecord);

tableEnv.toAppendStream(tableAnaRecord, Row.class).print()



But when I try to insert the results into the sink table, nothing is written at all.
String sqlAnaAvg = "INSERT INTO tb_sink(id, wStart, v) " +
"SELECT id, " +
"TUMBLE_START(ts, INTERVAL '10' SECOND) as wStart, "  +
"AVG(v) FROM tb_AnaRecord " +
"GROUP BY TUMBLE(ts, INTERVAL '10' SECOND), id";
tableEnv.sqlUpdate(sqlAnaAvg);


Thanks in advance!

BR//Chao


Re: [DISCUSS] Upgrade HBase connector to 2.2.x

2020-06-18 Thread Jark Wu
+1 to support HBase 2.x
But not sure about dropping support for 1.4.x

I cc'ed to user@ and user-zh@ to hear more feedback from users.

Best,
Jark

On Thu, 18 Jun 2020 at 21:25, Gyula Fóra  wrote:

> Hi All!
>
> I would like to revive an old ticket
>  and discussion around
> upgrading the HBase version of the connector.
>
> The current HBase version is 1.4.3 which is over 2 years old at this point
> and incompatible with the newer HBase versions used at many companies.
>
> We propose to upgrade the connector to the latest version and drop support
> for the old version starting from the 1.12 Flink release. This would help
> us maintain and improve the HBase connector over time.
>
> If the community agrees we are happy to contribute this upgrade as we have
> already developed and tested the updated version.
>
> Cheers,
> Gyula
>


Re: Native K8S not creating TMs

2020-06-18 Thread Yang Wang
Thanks for sharing the DEBUG level log.

I carefully checked the logs and found that the kubernetes-client discovered
the
api server address and token successfully.  However, it could not contact
the
api server (10.100.0.1:443). Could you check whether your api server is
configured
to allow access from within the cluster?

I think you could start any pod and tunnel in to run the following command.

KUBE_TOKEN=$(cat /var/run/secrets/kubernetes.io/serviceaccount/token)
curl -sSk -H "Authorization: Bearer $KUBE_TOKEN" https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_PORT_443_TCP_PORT/api

BTW, what's your Kubernetes version? And I am not sure whether increasing
the timeouts
could help.

-Dcontainerized.master.env.KUBERNETES_REQUEST_TIMEOUT=6
-Dcontainerized.master.env.KUBERNETES_CONNECTION_TIMEOUT=6


Best,
Yang


On Tue, 16 Jun 2020 at 12:00, Yang Wang wrote:

> Hi Kevin,
>
> Sorry for not noticing your last response.
> Could you share your full DEBUG-level jobmanager logs? I will try to figure
> out
> whether it is an issue of Flink or K8s, because I could not reproduce your
> situation
> with my local K8s cluster.
>
>
> Best,
> Yang
>
> On Mon, 8 Jun 2020 at 11:02, Yang Wang wrote:
>
>> Hi Kevin,
>>
>> It may be because of the character-length limitation of K8s (no more than
>> 63) [1], so the pod
>> name cannot be too long. I notice that you are using the client's
>> automatically generated
>> cluster-id. It may cause problems; could you set a meaningful
>> cluster-id for your Flink
>> session? For example,
>>
>> kubernetes-session.sh ... -Dkubernetes.cluster-id=my-flink-k8s-session
>>
>> This behavior has been improved in Flink 1.11 to check the length on the
>> client side before submission.
>>
>> If it still does not work, could you share your full command and
>> jobmanager logs? That will help a lot
>> in finding the root cause.
>>
>>
>> [1].
>> https://stackoverflow.com/questions/50412837/kubernetes-label-name-63-character-limit
>>
>>
>> Best,
>> Yang
>>
>> On Sat, 6 Jun 2020 at 01:00, kb wrote:
>>
>>> Thanks Yang for the suggestion, I have tried it and I'm still getting the
>>> same exception. Is it possible its due to the null pod name? Operation:
>>> [create]  for kind: [Pod]  with name: [null]  in namespace: [default]
>>> failed.
>>>
>>> Best,
>>> kevin
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>


Re: Re: How to ensure data correctness when the dimension table is updated frequently and state keeps growing

2020-06-18 Thread wangweigu...@stevegame.cn

  You can do the dimension-table join asynchronously with RichAsyncFunction, reading the dimension table from multiple async threads.
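
As a rough sketch of that idea (not code from this thread; Event, EnrichedEvent, and DimClient are made-up placeholder types, and only the RichAsyncFunction/AsyncDataStream wiring reflects Flink's actual API):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

public class AsyncDimLookup extends RichAsyncFunction<Event, EnrichedEvent> {

    private transient DimClient client;  // hypothetical non-blocking lookup client

    @Override
    public void open(Configuration parameters) {
        client = DimClient.connect("dim-db-host:3306");  // placeholder endpoint
    }

    @Override
    public void asyncInvoke(Event input, ResultFuture<EnrichedEvent> resultFuture) {
        // queryById is assumed to return a CompletableFuture<DimRecord>.
        client.queryById(input.getDimId())
              .thenAccept(dim -> resultFuture.complete(
                      Collections.singleton(new EnrichedEvent(input, dim))));
    }

    @Override
    public void timeout(Event input, ResultFuture<EnrichedEvent> resultFuture) {
        // Emit the event un-enriched on timeout instead of failing the job.
        resultFuture.complete(Collections.singleton(new EnrichedEvent(input, null)));
    }
}

// Wiring it into a pipeline while keeping input order:
// DataStream<EnrichedEvent> enriched = AsyncDataStream.orderedWait(
//         events, new AsyncDimLookup(), 30, TimeUnit.SECONDS, 100);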



 
From: Jim Chen
Sent: 2020-06-19 10:34
To: user-zh
Subject: Re: How to ensure data correctness when the dimension table is updated frequently and state keeps growing
A question: in Flink SQL 1.10, for the local cache + async IO approach, is it enough to just write the dimension-table join in SQL? Does Flink
SQL automatically do the optimization underneath? If it has to be implemented manually, is there any demo? Thanks.
 
On Wed, 17 Jun 2020 at 00:11, Jark Wu wrote:
 
> If updates are very frequent and you need both join correctness and throughput, I think the only good solution is to join against a changelog.
> However, Flink does not yet natively support joining a dimension table as a changelog; that will be supported in Flink SQL 1.12.
>
> With the current version, you can try keyBy + local cache + async IO.
>
> Best,
> Jark
>
> On Tue, 16 Jun 2020 at 22:35, 李奇 <359502...@qq.com> wrote:
>
> > Or use Redis as the storage medium for the dimension table.
> >
> > > On Jun 16, 2020, at 22:10, wangxiangyan wrote:
> > >
> > > Hi all,
> > > The dimension table is updated frequently, the data volume is around 1 GB, and it needs to be synced often. What approach works best for the join?
> >
>


Re: flink 1.10 on yarn memory overuse, container killed by yarn

2020-06-18 Thread Yun Tang
Hi

How much managed memory does a single slot have (you can see it in the web UI or the TM logs)? How high does the RocksDB block cache
usage grow; does it keep growing until it eventually exceeds the managed memory of a single slot?

RocksDB memory management works in the vast majority of scenarios, but RocksDB's own implementation keeps this feature from working perfectly. Specifically, it involves the relationship between the LRU cache and the write buffer
manager: currently, RocksDB's strict cache limit and the "charging" of write buffer
manager allocations to the cache are incomplete, so even when the cache has reached capacity, memory can still be allocated and inserted. There is therefore a small probability of memory overuse.

To work around this, you can increase the TM process memory, which increases the overhead memory [1] and gives RocksDB some buffer.

From the RocksDB side, increasing the number of flush threads and reducing the arena size can lower the probability of hitting this problem.


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html#overview
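
For illustration only (these values are made up, not from this thread), the relevant flink-conf.yaml keys could look like:

  taskmanager.memory.process.size: 12g
  taskmanager.memory.jvm-overhead.min: 1g
  taskmanager.memory.jvm-overhead.max: 2g
  state.backend.rocksdb.thread.num: 4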

Best,
Yun Tang



From: 1017517291 <1017517...@qq.com>
Sent: Friday, June 19, 2020 10:31
To: user-zh 
Subject: flink 1.10 on yarn memory overuse, container killed by yarn

Hi,
  Hello everyone in the community, I have a question. A job of mine running in production gets killed by yarn after about a day, with a message saying it exceeded the memory limit. The log is as follows:


java.lang.Exception: [2020-06-19 00:33:57.249]Container 
[pid=771992,containerID=container_e05_1592035979967_0057_01_06] is running 
745472B beyond the 'PHYSICAL' memo
ry limit. Current usage: 10.0 GB of 10 GB physical memory used; 19.1 GB of 21 
GB virtual memory used. Killing container.



My Flink version is 1.10, and each TaskManager is allocated 10g.
Looking at the monitoring, the JVM heap memory is quite stable, so I suspect RocksDB is using a lot of storage. I enabled the RocksDB block usage metrics, and they indeed keep growing.
I then set taskmanager.memory.managed.fraction to 0.7, but the block usage still keeps growing, and the job eventually gets killed.
I have already set state.backend.rocksdb.memory.managed =
true. In theory, RocksDB's managed memory should not keep growing; the 1.10 docs say RocksDB's memory usage is kept within managed memory, but my job's block
usage keeps growing, and the container ends up being killed.


I'd like to ask:
1. Under what circumstances does RocksDB's memory get out of control and keep growing beyond the allocated managed memory?
2. In version 1.10, what else can cause memory to exceed the limit so the container is killed by yarn?


Looking forward to your replies.


Flink high-availability configuration

2020-06-18 Thread Tony


Join tables created from Datastream whose element scala type has field Option[_]

2020-06-18 Thread YI
Hi, all

I am trying to join two datastream whose element types are respectively
```
case class MyEvent(
_id: Long = 0L,
_cId: Long = 0L,
_url: Option[String] = None,
)
```
and
```
case class MyCategory(
_id: Long = 0L,
_name: Option[String] = None,
)
```

When I tried to join those two tables with
```
SELECT * FROM rawCategory INNER JOIN rawEvent ON rawEvent._cId = rawCategory._id
```

The following exception is thrown,

```
Exception in thread "main" org.apache.flink.table.api.ValidationException: Type 
'scala.Option' cannot be used in a join operation because it does not implement 
a proper hashCode() method.
at 
org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:176)
at 
org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:153)
at 
org.apache.flink.table.typeutils.TypeCheckUtils$.$anonfun$validateEqualsHashCode$1(TypeCheckUtils.scala:149)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
at 
org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:147)
at 
org.apache.flink.table.runtime.join.NonWindowJoin.(NonWindowJoin.scala:57)
at 
org.apache.flink.table.runtime.join.NonWindowInnerJoin.(NonWindowInnerJoin.scala:50)
at 
org.apache.flink.table.plan.nodes.datastream.DataStreamJoinToCoProcessTranslator.createJoinOperator(DataStreamJoinToCoProcessTranslator.scala:118)
at 
org.apache.flink.table.plan.nodes.datastream.DataStreamJoinToCoProcessTranslator.getJoinOperator(DataStreamJoinToCoProcessTranslator.scala:102)
at 
org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:119)
at 
org.apache.flink.table.planner.StreamPlanner.translateToCRow(StreamPlanner.scala:251)
at 
org.apache.flink.table.planner.StreamPlanner.translateOptimized(StreamPlanner.scala:412)
at 
org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:402)
at 
org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:185)
at 
org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:117)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:273)
at scala.collection.TraversableLike.map$(TraversableLike.scala:266)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at 
org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:117)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:210)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.scala:107)
at 
org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:101)
at io.redacted.sub.package$.testJoin(package.scala:143)
at io.redacted.sub.package$.process(package.scala:128)
at io.redacted.DataAggregator$.main(DataAggregator.scala:15)
at io.redacted.DataAggregator.main(DataAggregator.scala)

Process finished with exit code 1
```

I tried using a vanilla String with null, but I encountered several NPEs.
My intention is to use Option to indicate that a value is missing, just like null 
in a database, and hopefully without NPEs.

How should I define my data types? And which configurations should I take 
special care of?

Bests,
Yi

Re: Can the PyFlink Table API route inserts to different sink tables based on a field's value?

2020-06-18 Thread jack



I tested with the following structure:
table= t_env.from_path("source")


if table.filter("logType=syslog"):
table.filter("logType=syslog").insert_into("sink1")
elif table.filter("logType=alarm"):
table.filter("logType=alarm").insert_into("sink2")




I tested it, and it seems that table.filter("logType=syslog").insert_into("sink1") takes effect while the elif below does not. Is the reason that table.filter("logType=syslog") (or table.where) already filters the data while evaluating the condition, so the second branch is never reached??








On 2020-06-19 10:08:25, "jack" wrote:
>Using the PyFlink Table API, can I decide which sink table to insert into based on a field's value?
>
>
>Scenario: use PyFlink to filter records by a condition and then insert them into a sink.
>For example, for the following two messages with different logType values, the filter API can be used to filter them and insert into a sink table:
>{
>"logType":"syslog",
>"message":"sla;flkdsjf"
>}
>{
>"logType":"alarm",
>"message":"sla;flkdsjf"
>}
>  t_env.from_path("source")\
>  .filter("logType=syslog")\
>  .insert_into("sink1")
>Is there a way, directly in the code above, to implement if/else-like logic based on the value of the logType field:
>if logType=="syslog":
>   insert_into(sink1)
>elif logType=="alarm":
>   insert_into(sink2)
>
>
>It would be easy if insert_into returned something with .filter/.select etc., so I could keep chaining filters for the next condition, with code like the following:
>
>
>  t_env.from_path("source")\
>  .filter("logType=syslog")\
>  .insert_into("sink1")\
>  .filter("logType=alarm")\
>  .insert_into("sink2")
>Any guidance would be appreciated, thanks.


Re: How to ensure data correctness when the dimension table is updated frequently and state keeps growing

2020-06-18 Thread Jim Chen
A question: in Flink SQL 1.10, for the local cache + async IO approach, is it enough to just write the dimension-table join in SQL? Does Flink
SQL automatically do the optimization underneath? If it has to be implemented manually, is there any demo? Thanks.

On Wed, 17 Jun 2020 at 00:11, Jark Wu wrote:

> If updates are very frequent and you need both join correctness and throughput, I think the only good solution is to join against a changelog.
> However, Flink does not yet natively support joining a dimension table as a changelog; that will be supported in Flink SQL 1.12.
>
> With the current version, you can try keyBy + local cache + async IO.
>
> Best,
> Jark
>
> On Tue, 16 Jun 2020 at 22:35, 李奇 <359502...@qq.com> wrote:
>
> > Or use Redis as the storage medium for the dimension table.
> >
> > > On Jun 16, 2020, at 22:10, wangxiangyan wrote:
> > >
> > > Hi all,
> > > The dimension table is updated frequently, the data volume is around 1 GB, and it needs to be synced often. What approach works best for the join?
> >
>


flink 1.10 on yarn memory overuse, container killed by yarn

2020-06-18 Thread 1017517291
Hi,
  Hello everyone in the community, I have a question. A job of mine running in production gets killed by yarn after about a day, with a message saying it exceeded the memory limit. The log is as follows:


java.lang.Exception: [2020-06-19 00:33:57.249]Container 
[pid=771992,containerID=container_e05_1592035979967_0057_01_06] is running 
745472B beyond the 'PHYSICAL' memo
ry limit. Current usage: 10.0 GB of 10 GB physical memory used; 19.1 GB of 21 
GB virtual memory used. Killing container.



My Flink version is 1.10, and each TaskManager is allocated 10g.
Looking at the monitoring, the JVM heap memory is quite stable, so I suspect RocksDB is using a lot of storage. I enabled the RocksDB block usage metrics, and they indeed keep growing.
I then set taskmanager.memory.managed.fraction to 0.7, but the block usage still keeps growing, and the job eventually gets killed.
I have already set state.backend.rocksdb.memory.managed = true. In theory, RocksDB's managed memory should not keep growing; the 1.10 docs say RocksDB's memory usage is kept within managed memory, but my job's block usage keeps growing, and the container ends up being killed.


I'd like to ask:
1. Under what circumstances does RocksDB's memory get out of control and keep growing beyond the allocated managed memory?
2. In version 1.10, what else can cause memory to exceed the limit so the container is killed by yarn?




Can the PyFlink Table API route inserts to different sink tables based on a field's value?

2020-06-18 Thread jack
Using the PyFlink Table API, can I decide which sink table to insert into based on a field's value?


Scenario: use PyFlink to filter records by a condition and then insert them into a sink.
For example, for the following two messages with different logType values, the filter API can be used to filter them and insert into a sink table:
{
"logType":"syslog",
"message":"sla;flkdsjf"
}
{
"logType":"alarm",
"message":"sla;flkdsjf"
}
  t_env.from_path("source")\
  .filter("logType=syslog")\
  .insert_into("sink1")
Is there a way, directly in the code above, to implement if/else-like logic based on the value of the logType field:
if logType=="syslog":
   insert_into(sink1)
elif logType=="alarm":
   insert_into(sink2)


It would be easy if insert_into returned something with .filter/.select etc., so I could keep chaining filters for the next condition, with code like the following:


  t_env.from_path("source")\
  .filter("logType=syslog")\
  .insert_into("sink1")\
  .filter("logType=alarm")\
  .insert_into("sink2")
Any guidance would be appreciated, thanks.


Re: Re: Project depending on flink-1.11.0 fails to package

2020-06-18 Thread Zhou Zach
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment


On 2020-06-18 19:41:08, "Jark Wu" wrote:
>Could you paste the complete code? (the imports section)
>
>Best,
>Jark
>
>On Thu, 18 Jun 2020 at 19:18, Zhou Zach  wrote:
>
>>
>>
>> With flink-1.10.0 I was using org.apache.flink.table.api.java.StreamTableEnvironment. When switching to flink-1.11.0, IntelliJ
>> IDEA suggested changing it to org.apache.flink.table.api.bridge.java.StreamTableEnvironment. The IntelliJ
>> IDEA build succeeds, but packaging fails.
>>
>>
>>
>>
>> [ERROR]
>> /Users/Zach/flink-common_1.11.0/src/main/scala/org/rabbit/sql/FromKafkaSinkJdbcForUserUV.scala:7:
>> error: object StreamTableEnvironment is not a member of package
>> org.apache.flink.table.api.bridge.java
>> [ERROR] import
>> org.apache.flink.table.api.bridge.java.StreamTableEnvironment
>>
>>
>>
>>
>> Code:
>> val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>
>> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> streamExecutionEnv.enableCheckpointing(20 * 1000,
>> CheckpointingMode.EXACTLY_ONCE)
>> streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(900 * 1000)
>>
>> val blinkEnvSettings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>> val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv,
>> blinkEnvSettings)
>> pom.xml:
>> 
>>   UTF-8
>> 
>> 1.11-SNAPSHOT
>>   1.8
>>   2.11.12
>>   2.11
>>   ${java.version}
>>   ${java.version}
>>
>> 
>>  org.apache.flink
>>  flink-java
>>  ${flink.version}
>> 
>> 
>>   
>>  org.apache.flink
>>
>>  flink-streaming-java_${scala.binary.version}
>>  ${flink.version}
>> 
>> 
>>
>> 
>> 
>>  org.apache.flink
>>  flink-table
>>  ${flink.version}
>>  pom
>> 
>> 
>>
>>   
>>  org.apache.flink
>>  flink-scala_2.11
>>  ${flink.version}
>> 
>> 
>>   
>>  org.apache.flink
>>  flink-jdbc_2.11
>>  ${flink.version}
>>  provided
>>   
>>
>>   
>>  org.apache.flink
>>  flink-streaming-scala_2.11
>>  ${flink.version}
>> 
>> 
>>
>>   
>>  org.apache.flink
>>  flink-table-common
>>  ${flink.version}
>> 
>> 
>> 
>> 
>>  org.apache.flink
>>  flink-table-api-scala-bridge_2.11
>>  ${flink.version}
>> 
>> 
>>
>> 
>> 
>>  org.apache.flink
>>  flink-table-api-scala_2.11
>>  ${flink.version}
>> 
>> 
>>
>>
>>
>>
>> 
>>
>>   
>>   
>>
>>
>> 
>>  org.apache.flink
>>  flink-connector-kafka_2.11
>>  ${flink.version}
>>  provided
>>   
>>   
>>  org.apache.flink
>>  flink-avro
>>  ${flink.version}
>>  provided
>>   
>>   
>>  org.apache.flink
>>  flink-csv
>>  ${flink.version}
>>  provided
>>   
>> 
>> 
>>  org.apache.flink
>>  flink-json
>>  ${flink.version}
>>  provided
>>   
>>
>>
>> 
>>
>>
>> 
>>  org.apache.bahir
>>  flink-connector-redis_2.11
>>  1.0
>>  provided
>>   
>>
>> 
>> 
>>  org.apache.flink
>>  flink-connector-hive_2.11
>>  ${flink.version}
>>  provided
>>   
>>
>> 
>> 
>> 
>> 
>> 
>> 
>>
>> 
>>  org.apache.flink
>>  flink-table-api-java
>>  ${flink.version}
>>  provided
>>   
>>
>> 
>> 
>>  org.apache.flink
>>  flink-table-planner_2.11
>>  ${flink.version}
>> 
>> 
>>
>>   
>>  org.apache.flink
>>  flink-table-planner-blink_2.11
>>  ${flink.version}
>>  provided
>>   
>> 
>> 
>>  org.apache.flink
>>  flink-sql-connector-kafka_2.11
>>  ${flink.version}
>>  provided
>>   
>>
>>
>>   
>>  org.apache.flink
>>  flink-connector-hbase_2.11
>>  ${flink.version}
>>   


Re: Blink Planner Retracting Streams

2020-06-18 Thread John Mathews
Below is a basic unit test of what we are trying to achieve, but basically,
we are trying to convert from a retracting stream to a
RetractingStreamTableSink, which is easily done with the CRow from the
original flink planner, but seems to be very difficult to do with the blink
planner.

The below fails with:
org.apache.flink.table.api.ValidationException: Field types of query result
and registered TableSink default_catalog.default_database.sink2 do not
match.
Query schema: [f0: BOOLEAN, f1: ROW<`f0` STRING, `f1` STRING>]
Sink schema: [key: STRING, id: STRING]

but will succeed if you uncomment the CRow lines of code and run with the
original table planner.

Any thoughts on how we can accomplish this?

@Test
public void retractStream() throws Exception {
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment executionEnvironment =
StreamExecutionEnvironment.createLocalEnvironment();
StreamTableEnvironment tableEnvironment =
StreamTableEnvironment.create(executionEnvironment, settings);

Row row1 = new Row(2);
row1.setField(0, "1");
row1.setField(1, "1");

SingleOutputStreamOperator<Row> source =

executionEnvironment.fromCollection(ImmutableList.of(row1)).setParallelism(1);

tableEnvironment.createTemporaryView("table1", source, "key, id");
Table outputTable = tableEnvironment.sqlQuery("select id, key from table1");

RowTypeInfo rowTypeInfo = new
RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO);
DataStream<Tuple2<Boolean, Row>> tuple2DataStream =
tableEnvironment.toRetractStream(outputTable, rowTypeInfo);

// This code block below works on Flink planner but fails on Blink
planner because Blink treats all non-tuples
// as POJOs
// SingleOutputStreamOperator<CRow> tuple2DataStream = tableEnvironment
// .toRetractStream(outputTable, rowTypeInfo)
// .map(value -> new CRow(value.f1, value.f0))
// .returns(new CRowTypeInfo(rowTypeInfo));

tableEnvironment.createTemporaryView("outputTable", tuple2DataStream);
// Create a retracting table sink
TableSchema.Builder schemaBuilder = TableSchema.builder();
schemaBuilder.field("key", DataTypes.STRING());
schemaBuilder.field("id", DataTypes.STRING());
TableSchema schema = schemaBuilder.build();
RetractSink retractTableSink = new RetractSink(new
CollectingTableSink(schema));
tableEnvironment.registerTableSink("sink2", retractTableSink);
// Wire up the output to the sink
tableEnvironment.from("outputTable").insertInto("sink2");
executionEnvironment.execute();
}

private static class RetractSink implements RetractStreamTableSink<Row> {

private final AppendStreamTableSink<Row> delegate;

RetractSink(AppendStreamTableSink<Row> delegate) {
this.delegate = delegate;
}

@Override
public TypeInformation<Row> getRecordType() {
return delegate.getOutputType();
}

@Override
public TableSchema getTableSchema() {
return delegate.getTableSchema();
}

@Override
public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
return new TupleTypeInfo<>(Types.BOOLEAN(), getRecordType());
}

@Override
public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
consumeDataStream(dataStream);
}

@Override
public DataStreamSink<?>
consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
DataStream<Row> filteredAndMapped =
dataStream.flatMap(new TupleMapper()).returns(getRecordType());

return delegate.consumeDataStream(filteredAndMapped);
}

@Override
public TableSink<Tuple2<Boolean, Row>> configure(String[]
fieldNames, TypeInformation<?>[] fieldTypes) {
throw new UnsupportedOperationException();
}
}

private static final class TupleMapper implements
FlatMapFunction<Tuple2<Boolean, Row>, Row> {
@Override
public void flatMap(Tuple2<Boolean, Row> value, Collector<Row> out) {
if (value.f0) {
out.collect(value.f1);
}
}
}


On Thu, Jun 18, 2020 at 10:21 AM John Mathews 
wrote:

> So the difference between Tuple2 and CRow is that CRow has a
> special TypeInformation defined here:
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/types/CRowTypeInfo.scala#L32
>
>
> that returns the TypeInfo of the underlying row, whereas the
> TypeInformation for Tuple2 will return type information that contains the
> boolean for the retraction + a nested type info for the row. So all
> downstream operations that rely on seeing just the row type info now break
> for us.
>
> On Wed, Jun 17, 2020 at 9:23 PM Jark Wu  wrote:
>
>> Hi John,
>>
>> Maybe I misunderstand something, but CRow doesn't have the `getSchema()`
>> method. You can getSchema() on the Table, this also works if you convert
>> the table into Tuple2.
>> Actually, there is no big difference between CRow and Tuple2<Boolean, Row>, they both wrap the change flag and the Row.

Re: Kinesis ProvisionedThroughputExceededException

2020-06-18 Thread M Singh
 Thanks Roman for your response.  Mans
On Wednesday, June 17, 2020, 03:26:31 AM EDT, Roman Grebennikov wrote:

Hi,

It will occur if your job reaches SHARD_GETRECORDS_RETRIES consecutive 
failed attempts to pull the data from Kinesis.
So if you scale up the stream in Kinesis and tune the backoff parameters a bit, you 
will lower the probability of this exception almost to zero (but with increased 
costs and worse worst-case latency).

But yes, this is the main drawback of managed solutions: once you reach a 
significant load, you need to pay more. Another managed option within AWS is to 
switch to MSK, managed Kafka, which has no such significant restrictions.

And the final option is to wait until FLINK-17688 is implemented (using 
Kinesis enhanced fan-out, so Kinesis will push the data to the consumer instead of 
the consumer periodically pulling the data).

Roman Grebennikov | g...@dfdx.me


On Wed, Jun 17, 2020, at 04:39, M Singh wrote:



Thanks Roman for your response and advice.

>From my understanding, increasing shards will increase throughput, but if 
>more than 5 requests are made per shard per second, and since we have 20 apps 
>(and increasing), then the exception might still occur. 

Please let me know if I have missed anything.

Mans
On Tuesday, June 16, 2020, 03:29:59 PM EDT, Roman Grebennikov  
wrote:


Hi, 

usually this exception is thrown by aws-java-sdk and means that your kinesis 
stream is hitting a throughput limit (what a surprise). We experienced the same 
thing when we had a single "event-bus" style stream and multiple flink apps 
reading from it.

Each Kinesis partition has a limit of 5 poll operations per second. If you have 
a stream with 4 partitions and 30 jobs reading from it, I guess that each job 
is constantly hitting the op limit for Kinesis with the default kinesis consumer 
settings and doing an exponential back-off (by just sleeping for a small 
period of time and then retrying).

You have two options here:
1. scale up the kinesis stream, so there will be more partitions and higher 
overall throughput limits
2. tune kinesis consumer backoff parameters:

Our current ones, for example, look like this:

    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000") 
// we poll every 2s
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "2000") // 
in case of throughput error, initial timeout is 2s
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "10000") // 
we can go up to 10s pause
    
conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, 
"1.5") // multiplying pause to 1.5 on each next step
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "100") // and 
make up to 100 retries

with best regards,
Roman Grebennikov | g...@dfdx.me


On Mon, Jun 15, 2020, at 13:45, M Singh wrote:

Hi:

I am using multiple (almost 30 and growing) Flink streaming applications that 
read from the same kinesis stream and get 
ProvisionedThroughputExceededException exception which fails the job.
I have seen a reference 

http://mail-archives.apache.org/mod_mbox/flink-user/201811.mbox/%3CCAJnSTVxpuOhCNTFTvEYd7Om4s=q2vz5-8+m4nvuutmj2oxu...@mail.gmail.com%3E
 - which indicates there might be some solution perhaps in Flink 1.8/1.9.  

I also see [FLINK-10536] Flink Kinesis Consumer leading to job failure due to 
ProvisionedThroughputExceededException - ASF JIRA is still open.


So i wanted to find out 

1. If this issue has been resolved and if so in which version ?
2. Is there any kinesis consumer with kinesis fanout available that can help 
address this issue ?
3. Is there any specific parameter in kinesis consumer config that can address 
this issue ?

If there is any other pointer/documentation/reference, please let me know.

Thanks





  

Re: Completed Job List in Flink UI

2020-06-18 Thread Chesnay Schepler

https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#jobstore-expiration-time
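
That key controls how long finished jobs are kept in the list, in seconds (the default is 3600). For example, keeping them for a day would mean a flink-conf.yaml entry like the following (the value is illustrative):

  jobstore.expiration-time: 86400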

On 18/06/2020 19:57, Ivan Yang wrote:

Hello,

In the Flink web UI Overview tab, the "Completed Job List” displays recently completed or 
cancelled jobs only for a short period of time. After a while, they are gone. The Job 
Manager is up and was never restarted. Is there a config key to keep job history in the 
Completed Job List for a longer time? I am using Flink 1.9

Thank you in advance.

Ivan





Re: Trouble with large state

2020-06-18 Thread Jeff Henrikson

Vijay,

Thanks for your thoughts.  Below are answers to your questions.

> 1. What's your checkpoint interval?

I have used many different checkpoint intervals, ranging from 5 minutes 
to never.  I usually setMinPauseBetweenCheckpoints to the same value as 
the checkpoint interval.


> 2. How frequently are you updating the state into RocksDB?

My understanding is that for .cogroup:

  - Triggers control communication outside the operator
  - Evictors control cleanup of internal state
  - Configurations like write buffer size control the frequency of 
state change at the storage layer
  - There is no control over how frequently the window state updates at 
the RocksDB API layer.


Thus, the state updates whenever data is ingested.

> 3. How many task managers are you using?

Usually I have been running with one slot per taskmanager.  28GB of 
usable ram on each node.


> 4. How much data each task manager handles while taking the checkpoint?

Funny you should ask.  I would be okay with zero.

The application I am replacing has a latency of 36-48 hours, so if I had 
to fully stop processing to take every snapshot synchronously, it might 
be seen as totally acceptable, especially for initial bootstrap.  Also, 
the velocity of running this backfill is approximately 115x real time on 
8 nodes, so the steady-state run may not exhibit the failure mode in 
question at all.


It has come as some frustration to me that, in the case of 
RocksDBStateBackend, the configuration key state.backend.async 
effectively has no meaningful way to be false.


The only way I have found in the existing code to get a behavior like 
synchronous snapshot is to POST to /jobs//stop with drain=false 
and a URL.  This method of failing fast is the way that I discovered 
that I needed to increase transfer threads from the default.


The reason I don't just run the whole backfill and then take one 
snapshot is that even in the absence of checkpoints, a very similar 
congestion seems to take the cluster down when I am say 20-30% of the 
way through my backfill.


Reloading from my largest feasible snapshot makes it possible to make 
another snapshot a bit larger before crash, but not by much.


On first glance, the code change to allow RocksDBStateBackend into a 
synchronous snapshots mode looks pretty easy.  Nevertheless, I was 
hoping to do the initial launch of my application without needing to 
modify the framework.


Regards,


Jeff Henrikson


On 6/18/20 7:28 AM, Vijay Bhaskar wrote:

For me this seems to be an IO bottleneck at your task manager.
I have a couple of queries:
1. What's your checkpoint interval?
2. How frequently are you updating the state into RocksDB?
3. How many task managers are you using?
4. How much data does each task manager handle while taking the checkpoint?

For points (3) and (4), you should be very careful. I feel you are 
stuck at this.
You can try to scale vertically by adding more CPU and memory to each 
task manager.

If not, try to scale horizontally so that the IO on each task manager is reduced.
Apart from that, check whether there is any bottleneck in the file system.

Regards
Bhaskar





On Thu, Jun 18, 2020 at 5:12 PM Timothy Victor wrote:


I had a similar problem.   I ended up solving it by not relying on
checkpoints for recovery and instead re-reading my input sources (in my
case a kafka topic) from the earliest offset and rebuilding only the
state I need.  I only need to care about the past 1 to 2 days of
state so I can afford to drop anything older.   My recovery time went
from over an hour for just the first checkpoint to under 10 minutes.

Tim

On Wed, Jun 17, 2020, 11:52 PM Yun Tang wrote:

Hi Jeff

 1. "after around 50GB of state, I stop being able to reliably
take checkpoints or savepoints. "
What is the exact reason that job cannot complete
checkpoint? Expired before completing or decline by some
tasks? The former one is manly caused by high back-pressure
and the later one is mainly due to some internal error.
 2. Have you checked what reason the remote task manager is lost?
If the remote task manager is not crashed, it might be due
to GC impact, I think you might need to check task-manager
logs and GC logs.

Best
Yun Tang

*From:* Jeff Henrikson
*Sent:* Thursday, June 18, 2020 1:46
*To:* user
*Subject:* Trouble with large state
Hello Flink users,

I have an application of around 10 enrichment joins.  All events
are
read from kafka and have event timestamps.  The joins are built
using
.cogroup, with a global window, triggering on every 1 event, plus a

Re: Trouble with large state

2020-06-18 Thread Jeff Henrikson

Hi Yun,

Thanks for your thoughts.  Answers to your questions:

>  1. "after around 50GB of state, I stop being able to reliably take
> checkpoints or savepoints. "
> What is the exact reason that job cannot complete checkpoint?
> Expired before completing or decline by some tasks? The former one
> is manly caused by high back-pressure and the later one is mainly
> due to some internal error.

In the UI, under Job | Checkpoints | History, then opening the 
checkpoint detail, the checkpoints fail by not some operators not 
acknowledging.  It's always a subset of of the larger state operators 
that stop acknowledging.  The exact selection of operators that stop is 
nondeterministic.  The checkpoints frequently fail before any timeout 
that I impose on them.


>  2. Have you checked what reason the remote task manager is lost?
> If the remote task manager is not crashed, it might be due to GC
> impact, I think you might need to check task-manager logs and GC 
logs.


The only general pattern I have observed is:

1) Some taskmanager A throws one of the various connectivity
exceptions I listed complaining about another taskmanager B.
2) Taskmanager B shows no obvious error other than complaining
that taskmanager A has disconnected from it.

Regards,


Jeff Henrikson



On 6/17/20 9:52 PM, Yun Tang wrote:

Hi Jeff

 1. "after around 50GB of state, I stop being able to reliably take
checkpoints or savepoints. "
What is the exact reason that job cannot complete checkpoint?
Expired before completing or decline by some tasks? The former one
is manly caused by high back-pressure and the later one is mainly
due to some internal error.
 2. Have you checked what reason the remote task manager is lost?
If the remote task manager is not crashed, it might be due to GC
impact, I think you might need to check task-manager logs and GC logs.

Best
Yun Tang

*From:* Jeff Henrikson 
*Sent:* Thursday, June 18, 2020 1:46
*To:* user 
*Subject:* Trouble with large state
Hello Flink users,

I have an application of around 10 enrichment joins.  All events are
read from kafka and have event timestamps.  The joins are built using
.cogroup, with a global window, triggering on every 1 event, plus a
custom evictor that drops records once a newer record for the same ID
has been processed.  Deletes are represented by empty events with
timestamp and ID (tombstones). That way, we can drop records when
business logic dictates, as opposed to when a maximum retention has been
attained.  The application runs RocksDBStateBackend, on Kubernetes on
AWS with local SSDs.
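
As a rough sketch of the kind of evictor described above (this is not the poster's actual code; the Event type and its getId() accessor are made-up placeholders, while the Evictor and TimestampedValue signatures are Flink's):

import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class LatestPerIdEvictor implements Evictor<Event, GlobalWindow> {

    @Override
    public void evictBefore(Iterable<TimestampedValue<Event>> elements, int size,
                            GlobalWindow window, EvictorContext ctx) {
        // Remember the newest timestamp seen per ID, then drop everything older.
        Map<String, Long> newest = new HashMap<>();
        for (TimestampedValue<Event> e : elements) {
            newest.merge(e.getValue().getId(), e.getTimestamp(), Math::max);
        }
        Iterator<TimestampedValue<Event>> it = elements.iterator();
        while (it.hasNext()) {
            TimestampedValue<Event> e = it.next();
            if (e.getTimestamp() < newest.get(e.getValue().getId())) {
                it.remove();
            }
        }
    }

    @Override
    public void evictAfter(Iterable<TimestampedValue<Event>> elements, int size,
                           GlobalWindow window, EvictorContext ctx) {
        // No-op: eviction only happens before the window function runs.
    }
}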

Unit tests show that the joins produce expected results.  On an 8 node
cluster, watermark output progress seems to indicate I should be able to
bootstrap my state of around 500GB in around 1 day.  I am able to save
and restore savepoints for the first half an hour of run time.

My current trouble is that after around 50GB of state, I stop being able
to reliably take checkpoints or savepoints.  Some time after that, I
start getting a variety of failures where the first suspicious log event
is a generic cluster connectivity error, such as:

  1) java.io.IOException: Connecting the channel failed: Connecting
  to remote task manager + '/10.67.7.101:38955' has failed. This
  might indicate that the remote task manager has been lost.

  2) org.apache.flink.runtime.io.network.netty.exception
  .RemoteTransportException: Connection unexpectedly closed by remote
  task manager 'null'. This might indicate that the remote task
  manager was lost.

  3) Association with remote system
  [akka.tcp://flink@10.67.6.66:34987] has failed, address is now
  gated for [50] ms. Reason: [Association failed with
  [akka.tcp://flink@10.67.6.66:34987]] Caused by:
  [java.net.NoRouteToHostException: No route to host]

I don't see any obvious out of memory errors on the TaskManager UI.

Adding nodes to the cluster does not seem to increase the maximum
savable state size.

I could enable HA, but for the time being I have been leaving it out to
avoid the possibility of masking deterministic faults.

Below are my configurations.

Thanks in advance for any advice.

Regards,


Jeff Henrikson



Flink version: 1.10

Configuration set via code:
  parallelism=8
  maxParallelism=64
  setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
  setTolerableCheckpointFailureNumber(1000)
  setMaxConcurrentCheckpoints(1)

enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  RocksDBStateBackend
  setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
  setNumberOfTransferThreads(25)
  setDbStoragePath points to a local nvme SSD

Configuration in flink-conf.yaml:

  jobmanager.rpc.address: localhost
  

Completed Job List in Flink UI

2020-06-18 Thread Ivan Yang
Hello,

In the Flink web UI Overview tab, the "Completed Job List” displays recently completed or 
cancelled jobs only for a short period of time. After a while, they are gone. The 
Job Manager is up and was never restarted. Is there a config key to keep job 
history in the Completed Job List for a longer time? I am using Flink 1.9

Thank you in advance.

Ivan

Re: what is the "Flink" recommended way of assigning a backfill to an average on an event time keyed windowed stream?

2020-06-18 Thread Marco Villalobos
I came up with a solution for backfills. However, at this moment, I am not 
happy with my solution.
I think there might be other facilities within Flink that would allow me to 
implement a better, more efficient, or more scalable solution.

In another post, rmetz...@apache.org suggested that I use a process function 
and a timer. He was right in that I should use that approach. I want to thank 
him.

The averages are computed by a ProcessWindowFunction that keys by the name and 
window size and uses a tumbling event time window.

However, after that average is complete, I then use a KeyedProcessFunction that 
is keyed by window size. I then use a somewhat brute-force approach with 
a ValueState to track names that need a value and a MapState to 
determine which values exist and which ones are backfilled.
It also cleans up stale values.

I committed my code to a branch 
https://github.com/minmay/flink-patterns/tree/feature/backfill , and I also 
created a pull request https://github.com/minmay/flink-patterns/pull/1/files to 
share my experience.

I am open critical comments on my approach, lack of understanding of Flink, 
algorithms and data-structures used. Please refrain from comments on my code 
style though.

I'll also copy and paste my solution below.

package mvillalobos.flink.patterns.timeseries.average;

import com.google.common.collect.ImmutableList;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCOptions;
import org.apache.flink.api.java.io.jdbc.JDBCUpsertTableSink;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import 
org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;

import java.io.File;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

@CommandLine.Command(name = "Time Series Average", mixinStandardHelpOptions = 
true,
description = "Compute the average of the time series with a 15 minute 
tumbling event time window and upsert the results into an Apache Derby 
database.")
public class TimeSeriesAverageApp implements Callable<Integer> {

private final static Logger logger = 
LoggerFactory.getLogger(TimeSeriesAverageApp.class);

@CommandLine.Option(names = {"-f", "--input-file"}, description = "The CSV 
input file of time series data. Each line must be in the format: String, 
double, Instant.")
private File inputFile;

@Override
public Integer call() throws Exception {
stream(inputFile.toString());
return 0;
}

public void stream(String inputFilePath) throws Exception {

final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


// GIVEN a SOURCE with a CSV input file
// in which each line has a: String, double, Instant
// THEN the MAP operator
// transforms the line into a Tuple7
// f0: name: String
// f1: window_size: int
// f2: value: double
// f3: event_timestamp: Instant
// f4: aggregate_sum: double
// f5: aggregate_count double
// f6: is_backfile: boolean
// WHEN the map operation finishes
// THEN the event time assigned using field f3
final DataStream<Tuple7<String, Integer, Double, Instant, Double, Double, Boolean>> timeSeriesStream = 

Re: Does anyone have an example of Bazel working with Flink?

2020-06-18 Thread Austin Cawley-Edwards
Ok, no worries Aaron, that's still good advice :)

One last question - are you using JAR-based or image-based deployments? The
only real problem using Flink & Bazel and a JAR-based deployment from our
experience is removing the Flink libs present in the deploy environment
from the deploy jar, and still having them present when we want to do local
debugging/ integration testing. Also possible that we're just not using
Bazel entirely correctly!

Thank you!
Austin


On Thu, Jun 18, 2020 at 12:32 PM Aaron Levin  wrote:

> Hi Austin,
>
> In our experience, `rules_scala` and `rules_java` are enough for us at
> this point.
>
> It's entirely possible I'm not thinking far enough into the future,
> though, so don't take our lack of investment as a sign it's not worth
> investing in :)
>
> Best,
>
> Aaron Levin
>
> On Thu, Jun 18, 2020 at 10:27 AM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Great to hear Dan!
>>
>> @Aaron - would you/ your team be interested in a `rules_flink` project?
>> I'm still fairly new to Bazel and know enough to contribute, but could
>> definitely use guidance on design as well.
>>
>> Best,
>> Austin
>>
>> On Mon, Jun 15, 2020 at 11:07 PM Dan Hill  wrote:
>>
>>> Thanks for the replies!  I was able to use the provided answers to get a
>>> setup working (maybe not the most efficiently).  The main change I made was
>>> to switch to including the deploy jar in the image (rather than the default
>>> one).
>>>
>>> I'm open to contributing to a "rules_flink" project.  I don't know
>>> enough yet to help design it.
>>>
>>> On Sat, Jun 13, 2020 at 4:39 AM Till Rohrmann 
>>> wrote:
>>>
 Hi Dan,

 if you want to run a Flink job without specifying the main class via
 `bin/flink run --class org.a.b.Foobar` then you have to add a MANIFEST.MF
 file to your jar under META-INF and this file needs to contain `Main-Class:
 org.a.b.Foobar`.
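
 (For illustration, not part of Till's original message: a minimal META-INF/MANIFEST.MF along those lines would contain just the two lines below, with the class name being Till's placeholder.)

 Manifest-Version: 1.0
 Main-Class: org.a.b.Foobar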

 Cheers,
 Till

 On Fri, Jun 12, 2020 at 12:30 AM Austin Cawley-Edwards <
 austin.caw...@gmail.com> wrote:

> Hey all,
>
> Adding to Aaron's response, we use Bazel to build our Flink apps.
> We've open-sourced some of our setup here[1] though a bit outdated. There
> are definitely rough edges/ probably needs a good deal of work to fit 
> other
> setups. We have written a wrapper around the `java_library` and
> `java_binary` and could do the same for `rules_scala`, though we just
> started using Bazel last November and have a lot to learn in terms of best
> practices there.
>
> If you're interested in contributing to a `rules_flink` project, I
> would be as well!
>
> Best,
> Austin
>
> [1]:
> https://github.com/fintechstudios/vp-operator-demo-ff-virtual-2020
>
> On Thu, Jun 11, 2020 at 6:14 PM Aaron Levin 
> wrote:
>
>> Hi Dan,
>>
>> We use Bazel to compile our Flink applications. We're using
>> "rules_scala" (https://github.com/bazelbuild/rules_scala) to manage
>> the dependencies and produce jars. We haven't had any issues. However, I
>> have found that sometimes it's difficult to figure out exactly what Flink
>> target or dependency my application needs.
>>
>> Unfortunately I'm not sure what issue you're seeing here. I would
>> guess either your flink application wasn't compiled into the jar
>> you're executing. If you can paste the bazel target used to generate your
>> jar and how you're launching the application, that will be helpful
>> for diagnosis.
>>
>> On Thu, Jun 11, 2020 at 5:21 PM Dan Hill 
>> wrote:
>>
>>> I took the Flink playground and I'm trying to swap out Maven for
>>> Bazel.  I got to the point where I'm hitting the following error.  I 
>>> want
>>> to diff my code with an existing, working setup.
>>>
>>> Thanks! - Dan
>>>
>>>
>>> client_1| 
>>> org.apache.flink.client.program.ProgramInvocationException:
>>> Neither a 'Main-Class', nor a 'program-class' entry was found in the jar
>>> file.
>>>
>>> client_1| at
>>> org.apache.flink.client.program.PackagedProgram.getEntryPointClassNameFromJar(PackagedProgram.java:596)
>>>
>>> client_1| at
>>> org.apache.flink.client.program.PackagedProgram.(PackagedProgram.java:190)
>>>
>>> client_1| at
>>> org.apache.flink.client.program.PackagedProgram.(PackagedProgram.java:128)
>>>
>>> client_1| at
>>> org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:862)
>>>
>>> client_1| at
>>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:204)
>>>
>>> client_1| at
>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>>>
>>> client_1| at
>>> 

Re: Does anyone have an example of Bazel working with Flink?

2020-06-18 Thread Aaron Levin
Hi Austin,

In our experience, `rules_scala` and `rules_java` are enough for us at this
point.

It's entirely possible I'm not thinking far enough into the future, though,
so don't take our lack of investment as a sign it's not worth investing in
:)

Best,

Aaron Levin

On Thu, Jun 18, 2020 at 10:27 AM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Great to hear Dan!
>
> @Aaron - would you/ your team be interested in a `rules_flink` project?
> I'm still fairly new to Bazel and know enough to contribute, but could
> definitely use guidance on design as well.
>
> Best,
> Austin
>
> On Mon, Jun 15, 2020 at 11:07 PM Dan Hill  wrote:
>
>> Thanks for the replies!  I was able to use the provided answers to get a
>> setup working (maybe not the most efficiently).  The main change I made was
>> to switch to including the deploy jar in the image (rather than the default
>> one).
>>
>> I'm open to contributing to a "rules_flink" project.  I don't know enough
>> yet to help design it.
>>
>> On Sat, Jun 13, 2020 at 4:39 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Dan,
>>>
>>> if you want to run a Flink job without specifying the main class via
>>> `bin/flink run --class org.a.b.Foobar` then you have to add a MANIFEST.MF
>>> file to your jar under META-INF and this file needs to contain `Main-Class:
>>> org.a.b.Foobar`.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Jun 12, 2020 at 12:30 AM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
 Hey all,

 Adding to Aaron's response, we use Bazel to build our Flink apps. We've
 open-sourced some of our setup here[1] though a bit outdated. There are
 definitely rough edges/ probably needs a good deal of work to fit other
 setups. We have written a wrapper around the `java_library` and
 `java_binary` and could do the same for `rules_scala`, though we just
 started using Bazel last November and have a lot to learn in terms of best
 practices there.

 If you're interested in contributing to a `rules_flink` project, I
 would be as well!

 Best,
 Austin

 [1]: https://github.com/fintechstudios/vp-operator-demo-ff-virtual-2020

 On Thu, Jun 11, 2020 at 6:14 PM Aaron Levin 
 wrote:

> Hi Dan,
>
> We use Bazel to compile our Flink applications. We're using
> "rules_scala" (https://github.com/bazelbuild/rules_scala) to manage
> the dependencies and produce jars. We haven't had any issues. However, I
> have found that sometimes it's difficult to figure out exactly what Flink
> target or dependency my application needs.
>
> Unfortunately I'm not sure what issue you're seeing here. I would
> guess either your flink application wasn't compiled into the jar
> you're executing. If you can paste the bazel target used to generate your
> jar and how you're launching the application, that will be helpful
> for diagnosis.
>
> On Thu, Jun 11, 2020 at 5:21 PM Dan Hill 
> wrote:
>
>> I took the Flink playground and I'm trying to swap out Maven for
>> Bazel.  I got to the point where I'm hitting the following error.  I want
>> to diff my code with an existing, working setup.
>>
>> Thanks! - Dan
>>
>>
>> client_1| 
>> org.apache.flink.client.program.ProgramInvocationException:
>> Neither a 'Main-Class', nor a 'program-class' entry was found in the jar
>> file.
>>
>> client_1| at
>> org.apache.flink.client.program.PackagedProgram.getEntryPointClassNameFromJar(PackagedProgram.java:596)
>>
>> client_1| at
>> org.apache.flink.client.program.PackagedProgram.(PackagedProgram.java:190)
>>
>> client_1| at
>> org.apache.flink.client.program.PackagedProgram.(PackagedProgram.java:128)
>>
>> client_1| at
>> org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:862)
>>
>> client_1| at
>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:204)
>>
>> client_1| at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>>
>> client_1| at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>>
>> client_1| at
>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>
>> client_1| at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>>
>


Re: Problem querying a Kafka table with sql-client and HiveCatalog

2020-06-18 Thread Sun.Zhu
Thanks a lot, I'll give it a try.


Sun.Zhu
17626017...@163.com


On Jun 18, 2020 at 18:13, Rui Li wrote:
You need to start a standalone metastore server, and hive.metastore.uris should be set to the address of your metastore server.
In the simplest case, start a metastore server locally with the command: hive --service metastore
and set hive.metastore.uris to: thrift://localhost:9083

For more details on using the metastore, see the Hive docs:
https://cwiki.apache.org/confluence/display/Hive/AdminManual+Metastore+Administration
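
As an illustration (the host and port are placeholders, not from this thread), the corresponding hive-site.xml fragment would look like:

  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://localhost:9083</value>
  </property>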

On Thu, Jun 18, 2020 at 5:21 PM Sun.Zhu <17626017...@163.com> wrote:

That's a fairly big change. Is there a document describing it?
How should hive.metastore.uris be configured? Is there an example?


Sun.Zhu
17626017...@163.com


在2020年06月18日 17:01,Rui Li 写道:

是的,embedded模式需要添加额外的jar包,容易导致依赖冲突。而且生产环境中embedded模式也比较少见,所以在1.11中HiveCatalog已经不允许embedded模式了。

On Thu, Jun 18, 2020 at 4:53 PM Leonard Xu  wrote:


Hi

On Jun 18, 2020, at 16:45, Sun.Zhu <17626017...@163.com> wrote:

Caused by: java.lang.IllegalArgumentException: Embedded metastore is not
allowed. Make sure you have set a valid value for hive.metastore.uris

This should be the cause of the error: when Flink integrates with Hive, the embedded metastore is not supported. Your Hive needs to run a hive
metastore, and hive.metastore.uris must be configured in the conf file.

Best,
Leonard Xu



--
Best regards!
Rui Li



--
Best regards!
Rui Li


Re: Blink Planner Retracting Streams

2020-06-18 Thread John Mathews
So the difference between Tuple2 and CRow is that CRow has a
special TypeInformation defined here:
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/types/CRowTypeInfo.scala#L32


that returns the TypeInfo of the underlying row, whereas the
TypeInformation for Tuple2 will return type information that contains the
boolean for the retraction + a nested type info for the row. So all
downstream operations that rely on seeing just the row type info now break
for us.
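For anyone hitting the same migration problem, one possible workaround (a sketch only, assuming Flink 1.10 and the Java bridge API; it is not an officially recommended pattern) is to unwrap the Tuple2 and re-declare the row's type information explicitly, so downstream code keeps seeing the row schema:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractToRows {

    // Unwrap a retract stream and re-attach the Row type info so downstream
    // operators see the row schema rather than the Tuple2 schema.
    public static SingleOutputStreamOperator<Row> toRowStream(
            StreamTableEnvironment tableEnv, Table table) {
        DataStream<Tuple2<Boolean, Row>> retractStream =
                tableEnv.toRetractStream(table, Row.class);
        return retractStream
                .map(t -> t.f1)                          // drop the retraction flag
                .returns(table.getSchema().toRowType()); // deprecated in 1.10 but still available
    }
}
```

Note that this drops the retraction flag, so it only helps for the parts of the pipeline that do not need to distinguish inserts from retractions.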

On Wed, Jun 17, 2020 at 9:23 PM Jark Wu  wrote:

> Hi John,
>
> Maybe I misunderstand something, but CRow doesn't have the `getSchema()`
> method. You can getSchema() on the Table, this also works if you convert
> the table into Tuple2.
> Actually, there is no big difference between CRow and Tuple2<Boolean, Row>; they both wrap the change flag and the Row.
>
> Best,
> Jark
>
>
>
> On Thu, 18 Jun 2020 at 06:39, John Mathews  wrote:
>
>> Hello Godfrey,
>>
>> Thanks for the response!
>>
>> I think the problem with Tuple2, is that if my understanding is correct
>> of how CRow worked, when CRow's getSchema() was returned it would return
>> the underlying schema of the row it contained. Tuple2 doesn't do that, so
>> it changes/breaks a lot of our downstream code that is relying on the
>> TableSchema to return the underlying row's schema, and not a Tuple schema.
>>
>> Any thoughts on that issue?
>>
>>
>> On Wed, Jun 17, 2020 at 12:16 AM godfrey he  wrote:
>>
>>> hi John,
>>>
>>> You can use Tuple2[Boolean, Row] to replace CRow, the
>>> StreamTableEnvironment#toRetractStream method return DataStream[(Boolean,
>>> T)].
>>>
>>> the code looks like:
>>>
>>> tEnv.toRetractStream[Row](table).map(new MapFunction[(Boolean, Row), R] {
>>>   override def map(value: (Boolean, Row)): R = ...
>>> })
>>>
>>> Bests,
>>> Godfrey
>>>
>>> John Mathews  于2020年6月17日周三 下午12:13写道:
>>>
 Hello,

 I am working on migrating from the flink table-planner to the new blink
 one, and one problem I am running into is that it doesn't seem like Blink
 has a concept of a CRow, unlike the original table-planner.

 I am therefore struggling to figure out how to properly convert a
 retracting stream to a SingleOutputStreamOperator when using just the Blink
 planner libraries.

 E.g. in the old planner I could do something like this:
 SingleOutputStreamOperator stream =
 tableEnvironment.toRetractStream(table, typeInfo)
 .map(value -> new CRow(value.f1, value.f0);

 but without the CRow, I'm not sure how to accomplish this.

 Any suggestions?

 Thanks!
 John





Job submitted via sql-client unexpectedly switches to SUCCEEDED

2020-06-18 Thread MuChen
hi,

I started a yarn-session: bin/yarn-session.sh -n 5 -jm 1g -tm 4g -s 4 -qu root.flink -nm fsql-cli > /dev/null 2>&1 &

Then I submitted a SQL job through sql-client.

The job joins a Kafka table with a Hive table and writes the result to MySQL. The SQL is as follows:
-- aggregate per vid and vid_group over 5-minute windows
-- and write the result of each 5-minute window to mysql
insert into rt_app.app_video_cover_abtest_test
select
  begin_time,
  vid,
  vid_group,
  max(dv),
  max(click),
  max(vv),
  max(effectivevv)
from (
  select
    t1.begin_time begin_time,
    t1.u_vid vid,
    t1.u_vid_group vid_group,
    dv,
    click,
    vv,
    if(effectivevv is null, 0, effectivevv) effectivevv
  from (
    -- dv, click, vv
    select
      CAST(TUMBLE_START(bjdt, INTERVAL '5' MINUTE) AS STRING) begin_time,
      cast(u_vid as bigint) u_vid,
      u_vid_group,
      sum(if(concat(u_mod,'-',u_ac)='emptylog-video_display' and u_c_module='M011',1,0)) dv,
      sum(if(concat(u_mod,'-',u_ac)='emptylog-video_click' and u_c_module='M011',1,0)) click,
      sum(if(concat(u_mod,'-',u_ac)='top-hits' and u_f_module='M011',1,0)) vv
    FROM rt_ods.ods_applog_vidsplit
    where u_vid is not null and trim(u_vid)<>''
      and u_vid_group is not null and trim(u_vid_group) not in ('','-1')
      and (
        (concat(u_mod,'-',u_ac) in ('emptylog-video_display','emptylog-video_click') and u_c_module='M011')
        or
        (concat(u_mod,'-',u_ac)='top-hits' and u_f_module='M011')
      )
    group by TUMBLE(bjdt, INTERVAL '5' MINUTE), cast(u_vid as bigint), u_vid_group
  ) t1
  left join (
    -- effectivevv
    select
      begin_time,
      u_vid,
      u_vid_group,
      count(1) effectivevv
    from (
      select
        begin_time,
        u_vid,
        u_vid_group,
        u_diu,
        u_playid,
        m_pt,
        q70
      from dw.video_pic_title_q70 a
      join (
        select
          CAST(TUMBLE_START(bjdt, INTERVAL '5' MINUTE) AS STRING) begin_time,
          cast(u_vid as bigint) u_vid,
          u_vid_group,
          u_diu,
          u_playid,
          max(u_playtime) m_pt
        FROM rt_ods.ods_applog_vidsplit
        where u_vid is not null and trim(u_vid)<>''
          and u_vid_group is not null and trim(u_vid_group) not in ('','-1')
          and concat(u_mod,'-',u_ac)='emptylog-video_play_speed'
          and u_f_module='M011'
          and u_playtime>0
        group by TUMBLE(bjdt, INTERVAL '5' MINUTE), cast(u_vid as bigint), u_vid_group, u_diu, u_playid
      ) b
      on a.vid=b.u_vid
      group by begin_time, u_vid, u_vid_group, u_diu, u_playid, m_pt, q70
    ) temp
    where m_pt>=q70
    group by begin_time, u_vid, u_vid_group
  ) t2
  on t1.begin_time=t2.begin_time
    and t1.u_vid=t2.u_vid
    and t1.u_vid_group=t2.u_vid_group
) t3
group by
  begin_time,
  vid,
  vid_group;
After running for a while, the job switched to the SUCCEEDED state, as shown in this screenshot: https://s1.ax1x.com/2020/06/18/NnyX24.png

The INFO log around that time:
2020-06-17 21:27:07,968 INFO  
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Interrupted 
while waiting for queue java.lang.InterruptedException at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:201
 4) at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
 at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) 
at 
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:287)
 
ps: 1. kafka
2. flink 1.10.0

Why did the job switch to the SUCCEEDED state?

Re: Trouble with large state

2020-06-18 Thread Vijay Bhaskar
For me this seems to be an IO bottleneck at your task manager.
I have a couple of queries:
1. What's your checkpoint interval?
2. How frequently are you updating the state into RocksDB?
3. How many task managers are you using?
4. How much data each task manager handles while taking the checkpoint?

For points (3) and (4) you should be very careful; I suspect this is where you
are stuck.
Try to scale vertically by giving each task manager more CPU and memory.
If that is not possible, try to scale horizontally so that the IO per task manager is reduced.
Apart from that, check whether the file system itself is a bottleneck.

Regards
Bhaskar
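To make points (1) and (2) above concrete, here is a rough sketch of where those knobs live; the values and the checkpoint path are only illustrative, not recommendations for this particular job:

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // (1) checkpoint interval: a short interval combined with large state
        //     means the job is almost always uploading state.
        env.enableCheckpointing(60_000);

        // (2) incremental RocksDB checkpoints upload only the changed SST files,
        //     which usually reduces the IO per checkpoint considerably.
        //     The checkpoint URI below is an assumed example path.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

        // (3)/(4) more parallel subtasks spread the state, and therefore the
        //         checkpoint IO, across more task managers.
        env.setParallelism(16);
    }
}
```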





On Thu, Jun 18, 2020 at 5:12 PM Timothy Victor  wrote:

> I had a similar problem.   I ended up solving by not relying on
> checkpoints for recovery and instead re-read my input sources (in my case a
> kafka topic) from the earliest offset and rebuilding only the state I
> need.  I only need to care about the past 1 to 2 days of state so can
> afford to drop anything older.   My recovery time went from over an hour
> for just the first checkpoint to under 10 minutes.
>
> Tim
>
> On Wed, Jun 17, 2020, 11:52 PM Yun Tang  wrote:
>
>> Hi Jeff
>>
>>
>>1. " after around 50GB of state, I stop being able to reliably take
>>checkpoints or savepoints. "
>>What is the exact reason that the job cannot complete the checkpoint? Expired
>>before completing, or declined by some tasks? The former is mainly caused
>>by high back-pressure and the latter is mainly due to some internal
>>error.
>>2. Have you checked what reason the remote task manager is lost?
>>If the remote task manager is not crashed, it might be due to GC
>>impact, I think you might need to check task-manager logs and GC logs.
>>
>> Best
>> Yun Tang
>> --
>> *From:* Jeff Henrikson 
>> *Sent:* Thursday, June 18, 2020 1:46
>> *To:* user 
>> *Subject:* Trouble with large state
>>
>> Hello Flink users,
>>
>> I have an application of around 10 enrichment joins.  All events are
>> read from kafka and have event timestamps.  The joins are built using
>> .cogroup, with a global window, triggering on every 1 event, plus a
>> custom evictor that drops records once a newer record for the same ID
>> has been processed.  Deletes are represented by empty events with
>> timestamp and ID (tombstones). That way, we can drop records when
>> business logic dictates, as opposed to when a maximum retention has been
>> attained.  The application runs RocksDBStateBackend, on Kubernetes on
>> AWS with local SSDs.
>>
>> Unit tests show that the joins produce expected results.  On an 8 node
>> cluster, watermark output progress seems to indicate I should be able to
>> bootstrap my state of around 500GB in around 1 day.  I am able to save
>> and restore savepoints for the first half an hour of run time.
>>
>> My current trouble is that after around 50GB of state, I stop being able
>> to reliably take checkpoints or savepoints.  Some time after that, I
>> start getting a variety of failures where the first suspicious log event
>> is a generic cluster connectivity error, such as:
>>
>>  1) java.io.IOException: Connecting the channel failed: Connecting
>>  to remote task manager + '/10.67.7.101:38955' has failed. This
>>  might indicate that the remote task manager has been lost.
>>
>>  2) org.apache.flink.runtime.io.network.netty.exception
>>  .RemoteTransportException: Connection unexpectedly closed by remote
>>  task manager 'null'. This might indicate that the remote task
>>  manager was lost.
>>
>>  3) Association with remote system
>>  [akka.tcp://flink@10.67.6.66:34987] has failed, address is now
>>  gated for [50] ms. Reason: [Association failed with
>>  [akka.tcp://flink@10.67.6.66:34987]] Caused by:
>>  [java.net.NoRouteToHostException: No route to host]
>>
>> I don't see any obvious out of memory errors on the TaskManager UI.
>>
>> Adding nodes to the cluster does not seem to increase the maximum
>> savable state size.
>>
>> I could enable HA, but for the time being I have been leaving it out to
>> avoid the possibility of masking deterministic faults.
>>
>> Below are my configurations.
>>
>> Thanks in advance for any advice.
>>
>> Regards,
>>
>>
>> Jeff Henrikson
>>
>>
>>
>> Flink version: 1.10
>>
>> Configuration set via code:
>>  parallelism=8
>>  maxParallelism=64
>>  setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>  setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>>  setTolerableCheckpointFailureNumber(1000)
>>  setMaxConcurrentCheckpoints(1)
>>
>>
>> enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>  RocksDBStateBackend
>>  setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
>>  setNumberOfTransferThreads(25)
>>  setDbStoragePath points to a local nvme SSD
>>
>> Configuration in flink-conf.yaml:
>>
>>  jobmanager.rpc.address: localhost

Re: Does anyone have an example of Bazel working with Flink?

2020-06-18 Thread Austin Cawley-Edwards
Great to hear Dan!

@Aaron - would you/ your team be interested in a `rules_flink` project? I'm
still fairly new to Bazel and know enough to contribute, but could
definitely use guidance on design as well.

Best,
Austin

On Mon, Jun 15, 2020 at 11:07 PM Dan Hill  wrote:

> Thanks for the replies!  I was able to use the provided answers to get a
> setup working (maybe not the most efficiently).  The main change I made was
> to switch to including the deploy jar in the image (rather than the default
> one).
>
> I'm open to contributing to a "rules_flink" project.  I don't know enough
> yet to help design it.
>
> On Sat, Jun 13, 2020 at 4:39 AM Till Rohrmann 
> wrote:
>
>> Hi Dan,
>>
>> if you want to run a Flink job without specifying the main class via
>> `bin/flink run --class org.a.b.Foobar` then you have to add a MANIFEST.MF
>> file to your jar under META-INF and this file needs to contain `Main-Class:
>> org.a.b.Foobar`.
>>
>> Cheers,
>> Till
>>
>> On Fri, Jun 12, 2020 at 12:30 AM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hey all,
>>>
>>> Adding to Aaron's response, we use Bazel to build our Flink apps. We've
>>> open-sourced some of our setup here[1] though a bit outdated. There are
>>> definitely rough edges/ probably needs a good deal of work to fit other
>>> setups. We have written a wrapper around the `java_library` and
>>> `java_binary` and could do the same for `rules_scala`, though we just
>>> started using Bazel last November and have a lot to learn in terms of best
>>> practices there.
>>>
>>> If you're interested in contributing to a `rules_flink` project, I would
>>> be as well!
>>>
>>> Best,
>>> Austin
>>>
>>> [1]: https://github.com/fintechstudios/vp-operator-demo-ff-virtual-2020
>>>
>>> On Thu, Jun 11, 2020 at 6:14 PM Aaron Levin 
>>> wrote:
>>>
 Hi Dan,

 We use Bazel to compile our Flink applications. We're using
 "rules_scala" (https://github.com/bazelbuild/rules_scala) to manage
 the dependencies and produce jars. We haven't had any issues. However, I
 have found that sometimes it's difficult to figure out exactly what Flink
 target or dependency my application needs.

 Unfortunately I'm not sure what issue you're seeing here. I would guess
 either your flink application wasn't compiled into the jar
 you're executing. If you can paste the bazel target used to generate your
 jar and how you're launching the application, that will be helpful
 for diagnosis.

 On Thu, Jun 11, 2020 at 5:21 PM Dan Hill  wrote:

> I took the Flink playground and I'm trying to swap out Maven for
> Bazel.  I got to the point where I'm hitting the following error.  I want
> to diff my code with an existing, working setup.
>
> Thanks! - Dan
>
>
> client_1| 
> org.apache.flink.client.program.ProgramInvocationException:
> Neither a 'Main-Class', nor a 'program-class' entry was found in the jar
> file.
>
> client_1| at
> org.apache.flink.client.program.PackagedProgram.getEntryPointClassNameFromJar(PackagedProgram.java:596)
>
> client_1| at
> org.apache.flink.client.program.PackagedProgram.(PackagedProgram.java:190)
>
> client_1| at
> org.apache.flink.client.program.PackagedProgram.(PackagedProgram.java:128)
>
> client_1| at
> org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:862)
>
> client_1| at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:204)
>
> client_1| at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>
> client_1| at
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>
> client_1| at
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>
> client_1| at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>



Issue with job status

2020-06-18 Thread Vijay Bhaskar
Hi
I am using flink 1.9 and facing the below issue
Suppose i have deployed any job and in case there are not enough slots,
then the job is stuck in waiting for slots. But flink job status is showing
it as "RUNNING"  actually it's not.
For me this is looking like a bug. It impacts our production while
monitoring any duplicate jobs.
Moreover when we issue the flink stop command to stop these kinds of jobs,
they are not terminating because stop is associated with savepoint, which
actually fails. So the job is not stopping at all, it is forever stuck,
until manually we cancel it using cli. We don't want manual intervention.
If this is bug, i want to open a jira ticket for same

Regards
Bhaskar


Submitted Flink Jobs EMR are failing (Could not start rest endpoint on any port in port range 8081)

2020-06-18 Thread sk_ac...@yahoo.com
I am using EMR 5.30.0 and trying to submit a Flink (1.10.0) job using the 
following command
flink run -m yarn-cluster /home/hadoop/flink--test-0.0.1-SNAPSHOT.jar
and i am getting the following error:
    Caused by: 
org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The YARN 
application unexpectedly switched to state FAILED during deployment. 
After going through the logs on the worker nodes and job manager logs it looks 
like there is a port conflict
    2020-06-17 21:40:51,199 ERROR 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Could not start 
cluster entrypoint YarnJobClusterEntrypoint.    
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to 
initialize the cluster entrypoint YarnJobClusterEntrypoint.            at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187)
            at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518)
            at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:119)
    Caused by: org.apache.flink.util.FlinkException: Could not create the 
DispatcherResourceManagerComponent.            at 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:261)
            at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:215)
            at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
            at java.security.AccessController.doPrivileged(Native Method)       
     at javax.security.auth.Subject.doAs(Subject.java:422)            at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
            at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
            at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
            ... 2 more    Caused by: java.net.BindException: Could not start 
rest endpoint on any port in port range 8081            at 
org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:219)
            at 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:165)
            ... 9 more
There seems to be JIRA Ticket 
(https://issues.apache.org/jira/browse/FLINK-15394) open for this (though it is 
for 1.9 version of Flink) and the suggested solution is to use port range for 
**rest.bind-port** in Flink config File.
However, in the 1.10 version of Flink we only have the following in the YARN conf YAML file:
    rest.port: 8081
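For what it is worth, the `rest.bind-port` option should also exist in 1.10 even though the default YAML only lists `rest.port`; adding a line such as `rest.bind-port: 8081-8099` to flink-conf.yaml, or setting it programmatically as in the sketch below, lets concurrent per-job clusters on the same host pick different ports (the range is only an example):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.RestOptions;

public class RestBindPortSketch {
    public static Configuration withRestPortRange() {
        // Load flink-conf.yaml and override the REST bind port with a range so
        // that each cluster entrypoint started on this host binds a free port.
        Configuration conf = GlobalConfiguration.loadConfiguration();
        conf.setString(RestOptions.BIND_PORT, "8081-8099"); // example range
        return conf;
    }
}
```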
Another issue I am facing: I have submitted multiple Flink jobs (the same job
multiple times) using the AWS console, via the Add Step UI. Only one of the jobs
succeeded and the rest failed with the error posted above. And when I go
to the Flink UI it doesn't show any jobs at all.
I wonder whether each of the submitted jobs is trying to create its own Flink YARN
session instead of using the existing one.
Thanks, Sateesh


Reflection exception in the Flink entry main class

2020-06-18 Thread a773807...@gmail.com
Hi all:
  In my Flink entry main class I wrote code that, based on the program arguments, reflectively loads the corresponding class to start different Flink jobs.
Launching locally this works, but when the job is deployed to the cluster it throws a reflection exception. What could be the problem?
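The dispatch pattern described above typically looks roughly like the sketch below. This is not the poster's actual code; the class name and the analysisRun method are only borrowed from the stack trace for illustration, and the argument handling is assumed:

```java
// Hypothetical sketch of an entry class that picks the concrete job via reflection.
public class StartPoint {
    public static void main(String[] args) throws Exception {
        String jobClassName = args[0];                    // e.g. "com.example.jobs.ModelThree"
        Class<?> jobClass = Class.forName(jobClassName);
        Object job = jobClass.getDeclaredConstructor().newInstance();
        // assumed entry method that builds the pipeline and calls env.execute()
        jobClass.getMethod("analysisRun", String[].class)
                .invoke(job, (Object) args);
    }
}
```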

The log:
2020-06-18 20:14:06,354 ERROR 
org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler   - Unhandled 
exception.
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: null
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at 
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
at 
org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler.lambda$handleRequest$1(JarPlanHandler.java:100)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
com.huanju.riskplatform.flinkfactor.StartPoint.main(StartPoint.java:85)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
... 9 more
Caused by: 
org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at 
org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at 
com.huanju.riskplatform.flinkfactor.model.ModelThree.analysisRun(ModelThree.java:349)
... 19 more



a773807...@gmail.com


Re: Trouble with large state

2020-06-18 Thread Timothy Victor
I had a similar problem.   I ended up solving by not relying on checkpoints
for recovery and instead re-read my input sources (in my case a kafka
topic) from the earliest offset and rebuilding only the state I need.  I
only need to care about the past 1 to 2 days of state so can afford to drop
anything older.   My recovery time went from over an hour for just the
first checkpoint to under 10 minutes.

Tim
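A minimal sketch of that recovery strategy with the Kafka source (topic, group and broker names are made up, and the universal Kafka connector is assumed):

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ReplayFromEarliest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");
        props.setProperty("group.id", "rebuild-state");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props);
        // On restart, ignore committed offsets and re-read the topic from the
        // beginning; anything older than the retained window is dropped downstream.
        consumer.setStartFromEarliest();

        DataStream<String> events = env.addSource(consumer);
        events.print();
        env.execute("rebuild-state-from-kafka");
    }
}
```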

On Wed, Jun 17, 2020, 11:52 PM Yun Tang  wrote:

> Hi Jeff
>
>
>1. " after around 50GB of state, I stop being able to reliably take
>checkpoints or savepoints. "
>What is the exact reason that the job cannot complete the checkpoint? Expired
>before completing, or declined by some tasks? The former is mainly caused
>by high back-pressure and the latter is mainly due to some internal
>error.
>2. Have you checked what reason the remote task manager is lost?
>If the remote task manager is not crashed, it might be due to GC
>impact, I think you might need to check task-manager logs and GC logs.
>
> Best
> Yun Tang
> --
> *From:* Jeff Henrikson 
> *Sent:* Thursday, June 18, 2020 1:46
> *To:* user 
> *Subject:* Trouble with large state
>
> Hello Flink users,
>
> I have an application of around 10 enrichment joins.  All events are
> read from kafka and have event timestamps.  The joins are built using
> .cogroup, with a global window, triggering on every 1 event, plus a
> custom evictor that drops records once a newer record for the same ID
> has been processed.  Deletes are represented by empty events with
> timestamp and ID (tombstones). That way, we can drop records when
> business logic dictates, as opposed to when a maximum retention has been
> attained.  The application runs RocksDBStateBackend, on Kubernetes on
> AWS with local SSDs.
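For readers trying to picture the join shape described in the quoted message, here is a stripped-down sketch. The types, key selectors and the evictor are simplified stand-ins and not the author's code; in particular, CountEvictor only approximates the custom "drop superseded records" evictor mentioned above:

```java
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.util.Collector;

public class EnrichmentJoinSketch {

    // Join two streams of (id, payload) in a global window that fires on every
    // element and keeps at most one buffered element.
    public static DataStream<String> join(
            DataStream<Tuple2<String, String>> left,
            DataStream<Tuple2<String, String>> right) {
        return left
                .coGroup(right)
                .where(t -> t.f0, Types.STRING)
                .equalTo(t -> t.f0, Types.STRING)
                .window(GlobalWindows.create())
                .trigger(CountTrigger.of(1))   // fire on every incoming element
                .evictor(CountEvictor.of(1))   // stand-in for the custom per-ID evictor
                .apply(new CoGroupFunction<Tuple2<String, String>, Tuple2<String, String>, String>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<String, String>> l,
                                        Iterable<Tuple2<String, String>> r,
                                        Collector<String> out) {
                        for (Tuple2<String, String> a : l) {
                            for (Tuple2<String, String> b : r) {
                                out.collect(a.f1 + "|" + b.f1);
                            }
                        }
                    }
                });
    }
}
```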
>
> Unit tests show that the joins produce expected results.  On an 8 node
> cluster, watermark output progress seems to indicate I should be able to
> bootstrap my state of around 500GB in around 1 day.  I am able to save
> and restore savepoints for the first half an hour of run time.
>
> My current trouble is that after around 50GB of state, I stop being able
> to reliably take checkpoints or savepoints.  Some time after that, I
> start getting a variety of failures where the first suspicious log event
> is a generic cluster connectivity error, such as:
>
>  1) java.io.IOException: Connecting the channel failed: Connecting
>  to remote task manager + '/10.67.7.101:38955' has failed. This
>  might indicate that the remote task manager has been lost.
>
>  2) org.apache.flink.runtime.io.network.netty.exception
>  .RemoteTransportException: Connection unexpectedly closed by remote
>  task manager 'null'. This might indicate that the remote task
>  manager was lost.
>
>  3) Association with remote system
>  [akka.tcp://flink@10.67.6.66:34987] has failed, address is now
>  gated for [50] ms. Reason: [Association failed with
>  [akka.tcp://flink@10.67.6.66:34987]] Caused by:
>  [java.net.NoRouteToHostException: No route to host]
>
> I don't see any obvious out of memory errors on the TaskManager UI.
>
> Adding nodes to the cluster does not seem to increase the maximum
> savable state size.
>
> I could enable HA, but for the time being I have been leaving it out to
> avoid the possibility of masking deterministic faults.
>
> Below are my configurations.
>
> Thanks in advance for any advice.
>
> Regards,
>
>
> Jeff Henrikson
>
>
>
> Flink version: 1.10
>
> Configuration set via code:
>  parallelism=8
>  maxParallelism=64
>  setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>  setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>  setTolerableCheckpointFailureNumber(1000)
>  setMaxConcurrentCheckpoints(1)
>
>
> enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>  RocksDBStateBackend
>  setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
>  setNumberOfTransferThreads(25)
>  setDbStoragePath points to a local nvme SSD
>
> Configuration in flink-conf.yaml:
>
>  jobmanager.rpc.address: localhost
>  jobmanager.rpc.port: 6123
>  jobmanager.heap.size: 28000m
>  taskmanager.memory.process.size: 28000m
>  taskmanager.memory.jvm-metaspace.size: 512m
>  taskmanager.numberOfTaskSlots: 1
>  parallelism.default: 1
>  jobmanager.execution.failover-strategy: full
>
>  cluster.evenly-spread-out-slots: false
>
>  taskmanager.memory.network.fraction: 0.2   # default 0.1
>  taskmanager.memory.framework.off-heap.size: 2GB
>  taskmanager.memory.task.off-heap.size: 2GB
>  taskmanager.network.memory.buffers-per-channel: 32 # default 2
>  taskmanager.memory.managed.fraction: 0.4   # docs say
> default 0.1, but something seems to set 0.4
>  taskmanager.memory.task.off-heap.size: 2048MB  # default 128M
>
>  

Re: Project referencing flink-1.11.0 fails to package

2020-06-18 Thread Jark Wu
Could you paste the complete code (the imports section)?

Best,
Jark

On Thu, 18 Jun 2020 at 19:18, Zhou Zach  wrote:

>
>
> With flink-1.10.0 I was using org.apache.flink.table.api.java.StreamTableEnvironment. After switching to flink-1.11.0, IntelliJ
> IDEA suggests using org.apache.flink.table.api.bridge.java.StreamTableEnvironment instead. The IntelliJ
> IDEA build succeeds, but packaging fails.
>
>
>
>
> [ERROR]
> /Users/Zach/flink-common_1.11.0/src/main/scala/org/rabbit/sql/FromKafkaSinkJdbcForUserUV.scala:7:
> error: object StreamTableEnvironment is not a member of package
> org.apache.flink.table.api.bridge.java
> [ERROR] import
> org.apache.flink.table.api.bridge.java.StreamTableEnvironment
>
>
>
>
> Code:
> val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
>
> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> streamExecutionEnv.enableCheckpointing(20 * 1000,
> CheckpointingMode.EXACTLY_ONCE)
> streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(900 * 1000)
>
> val blinkEnvSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv,
> blinkEnvSettings)
> pom.xml:
> 
>   UTF-8
> 
> 1.11-SNAPSHOT
>   1.8
>   2.11.12
>   2.11
>   ${java.version}
>   ${java.version}
>
> 
>  org.apache.flink
>  flink-java
>  ${flink.version}
> 
> 
>   
>  org.apache.flink
>
>  flink-streaming-java_${scala.binary.version}
>  ${flink.version}
> 
> 
>
> 
> 
>  org.apache.flink
>  flink-table
>  ${flink.version}
>  pom
> 
> 
>
>   
>  org.apache.flink
>  flink-scala_2.11
>  ${flink.version}
> 
> 
>   
>  org.apache.flink
>  flink-jdbc_2.11
>  ${flink.version}
>  provided
>   
>
>   
>  org.apache.flink
>  flink-streaming-scala_2.11
>  ${flink.version}
> 
> 
>
>   
>  org.apache.flink
>  flink-table-common
>  ${flink.version}
> 
> 
> 
> 
>  org.apache.flink
>  flink-table-api-scala-bridge_2.11
>  ${flink.version}
> 
> 
>
> 
> 
>  org.apache.flink
>  flink-table-api-scala_2.11
>  ${flink.version}
> 
> 
>
>
>
>
> 
>
>   
>   
>
>
> 
>  org.apache.flink
>  flink-connector-kafka_2.11
>  ${flink.version}
>  provided
>   
>   
>  org.apache.flink
>  flink-avro
>  ${flink.version}
>  provided
>   
>   
>  org.apache.flink
>  flink-csv
>  ${flink.version}
>  provided
>   
> 
> 
>  org.apache.flink
>  flink-json
>  ${flink.version}
>  provided
>   
>
>
> 
>
>
> 
>  org.apache.bahir
>  flink-connector-redis_2.11
>  1.0
>  provided
>   
>
> 
> 
>  org.apache.flink
>  flink-connector-hive_2.11
>  ${flink.version}
>  provided
>   
>
> 
> 
> 
> 
> 
> 
>
> 
>  org.apache.flink
>  flink-table-api-java
>  ${flink.version}
>  provided
>   
>
> 
> 
>  org.apache.flink
>  flink-table-planner_2.11
>  ${flink.version}
> 
> 
>
>   
>  org.apache.flink
>  flink-table-planner-blink_2.11
>  ${flink.version}
>  provided
>   
> 
> 
>  org.apache.flink
>  flink-sql-connector-kafka_2.11
>  ${flink.version}
>  provided
>   
>
>
>   
>  org.apache.flink
>  flink-connector-hbase_2.11
>  ${flink.version}
>   


Consistency guarantees across multiple sinks in Flink

2020-06-18 Thread xueaohui_...@163.com

As shown in the figure above, the job currently has multiple sinks added. In this setup, when the HBase write fails, the Kafka write is not affected. What I want is that if the HBase write fails, nothing is sent to Kafka either.
How can the writes to HBase and Kafka be made atomic?
I wonder whether Flink has a two-phase-commit scheme covering multiple sinks.



xueaohui_...@163.com


Project referencing flink-1.11.0 fails to package

2020-06-18 Thread Zhou Zach


With flink-1.10.0 I was using org.apache.flink.table.api.java.StreamTableEnvironment. After switching to flink-1.11.0, IntelliJ
IDEA suggests using org.apache.flink.table.api.bridge.java.StreamTableEnvironment instead. The IntelliJ
IDEA build succeeds, but packaging fails.




[ERROR] 
/Users/Zach/flink-common_1.11.0/src/main/scala/org/rabbit/sql/FromKafkaSinkJdbcForUserUV.scala:7:
 error: object StreamTableEnvironment is not a member of package 
org.apache.flink.table.api.bridge.java
[ERROR] import org.apache.flink.table.api.bridge.java.StreamTableEnvironment




Code:
val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamExecutionEnv.enableCheckpointing(20 * 1000, 
CheckpointingMode.EXACTLY_ONCE)
streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(900 * 1000)

val blinkEnvSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, 
blinkEnvSettings)
pom.xml:

  UTF-8

1.11-SNAPSHOT
  1.8
  2.11.12
  2.11
  ${java.version}
  ${java.version}
   

 org.apache.flink
 flink-java
 ${flink.version}


  
 org.apache.flink
 flink-streaming-java_${scala.binary.version}
 ${flink.version}





 org.apache.flink
 flink-table
 ${flink.version}
 pom



  
 org.apache.flink
 flink-scala_2.11
 ${flink.version}


  
 org.apache.flink
 flink-jdbc_2.11
 ${flink.version}
 provided
  

  
 org.apache.flink
 flink-streaming-scala_2.11
 ${flink.version}



  
 org.apache.flink
 flink-table-common
 ${flink.version}




 org.apache.flink
 flink-table-api-scala-bridge_2.11
 ${flink.version}





 org.apache.flink
 flink-table-api-scala_2.11
 ${flink.version}








  
  



 org.apache.flink
 flink-connector-kafka_2.11
 ${flink.version}
 provided
  
  
 org.apache.flink
 flink-avro
 ${flink.version}
 provided
  
  
 org.apache.flink
 flink-csv
 ${flink.version}
 provided
  


 org.apache.flink
 flink-json
 ${flink.version}
 provided
  






 org.apache.bahir
 flink-connector-redis_2.11
 1.0
 provided
  



 org.apache.flink
 flink-connector-hive_2.11
 ${flink.version}
 provided
  









 org.apache.flink
 flink-table-api-java
 ${flink.version}
 provided
  



 org.apache.flink
 flink-table-planner_2.11
 ${flink.version}



  
 org.apache.flink
 flink-table-planner-blink_2.11
 ${flink.version}
 provided
  


 org.apache.flink
 flink-sql-connector-kafka_2.11
 ${flink.version}
 provided
  


  
 org.apache.flink
 flink-connector-hbase_2.11
 ${flink.version}
  

Re: Issue querying a Kafka table with sql-client integrated with HiveCatalog

2020-06-18 Thread Rui Li
You need to start a standalone metastore server, and hive.metastore.uris should be set to that metastore server's address.
For the simplest setup, start a metastore server locally with: hive --service metastore
and set hive.metastore.uris to: thrift://localhost:9083

For more details on using the metastore, see the Hive docs:
https://cwiki.apache.org/confluence/display/Hive/AdminManual+Metastore+Administration
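For reference, a sketch of registering the catalog programmatically once the standalone metastore is running; the paths and names are made up, and the hive-site.xml under the conf dir is assumed to contain hive.metastore.uris = thrift://localhost:9083:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveCatalogSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // hive-conf-dir must contain a hive-site.xml that points at the metastore server.
        HiveCatalog hive = new HiveCatalog(
                "myhive",          // catalog name
                "default",         // default database
                "/opt/hive/conf",  // hive-conf-dir (example path)
                "2.2.0");          // hive version, matching the cluster
        tableEnv.registerCatalog("myhive", hive);
        tableEnv.useCatalog("myhive");
    }
}
```

The sql-client `catalogs` entry in the YAML configuration is the declarative equivalent of this snippet.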

On Thu, Jun 18, 2020 at 5:21 PM Sun.Zhu <17626017...@163.com> wrote:

> That is quite a big change. Is there documentation describing it?
> How should hive.metastore.uris be configured? Is there an example?
>
>
> | |
> Sun.Zhu
> |
> |
> 17626017...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> On 2020-06-18 17:01, Rui Li wrote:
>
> Yes. Embedded mode requires extra jars and easily causes dependency conflicts, and it is also rarely used in production, so in 1.11 HiveCatalog no longer allows the embedded metastore.
>
> On Thu, Jun 18, 2020 at 4:53 PM Leonard Xu  wrote:
>
>
> Hi
>
> On Jun 18, 2020, at 16:45, Sun.Zhu <17626017...@163.com> wrote:
>
> Caused by: java.lang.IllegalArgumentException: Embedded metastore is not
> allowed. Make sure you have set a valid value for hive.metastore.uris
>
> This should be the cause of the error: when Flink integrates with Hive, the embedded metastore is not supported. Your Hive needs to run a hive
> metastore, and hive.metastore.uris must be configured in the conf file.
>
> Best,
> Leonard Xu
>
>
>
> --
> Best regards!
> Rui Li
>


-- 
Best regards!
Rui Li


Re: flink 1.7.2 JDBCAppendTableSink: how to flush data at a time interval

2020-06-18 Thread Leonard Xu
Hello

1.7.2 is a rather old version; consider upgrading, since the newer versions all support the functionality you need.

In the 1.10.0 / 1.10.1 docs [1], the two relevant options are:

  'connector.write.flush.max-rows' = '5000', -- optional, flush max size 
(includes all append, upsert and delete records), 
 -- 
over this number of records, will flush data. The default value is "5000".
  'connector.write.flush.interval' = '2s', --optional, flush interval 
mills, over this time, asynchronous threads will flush data.
If you use 1.10, 1.10.1 is recommended; it fixes a number of bugs on top of 1.10.0.

In the docs for the upcoming 1.11.0 release [2], the corresponding options are:
sink.buffer-flush.max-rows  -- The max size of buffered records before 
flush. Can be set to zero to disable it.
sink.buffer-flush.interval  -- The flush interval mills, over this time, 
asynchronous threads will flush data. 
-- Can be set to '0' to disable 
it. Note, 'sink.buffer-flush.max-rows' can be set to '0' 
-- with the flush interval set 
allowing for complete async processing of buffered actions.
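As a concrete illustration of the 1.10.x options, a sink table registered like the sketch below flushes its buffer either when 1000 rows have accumulated or after 2 seconds, whichever comes first (table name, columns and credentials are made up):

```java
import org.apache.flink.table.api.TableEnvironment;

public class JdbcSinkDdlSketch {
    public static void register(TableEnvironment tableEnv) {
        tableEnv.sqlUpdate(
                "CREATE TABLE mysql_sink (\n" +
                "  id BIGINT,\n" +
                "  cnt BIGINT\n" +
                ") WITH (\n" +
                "  'connector.type' = 'jdbc',\n" +
                "  'connector.url' = 'jdbc:mysql://localhost:3306/flink',\n" +
                "  'connector.table' = 'sink_table',\n" +
                "  'connector.driver' = 'com.mysql.jdbc.Driver',\n" +
                "  'connector.username' = 'root',\n" +
                "  'connector.password' = '123456',\n" +
                "  'connector.write.flush.max-rows' = '1000',\n" +
                "  'connector.write.flush.interval' = '2s'\n" +
                ")");
    }
}
```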

Best,
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#jdbc-connector
 

 
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#connector-options
 

 


> 在 2020年6月18日,17:04,nicygan  写道:
> 
> dear all:
> 我想用JDBCAppendTableSink向Mysql写数据,可以设置批量大小,不能设置间隔时间。
> 
> 
> JDBCAppendTableSink sink = JDBCAppendTableSink.builder().setBatchSize(1)
>.setDrivername("com.mysql.jdbc.Driver")
>.setDBUrl("jdbc:mysql://localhost:3306/flink")
>.setUsername("root")
>.setPassword("123456")
>.setQuery(sql2)
>.setParameterTypes(types)
> .setBatchSize(1000)
>   .build();
> 
> === 问题 
> 如果上游数据来源时间是:
> 10:00 -> 900条
> 10:10 -> 120条
> 11:50 -> 1100条
> 15:00 -> 900条
> 
> JDBCAppendTableSink的数据写入Mysql时间是怎样的? 我的理解是
> 10:10 -> 写入1000条,剩20条下次写入
> 11:50 -> 写入1000条,剩30条下次写入
> 15:00 -> 写入1000条,剩10条下次写入
> 
> 我想要达到等待20分种,不满足batchSize也写入,能否实现?
> 10:10 -> 写入1000条,剩20条下次写入
> 10:30 -> 写入20条
> 11:50 -> 写入1000条,剩10条下次写入
> 12:10 -> 写入10条
> 15:20 -> 写入900条
> 
> thanks



Re: flink 1.10.1 SQL job netty error, asking for help

2020-06-18 Thread Jark Wu
This looks like a known issue: https://issues.apache.org/jira/browse/FLINK-17479

On Wed, 17 Jun 2020 at 11:00, hb <343122...@163.com> wrote:

> A SQL job written with flink 1.10.1 ran normally for the first 3 hours, and checkpoints were fine as well.
> Then a checkpoint failed, and the job has been stuck in the RESTARTING state ever since.
>
> TaskManager log:
> 2020-06-16 20:38:16,640 INFO org.apache.kafka.clients.consumer.internals.
> AbstractCoordinator - [Consumer clientId=consumer-11, groupId=] Discovered
> group coordinator 172.16.30.165:9092 (id: 2147483645 rack: null)
> 2020-06-16 23:27:46,026 INFO org.apache.flink.runtime.taskmanager.Task -
> Attempting to fail task externally Source: KafkaTableSource(id, order_id,
> order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name,
> item_count, origin_single_item_amount, pay_amount, tax_amount,
> item_actual_amount, customer_amount, sharing_amount, logistic_type,
> logistic_pay_type, logistic_amount, attribute, spu_feature, spec,
> spec_desc, item_picture, promotion_attr, order_status, refund_status,
> biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time,
> stock_out_time, delivery_time, end_time, is_deleted, creator, editor,
> create_time, edit_time, _change_column, _old_column, _ddl_field,
> _table_name, _db_name, _op_type, _execute_time) -> 
> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time]) -> Calc(select=[((edit_time SUBSTRING 0 SUBSTRING 10)
> CONCAT _UTF-16LE':' CONCAT _UTF-16LE'rt:trace' CONCAT _UTF-16LE':' CONCAT
> _UTF-16LE'item_ids_ordered' CONCAT _UTF-16LE':' CONCAT shop_id) AS
> setKey, item_id AS setValue], where=[(_change_column jsonHasKey _UTF-16LE
> '"pay_time"')]) -> SinkConversionToTuple2 -> Sink: Unnamed (1/2) (5
> c29783b8f7ed8bfb1a7723f5c4216b1).
> 2020-06-16 23:27:46,027 INFO org.apache.flink.runtime.taskmanager.Task -
> Attempting to fail task externally Source: KafkaTableSource(id, order_id,
> order_no, trade_id, bu_id, shop_id, supply_id, sku_id, item_id, item_name,
> item_count, origin_single_item_amount, pay_amount, tax_amount,
> item_actual_amount, customer_amount, sharing_amount, logistic_type,
> logistic_pay_type, logistic_amount, attribute, spu_feature, spec,
> spec_desc, item_picture, promotion_attr, order_status, refund_status,
> biz_id, biz_type, out_attr, bonded_area_id, tags, accept_time, pay_time,
> stock_out_time, delivery_time, end_time, is_deleted, creator, editor,
> create_time, edit_time, _change_column, _old_column, _ddl_field,
> _table_name, _db_name, _op_type, _execute_time) -> 
> SourceConversion(table=[default_catalog.default_database.stream_yt_trade_pt_order_shop_2020052501,
> source: [KafkaTableSource(id, order_id, order_no, trade_id, bu_id,
> shop_id, supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, stock_out_time, delivery_time,
> end_time, is_deleted, creator, editor, create_time, edit_time,
> _change_column, _old_column, _ddl_field, _table_name, _db_name, _op_type,
> _execute_time)]], fields=[id, order_id, order_no, trade_id, bu_id, shop_id,
> supply_id, sku_id, item_id, item_name, item_count,
> origin_single_item_amount, pay_amount, tax_amount, item_actual_amount,
> customer_amount, sharing_amount, logistic_type, logistic_pay_type,
> logistic_amount, attribute, spu_feature, spec, spec_desc, item_picture,
> promotion_attr, order_status, refund_status, biz_id, biz_type, out_attr,
> bonded_area_id, tags, accept_time, pay_time, 

Re: Question about a Flink SQL window scenario

2020-06-18 Thread Leonard Xu
Hi,

Window output can use an emit strategy that emits intermediate window results before the watermark fires. Note that the community currently marks this as an experimental feature, so use it with caution in production.
table.exec.emit.early-fire.enabled
table.exec.emit.early-fire.delay
See [1].
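A sketch of setting these options on the blink planner's TableConfig; the delay value is only an example:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class EarlyFireSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Experimental: emit intermediate window results every minute instead of
        // waiting for the watermark to close the window.
        TableConfig conf = tEnv.getConfig();
        conf.getConfiguration().setString("table.exec.emit.early-fire.enabled", "true");
        conf.getConfiguration().setString("table.exec.emit.early-fire.delay", "1 min");
    }
}
```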

Best
Leonard Xu

[1] 
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala#L174
 

 



Re: Question about using FileInputFormat

2020-06-18 Thread john
Hi, did you find the cause? I'm running into the same issue.

> On Jun 1, 2020, at 2:48 PM, 阿华田 wrote:
> 
> // initialize job parameters
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> FileInputFormat fileInputFormat = new TextInputFormat(new 
> Path("hdfs://arc/success_fid_flow "));
> fileInputFormat.setNestedFileEnumeration(true);
> // paths for which the filter returns true are excluded
> fileInputFormat.setFilesFilter(new 
> RegexExcludePathAndTimeFilter("2020-05-24","2020-05-24"));
> DataSet source =env.createInput(fileInputFormat);
> source.output(new HdfsTrainSinktest());



Re:Re: flink 1.7.2 JDBCAppendTableSink: how to flush data at a time interval

2020-06-18 Thread nicygan



May I ask what the timeout value is, and where it can be set?



On 2020-06-18 17:43:31, "Benchao Li" wrote:
>My understanding is that this is already the behavior you want.
>batch-size and timeout are two conditions; a flush happens as soon as either one is reached.
>
>nicygan  于2020年6月18日周四 下午5:05写道:
>
>> dear all:
>>  我想用JDBCAppendTableSink向Mysql写数据,可以设置批量大小,不能设置间隔时间。
>>
>>
>> JDBCAppendTableSink sink = JDBCAppendTableSink.builder().setBatchSize(1)
>> .setDrivername("com.mysql.jdbc.Driver")
>> .setDBUrl("jdbc:mysql://localhost:3306/flink")
>> .setUsername("root")
>> .setPassword("123456")
>> .setQuery(sql2)
>> .setParameterTypes(types)
>>  .setBatchSize(1000)
>>.build();
>>
>> === 问题 
>> 如果上游数据来源时间是:
>> 10:00 -> 900条
>> 10:10 -> 120条
>> 11:50 -> 1100条
>> 15:00 -> 900条
>>
>> JDBCAppendTableSink的数据写入Mysql时间是怎样的? 我的理解是
>> 10:10 -> 写入1000条,剩20条下次写入
>> 11:50 -> 写入1000条,剩30条下次写入
>> 15:00 -> 写入1000条,剩10条下次写入
>>
>> 我想要达到等待20分种,不满足batchSize也写入,能否实现?
>> 10:10 -> 写入1000条,剩20条下次写入
>> 10:30 -> 写入20条
>> 11:50 -> 写入1000条,剩10条下次写入
>> 12:10 -> 写入10条
>> 15:20 -> 写入900条
>>
>> thanks
>>
>
>
>-- 
>
>Best,
>Benchao Li


Re: flink 1.7.2 JDBCAppendTableSink: how to flush data at a time interval

2020-06-18 Thread Benchao Li
My understanding is that this is already the behavior you want.
batch-size and timeout are two conditions; a flush happens as soon as either one is reached.

nicygan wrote on Thu, Jun 18, 2020 at 5:05 PM:

> dear all:
>  我想用JDBCAppendTableSink向Mysql写数据,可以设置批量大小,不能设置间隔时间。
>
>
> JDBCAppendTableSink sink = JDBCAppendTableSink.builder().setBatchSize(1)
> .setDrivername("com.mysql.jdbc.Driver")
> .setDBUrl("jdbc:mysql://localhost:3306/flink")
> .setUsername("root")
> .setPassword("123456")
> .setQuery(sql2)
> .setParameterTypes(types)
>  .setBatchSize(1000)
>.build();
>
> === 问题 
> 如果上游数据来源时间是:
> 10:00 -> 900条
> 10:10 -> 120条
> 11:50 -> 1100条
> 15:00 -> 900条
>
> JDBCAppendTableSink的数据写入Mysql时间是怎样的? 我的理解是
> 10:10 -> 写入1000条,剩20条下次写入
> 11:50 -> 写入1000条,剩30条下次写入
> 15:00 -> 写入1000条,剩10条下次写入
>
> 我想要达到等待20分种,不满足batchSize也写入,能否实现?
> 10:10 -> 写入1000条,剩20条下次写入
> 10:30 -> 写入20条
> 11:50 -> 写入1000条,剩10条下次写入
> 12:10 -> 写入10条
> 15:20 -> 写入900条
>
> thanks
>


-- 

Best,
Benchao Li


Re: Question about a Flink SQL window scenario

2020-06-18 Thread john
Hi, I recommend using this: the window trigger ContinuousEventTimeTrigger.
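A DataStream-level sketch of that suggestion, in which a 5-minute event-time window re-emits its intermediate result every 10 seconds (field names and intervals are made up):

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;

public class ContinuousTriggerSketch {

    public static DataStream<Tuple2<String, Long>> countPer5Minutes(
            DataStream<Tuple2<String, Long>> input) {
        return input
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) {
                        return value.f0;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.minutes(5)))
                // re-fire the window every 10 seconds so downstream sees partial results
                .trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)))
                .sum(1);
    }
}
```

Note that with this trigger the same window emits repeatedly, so the sink needs to upsert rather than append.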

> On Jun 3, 2020, at 10:29 PM, Sun.Zhu <17626017...@163.com> wrote:
> 
> hi
> Do you want to compute, for every record, the aggregate over the current 5 minutes? If so, you could consider using an OVER window.
> 
> 
> | |
> Sun.Zhu
> |
> |
> 17626017...@163.com
> |
> 签名由网易邮箱大师定制
> 
> 
> On 2020-06-03 02:56, steven chen wrote:
> hi:
> I have the following scenario: we need to compute statistics at 5-minute and 30-minute granularity in real time. The Flink SQL job uses processing-time sliding windows,
> but the aggregate result is only emitted when the 5-minute window closes, which does not meet our needs. Is there a way to output results in real time, e.g. increments at 18:02
> land directly in the 18:00-18:05 window and every increment is emitted immediately, instead of only sinking to mysql when the window ends? The same applies to the 30-minute window.



Reply: Issue querying a Kafka table with sql-client integrated with HiveCatalog

2020-06-18 Thread Sun.Zhu
That is quite a big change. Is there documentation describing it?
How should hive.metastore.uris be configured? Is there an example?


| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


On 2020-06-18 17:01, Rui Li wrote:
Yes. Embedded mode requires extra jars and easily causes dependency conflicts, and it is also rarely used in production, so in 1.11 HiveCatalog no longer allows the embedded metastore.

On Thu, Jun 18, 2020 at 4:53 PM Leonard Xu  wrote:


Hi

On Jun 18, 2020, at 16:45, Sun.Zhu <17626017...@163.com> wrote:

Caused by: java.lang.IllegalArgumentException: Embedded metastore is not
allowed. Make sure you have set a valid value for hive.metastore.uris

This should be the cause of the error: when Flink integrates with Hive, the embedded metastore is not supported. Your Hive needs to run a hive
metastore, and hive.metastore.uris must be configured in the conf file.

Best,
Leonard Xu



--
Best regards!
Rui Li


flink 1.7.2 JDBCAppendTableSink: how to flush data at a time interval

2020-06-18 Thread nicygan
Dear all:
 I want to write data to MySQL with JDBCAppendTableSink. I can set a batch size, but I cannot set a flush interval.


JDBCAppendTableSink sink = JDBCAppendTableSink.builder().setBatchSize(1)
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost:3306/flink")
.setUsername("root")
.setPassword("123456")
.setQuery(sql2)
.setParameterTypes(types)
 .setBatchSize(1000)
   .build();

=== Question ===
If the upstream data arrives at these times:
10:00 -> 900 rows
10:10 -> 120 rows
11:50 -> 1100 rows
15:00 -> 900 rows

When does JDBCAppendTableSink write the data to MySQL? My understanding is:
10:10 -> writes 1000 rows, the remaining 20 wait for the next flush
11:50 -> writes 1000 rows, the remaining 30 wait for the next flush
15:00 -> writes 1000 rows, the remaining 10 wait for the next flush

What I want is: after waiting 20 minutes, flush even if batchSize has not been reached. Can this be achieved?
10:10 -> write 1000 rows, 20 left for the next flush
10:30 -> write 20 rows
11:50 -> write 1000 rows, 10 left for the next flush
12:10 -> write 10 rows
15:20 -> write 900 rows

thanks


Job submitted via sql-client unexpectedly switches to SUCCEEDED

2020-06-18 Thread MuChen
hi,


I started a yarn-session: bin/yarn-session.sh -n 5 -jm 1g -tm 4g -s 4 -qu root.flink -nm fsql-cli > /dev/null 2>&1 &
Then I submitted a SQL job through sql-client.
The job joins a Kafka table with a Hive table and writes the result to MySQL.

After running for a while, the job switched to the SUCCEEDED state.







ps: kafka
Why did the job switch to the SUCCEEDED state?

Re: Issue querying a Kafka table with sql-client integrated with HiveCatalog

2020-06-18 Thread Rui Li
Yes. Embedded mode requires extra jars and easily causes dependency conflicts, and it is also rarely used in production, so in 1.11 HiveCatalog no longer allows the embedded metastore.

On Thu, Jun 18, 2020 at 4:53 PM Leonard Xu  wrote:

>
> Hi
>
> > On Jun 18, 2020, at 16:45, Sun.Zhu <17626017...@163.com> wrote:
> >
> > Caused by: java.lang.IllegalArgumentException: Embedded metastore is not
> allowed. Make sure you have set a valid value for hive.metastore.uris
>
> This should be the cause of the error: when Flink integrates with Hive, the embedded metastore is not supported. Your Hive needs to run a hive
> metastore, and hive.metastore.uris must be configured in the conf file.
>
> Best,
> Leonard Xu



-- 
Best regards!
Rui Li


Re: Issue querying a Kafka table with sql-client integrated with HiveCatalog

2020-06-18 Thread Leonard Xu
 
Hi

> On Jun 18, 2020, at 16:45, Sun.Zhu <17626017...@163.com> wrote:
> 
> Caused by: java.lang.IllegalArgumentException: Embedded metastore is not 
> allowed. Make sure you have set a valid value for hive.metastore.uris

This should be the cause of the error: when Flink integrates with Hive, the embedded metastore is not supported. Your Hive needs to run a hive metastore,
and hive.metastore.uris must be configured in the conf file.

Best,
Leonard Xu

Reply: Issue querying a Kafka table with sql-client integrated with HiveCatalog

2020-06-18 Thread Sun.Zhu
Hi Rui Li,
I replaced the connector jars with the 1.11 versions as well, but now sql-client fails to start with the following error:
Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could 
not create execution context.
at 
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:818)
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:230)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
Caused by: java.lang.IllegalArgumentException: Embedded metastore is not 
allowed. Make sure you have set a valid value for hive.metastore.uris
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
at org.apache.flink.table.catalog.hive.HiveCatalog.(HiveCatalog.java:171)
at org.apache.flink.table.catalog.hive.HiveCatalog.(HiveCatalog.java:157)
at 
org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory.createCatalog(HiveCatalogFactory.java:84)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.createCatalog(ExecutionContext.java:366)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$null$6(ExecutionContext.java:565)
at java.util.HashMap.forEach(HashMap.java:1289)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:564)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:252)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:563)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:512)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:171)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:124)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:807)


The hive catalog configuration is the same as with 1.10.1, as follows:
catalogs: #[] # empty list
# A typical catalog definition looks like:
  - name: myhive
type: hive
hive-conf-dir: /Users/zhushang/Desktop/software/apache-hive-2.2.0-bin/conf
hive-version: 2.2.0
| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


On 2020-06-18 15:46, Rui Li wrote:
By third-party jars do you mean things like flink-connector-hive? Those jars are also produced during the build; they are just not included in flink-dist. Look in the corresponding module, e.g. flink-connector-hive ends up under /flink-connectors/flink-connector-hive/target.

On Thu, Jun 18, 2020 at 12:22 PM Jark Wu  wrote:

You can take the release-1.11 branch: https://github.com/apache/flink/tree/release-1.11/
and build it yourself: mvn clean install -DskipTests
The contents of the 1.11 distribution are then under build-target.

Best,
Jark



On Wed, 17 Jun 2020 at 15:30, Sun.Zhu <17626017...@163.com> wrote:



Yes. Apart from the 1.11 jars I built myself, the third-party jars are all 1.10.1 versions, but the corresponding 1.11 versions haven't been released yet, right? Where can I get them?


| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


On 2020-06-17 13:25, Rui Li wrote:
Do you mean using the 1.10.1 hive connector together with Flink 1.11? That will definitely cause problems. Try unifying everything on 1.11.

On Wed, Jun 17, 2020 at 12:18 PM Sun.Zhu <17626017...@163.com> wrote:

The error in sql-cli is just what I posted above; there is no more information. Which logs could I check for more details?




On 2020-06-17 10:27, Benchao Li wrote:
The missing dependency used to live in the flink-table-runtime-blink module; these classes have now been refactored into the flink-table-common
module.
If only the connectors and formats use the old version, that should be fine.
Could you post the more detailed error message? Let's see which module is still depending on the old flink-table-runtime-blink.

Sun.Zhu <17626017...@163.com> wrote on Wed, Jun 17, 2020 at 12:49 AM:

Yes. Apart from the jars built from 1.11, the dependency jars, e.g. the connector jars and the jars HiveCatalog needs, are the 1.10.1 versions
because 1.11 has not been released yet. Neither of the two problems above occurs with 1.10.1; after upgrading to 1.11 they appear and I don't know why. Is a dependency missing?




On 2020-06-16 18:38, Benchao Li wrote:
1.11 refactored some of the underlying data structures, so you cannot take 1.10 jars and use them with 1.11 directly.
If you use the jars built from 1.11 directly, it should work without problems.

Sun.Zhu <17626017...@163.com> wrote on Tue, Jun 16, 2020 at 6:11 PM:

I built the 1.11 jars.
Querying a Hive table from sql-cli reports the following error:
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoClassDefFoundError:
org/apache/flink/table/dataformat/BaseRow


Querying the registered Kafka table reports:
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException:
org.apache.flink.table.dataformat.BaseRow


依赖包是从1.10.1下面拷贝的
| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


On 2020-06-13 11:44, Sun.Zhu <17626017...@163.com> wrote:
Got it!
Thanks, Junbao


| |
Sun.Zhu
|
|
17626017...@163.com
|
签名由网易邮箱大师定制


On 2020-06-13 09:32, zhangjunbao wrote:
In 1.10.x, creating a table under HiveCatalog with proctime as PROCTIME() should be hitting a bug:
https://issues.apache.org/jira/browse/FLINK-17189 <
https://issues.apache.org/jira/browse/FLINK-17189>

Best,
Junbao Zhang

On Jun 13, 2020, at 12:31 AM, Sun.Zhu <17626017...@163.com> wrote:

hi, all
When integrating sql client with HiveCatalog, I registered a Kafka table in the HiveCatalog.
The DDL is as follows:
|
CREATETABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
proctime as PROCTIME(),   -- computed column producing a processing-time column
WATERMARK FOR ts as ts - INTERVAL'5'SECOND -- define a watermark on ts, making ts the event-time column
) WITH (
'connector.type' = 'kafka',  -- use kafka 

Reply: env.readFile recursively monitoring directories: how to clean up state for old directories

2020-06-18 Thread star
Thanks for the suggestion! If I delete the HDFS directories, will the corresponding state in Flink be cleaned up as well?



Sent from my iPhone


-- Original message --
From: Jark Wu (see https://issues.apache.org/jira/browse/FLINK-18357)
My initial idea is whether there could be an inactive-interval to mark a sub-directory as no longer producing new files, so that checkpoints
would not need to track all the files under that sub-directory.


Best,
Jark

On Wed, 17 Jun 2020 at 14:04, star <3149768...@qq.com wrote:



 env.readFile(format,path, FileProcessingMode.PROCESS_CONTINUOUSLY, 6)


 The above is a source that monitors the data files in a directory.
 The format is set to recursively monitor a parent directory A, whose sub-directories are date directories, e.g.:


 A/20200101/
 A/20200102/
 A/20200103/
 ...
 ...



 
As time goes on, by June there are nearly 200 directories to monitor, each with about 500 files, so the state each checkpoint has to sync is the consumption offsets of 200*500 files, and checkpoints frequently time out.


 Can the historical state be cleaned up in this case? Business-wise, only the directories of the last 7 days or so receive new data, so the older ones no longer need to be monitored.

Re: Unable to run flink job in dataproc cluster with jobmanager provided

2020-06-18 Thread Sourabh Mehta
Hi,
The application is using 1.10.0 but the cluster is set up with 1.9.0.

Yes, I do have access. Please find the startup logs from the cluster below:


2020-06-17 11:28:18,989 INFO
 org.apache.shaded.flink.table.module.ModuleManager- Got
FunctionDefinition equals from module core
2020-06-17 11:28:20,538 INFO
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.rpc.address, localhost
2020-06-17 11:28:20,538 INFO
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.rpc.port, 6123
2020-06-17 11:28:20,538 INFO
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.heap.size, 1024m
2020-06-17 11:28:20,538 INFO
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.heap.size, 1024m
2020-06-17 11:28:20,538 INFO
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.numberOfTaskSlots, 1
2020-06-17 11:28:20,538 INFO
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading
configuration property: parallelism.default, 1
2020-06-17 11:28:20,539 INFO
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.execution.failover-strategy, region
2020-06-17 11:28:20,539 INFO
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.rpc.address, cluster-flink-poc-m
2020-06-17 11:28:20,539 INFO
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.heap.mb, 12288
2020-06-17 11:28:20,539 INFO
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.heap.mb, 12288
2020-06-17 11:28:20,540 INFO
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.numberOfTaskSlots, 4
2020-06-17 11:28:20,540 INFO
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading
configuration property: parallelism.default, 28
2020-06-17 11:28:20,540 INFO
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading
configuration property: taskmanager.network.numberOfBuffers, 2048
2020-06-17 11:28:20,540 INFO
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading
configuration property: fs.hdfs.hadoopconf, /etc/hadoop/conf
2020-06-17 11:28:20,550 INFO
 org.apache.shaded.flink.runtime.taskexecutor.TaskExecutorResourceUtils  -
The configuration option Key: 'taskmanager.cpu.cores' , default: null
(fallback keys: []) required for local execution is not set, setting it to
its default value 1.7976931348623157E308
2020-06-17 11:28:20,552 INFO
 org.apache.shaded.flink.runtime.taskexecutor.TaskExecutorResourceUtils  -
The configuration option Key: 'taskmanager.memory.task.heap.size' ,
default: null (fallback keys: []) required for local execution is not set,
setting it to its default value 9223372036854775807 bytes
2020-06-17 11:28:20,552 INFO
 org.apache.shaded.flink.runtime.taskexecutor.TaskExecutorResourceUtils  -
The configuration option Key: 'taskmanager.memory.task.off-heap.size' ,
default: 0 bytes (fallback keys: []) required for local execution is not
set, setting it to its default value 9223372036854775807 bytes
2020-06-17 11:28:20,552 INFO
 org.apache.shaded.flink.runtime.taskexecutor.TaskExecutorResourceUtils  -
The configuration option Key: 'taskmanager.memory.network.min' , default:
64 mb (fallback keys: [{key=taskmanager.network.memory.min,
isDeprecated=true}]) required for local execution is not set, setting it to
its default value 64 mb
2020-06-17 11:28:20,553 INFO
 org.apache.shaded.flink.runtime.taskexecutor.TaskExecutorResourceUtils  -
The configuration option Key: 'taskmanager.memory.network.max' , default: 1
gb (fallback keys: [{key=taskmanager.network.memory.max,
isDeprecated=true}]) required for local execution is not set, setting it to
its default value 64 mb
2020-06-17 11:28:20,553 INFO
 org.apache.shaded.flink.runtime.taskexecutor.TaskExecutorResourceUtils  -
The configuration option Key: 'taskmanager.memory.managed.size' , default:
null (fallback keys: [{key=taskmanager.memory.size, isDeprecated=true}])
required for local execution is not set, setting it to its default value
128 mb
2020-06-17 11:28:20,558 INFO
 org.apache.shaded.flink.runtime.minicluster.MiniCluster   - Starting
Flink Mini Cluster
2020-06-17 11:28:20,561 INFO
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.rpc.address, localhost
2020-06-17 11:28:20,561 INFO
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.rpc.port, 6123
2020-06-17 11:28:20,561 INFO
 org.apache.shaded.flink.configuration.GlobalConfiguration - Loading
configuration property: jobmanager.heap.size, 1024m

Re: env.readFile recursively monitoring a directory: how to clean up state (historical directories)

2020-06-18 Thread Jark Wu
Hi,

I think this could perhaps be addressed at the business level first. For example, you could run
another job that periodically cleans expired data off HDFS (say, data older than half a month?).

I have also opened an issue to follow up on this and to see whether community members who know
this part better have other suggestions: FLINK-18357

A preliminary idea of mine is whether an inactive-interval could mark a sub-directory as no longer
producing new files, so that checkpoints would not have to track every file under that sub-directory.
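A minimal sketch of such a cleanup job, assuming the A/yyyyMMdd/ layout from the question below,
a retention of roughly half a month, and the Hadoop FileSystem API on the classpath (the path and
the retention period are placeholders, not a recommendation):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class ExpiredDirCleanup {
    public static void main(String[] args) throws Exception {
        // Parent directory that contains one sub-directory per day, e.g. A/20200101/ (placeholder path).
        Path root = new Path("hdfs:///data/A");
        // Keep roughly half a month of data; everything older gets deleted.
        LocalDate cutoff = LocalDate.now().minusDays(15);
        DateTimeFormatter dayFormat = DateTimeFormatter.ofPattern("yyyyMMdd");

        try (FileSystem fs = FileSystem.get(new Configuration())) {
            for (FileStatus status : fs.listStatus(root)) {
                if (!status.isDirectory()) {
                    continue; // only the daily sub-directories are considered
                }
                // Assumes every child directory is named yyyyMMdd, as in the layout above.
                LocalDate dirDate = LocalDate.parse(status.getPath().getName(), dayFormat);
                if (dirDate.isBefore(cutoff)) {
                    fs.delete(status.getPath(), true); // recursively delete the expired day
                }
            }
        }
    }
}

Note that this only removes the expired data from HDFS; whether the continuously monitoring source
can then drop the corresponding offsets from its checkpointed state is the question FLINK-18357 tracks.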


Best,
Jark

On Wed, 17 Jun 2020 at 14:04, star <3149768...@qq.com> wrote:

> 
>
>
> env.readFile(format,path, FileProcessingMode.PROCESS_CONTINUOUSLY, 6)
>
>
> The above is a source that monitors the data in a directory.
> The format is set to recursively monitor a parent directory A; under A there are date directories, e.g.:
>
>
> A/20200101/
> A/20200102/
> A/20200103/
> ...
> ...
>
>
>
> As time goes on, by June for instance there are nearly 200 directories to monitor, each with about 500 files, so every checkpoint has to sync the consumption offsets of 200*500 files, and checkpoints frequently time out.
>
>
> Is there a way to clean up this historical state? In our business, only the directories of roughly the last 7 days still receive data updates, so the historical ones no longer need to be monitored.


Re: Multiple keytab authentication with Flink in a Kerberos environment

2020-06-18 Thread john
There is no need to put these in the configuration file; specify them dynamically at submission
time with -yD (-yD: use value for given property). This option can be passed multiple times.
Also, at submission time you can run the klist command to check which principal is currently the default.
Running kinit -kt .keytab  can change the default principal, i.e., switch the user. You could give that a try.
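For example, a submission could look roughly like this (untested; the jar name and the principal are
placeholders, and each user would pass their own keytab and principal on their own submission):

flink run -m yarn-cluster \
    -yD security.kerberos.login.keytab=/home/flink/p_zhangjunjie.keytab \
    -yD security.kerberos.login.principal=<p_zhangjunjie's principal> \
    <your-job>.jar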

> On 12 Jun 2020 at 11:36 AM, zhangjunjie1...@163.com wrote:
> 
> On Flink 1.9 we built a Flink on YARN platform with tenant/resource isolation between users. To run Flink in
> per-job mode in a Kerberos environment, the following has to be added to flink-conf.yaml:
>security.kerberos.login.use-ticket-cache: true
>security.kerberos.login.keytab: /home/flink/p_zhangjunjie.keytab
>security.kerberos.login.principal: p_zhangjun...@local.com 
> But if multiple users share the Flink environment's resources (say p_wanglin in addition to p_zhangjunjie) and I add the following to flink-conf.yaml:
>security.kerberos.login.use-ticket-cache: true
>security.kerberos.login.keytab: /home/flink/p_zhangjunjie.keytab
>security.kerberos.login.principal: p_zhangjun...@local.com 
> 
>security.kerberos.login.use-ticket-cache: true
>security.kerberos.login.keytab: /home/flink/p_wanglin.keytab
>security.kerberos.login.principal: p_wang...@local.com 
> then only the last one (p_wanglin in this case) takes effect, and using p_zhangjunjie fails with an error. How can multiple keytab users be in effect at the same time?
> 
> Thanks! I'd appreciate any help with this; even just a rough idea of how to approach it would be great.
> 
> 
> 
> 
> 
> zhangjunjie1...@163.com



Re: Flink ML

2020-06-18 Thread Jark Wu
Currently, FLIP-39 is mainly driven by Becket and his team. I'm including
him; maybe he can answer your question.

Best,
Jark

On Wed, 17 Jun 2020 at 23:00, Piotr Nowojski  wrote:

> Hi,
>
> It looks like FLIP-39 is only partially implemented as of now [1], so I’m
> not sure which features are already done. I’m including Shaoxuan Wang in
> this thread; maybe he will be able to answer your question better.
>
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-12470
>
> On 16 Jun 2020, at 14:55, Dimitris Vogiatzidakis <
> dimitrisvogiatzida...@gmail.com> wrote:
>
> Hello,
> I'm a CS student currently working on my Bachelor's thesis. I've used
> Flink to extract features out of some datasets, and I would like to use
> them together with another dataset of (1,0) (node exists or doesn't) to
> perform a logistic regression. I have found that FLIP-39 has been accepted
> and that it is available in version 1.10.0, which I also currently use, but I'm
> having trouble implementing it. Are there any Java examples currently up
> and running? Or can you propose a different way to perform the task?
> Thank you.
>
> -Dimitris Vogiatzidakis
>
>
>


Re: Flink plugin File System for GCS

2020-06-18 Thread Yang Wang
Hi Alex,

Building a fat jar is good practice for a Flink filesystem plugin, just
like flink-s3-fs-hadoop, flink-s3-fs-presto, flink-azure-fs-hadoop and
flink-oss-fs-hadoop. All the provided filesystem plugins are self-contained,
which means you need to bundle Hadoop in your fat jar.

The reason your fat jar does not work is probably that you do not shade in
the adapter classes "org.apache.flink.runtime.fs.hdfs" and
"org.apache.flink.runtime.util".[1] Shading them in forces the plugin
classloader to always load these classes from your fat jar, not from
flink-dist-*.jar.

Could you add the forced shading and try again?

[1].
https://github.com/apache/flink/blob/master/flink-filesystems/flink-s3-fs-hadoop/pom.xml#L187


Best,
Yang

Alexander Filipchik  wrote on Thu, 18 Jun 2020 at 12:27 PM:

> Hello,
> I'm trying to implement a Flink-native FS for GCS which can be used with a
> streaming file sink. I used the S3 one as a reference and made it work locally.
> However, it fails to load when I deploy it to the cluster. If I put Hadoop
> in the fat jar I get:
>
> Caused by: java.lang.LinkageError: loader constraint violation: loader
> (instance of org/apache/flink/core/plugin/PluginLoader$PluginClassLoader)
> previously initiated loading for a different type with name
> "org/apache/hadoop/conf/Configuration"
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
> at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
> at
> org.apache.flink.core.plugin.PluginLoader$PluginClassLoader.loadClass(PluginLoader.java:149)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
> at
> com.css.flink.fs.gcs.FlinkGcsFileSystemFactory.createInitializedGcsFS(FlinkGcsFileSystemFactory.java:63)
>
> If I remove Hadoop from the fat jar but add the Hadoop uber jar to the lib folder I get:
> java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
>
> If I remove Hadoop from the fat jar and put
> flink-shaded-hadoop-2-uber-2.8.3-10.0.jar into lib, I'm getting:
> java.lang.ClassNotFoundException: org.apache.hadoop.fs.FileSystem
>
> I think it is some issue with classloaders, but not sure what exactly
> causes it. It looks like some classes are loaded from lib and some are not.
> Any advice?
>
> Thank you, Alex
>


Re: Convert flink table with field of type RAW to datastream

2020-06-18 Thread Jark Wu
Flink SQL/Table requires the field data types to be known explicitly. Maybe you
can apply a MapFunction before `toTable` to convert/normalize the data and
types.
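A minimal sketch of that approach in Java (RawEvent and its fields are made-up placeholders for the
real upstream classes, and the source is faked with fromElements), converting `java.util.Date` into
`java.sql.Timestamp` before handing the stream to the Table API:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.sql.Timestamp;
import java.util.Date;

public class NormalizeBeforeTable {

    // Hypothetical upstream type with a java.util.Date field, standing in for the real classes.
    public static class RawEvent {
        public String id;
        public Date eventTime;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        RawEvent sample = new RawEvent();
        sample.id = "a";
        sample.eventTime = new Date();
        DataStream<RawEvent> raw = env.fromElements(sample); // stands in for the real source

        // Normalize java.util.Date to java.sql.Timestamp, which the planner supports.
        DataStream<Tuple2<String, Timestamp>> normalized = raw.map(
                new MapFunction<RawEvent, Tuple2<String, Timestamp>>() {
                    @Override
                    public Tuple2<String, Timestamp> map(RawEvent e) {
                        Timestamp ts = e.eventTime == null ? null : new Timestamp(e.eventTime.getTime());
                        return Tuple2.of(e.id, ts);
                    }
                });

        // Register the normalized fields under readable names and convert back to a stream.
        Table t = tEnv.fromDataStream(normalized, "id, eventTime");
        tEnv.toAppendStream(t, Row.class).print();

        env.execute("normalize-before-table");
    }
}

Once every field carries a concrete, supported type, converting the joined tables back with
`toAppendStream` should no longer hit the RAW/Date limitation.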

Best,
Jark

On Thu, 18 Jun 2020 at 14:12, YI  wrote:

> Hi Jark,
>
> Thank you for your suggestion. My current problem is that there are quite
> a few data types. All these data types are defined upstream, over which I have
> no control.
> I don't think I can easily change the type information of a specific
> field. Can I? Things become nasty when there are so many `java.util.Date`
> fields I need to change.
>
> The reason I want to use the Flink Table API is that it allows me to easily join
> several tables. As an alternative, I think I can use the stream join operator.
> My only complaint is that it becomes tedious when I want to join more than
> once. I think I need to define all the intermediate data types.
>
> Best,
> Yi
>
>
>
> ‐‐‐ Original Message ‐‐‐
> On Thursday, June 18, 2020 12:11 PM, Jark Wu  wrote:
>
> Hi YI,
>
> Flink doesn't have a TypeInformation for `java.util.Date`, but
> only SqlTimeTypeInfo.DATE for `java.sql.Date`.
> That's why the TypeInformation.of(java.util.Date) is being recognized as a
> RAW type.
>
> To resolve your problem, I think in `TypeInformation.of(..)` you should
> use a concrete type for `java.util.Date`, e.g. `java.sql.Timestamp`,
> `java.sql.Date`, `java.sql.Time`.
>
> Best,
> Jark
>
> On Thu, 18 Jun 2020 at 10:32, YI  wrote:
>
>> Hi all,
>>
>> I am using Flink to process external data. The source format is JSON, and
>> the underlying data types are defined in an external library.
>> I generated the table schema with `TableSchema.fromTypeInfo` and
>> `TypeInformation.of[_]`. From what I read, this method is deprecated,
>> but I didn't find any alternatives. Manually tweaking the table schema is not
>> viable as there are simply too many types.
>>
>> One of the fields in the source type is `java.util.Date`. I tried to
>> convert the obtained table to a datastream with Table.toAppendStream.
>> When I ran
>> `tEnv.from("rawEvent").select('_isComplete).toAppendStream[(Boolean)].print()`,
>> the following exception occurred.
>>
>> Exception in thread "main" org.apache.flink.table.api.TableException:
>> Type is not supported: Date
>> at
>> org.apache.flink.table.calcite.FlinkTypeFactory$.org$apache$flink$table$calcite$FlinkTypeFactory$$typeInfoToSqlTypeName(FlinkTypeFactory.scala:350)
>> at
>> org.apache.flink.table.calcite.FlinkTypeFactory.createTypeFromTypeInfo(FlinkTypeFactory.scala:63)
>> at
>> org.apache.flink.table.calcite.FlinkTypeFactory.$anonfun$buildLogicalRowType$1(FlinkTypeFactory.scala:201)
>> at
>> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>> at
>> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>> at
>> org.apache.flink.table.calcite.FlinkTypeFactory.buildLogicalRowType(FlinkTypeFactory.scala:198)
>> at
>> org.apache.flink.table.plan.schema.TableSourceTable.getRowType(TableSourceTable.scala:96)
>> at
>> org.apache.calcite.prepare.CalciteCatalogReader.getTable(CalciteCatalogReader.java:131)
>> at
>> org.apache.calcite.prepare.CalciteCatalogReader.getTableForMember(CalciteCatalogReader.java:228)
>> at
>> org.apache.calcite.prepare.CalciteCatalogReader.getTableForMember(CalciteCatalogReader.java:84)
>> at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1068)
>> at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1094)
>> at
>> org.apache.flink.table.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:268)
>> at
>> org.apache.flink.table.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:134)
>> at
>> org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
>> at
>> org.apache.flink.table.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:131)
>> at
>> org.apache.flink.table.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:111)
>> at
>> org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:91)
>> at
>> org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
>> at
>> org.apache.flink.table.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:130)
>> at java.util.Collections$SingletonList.forEach(Collections.java:4824)
>> at
>> org.apache.flink.table.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:130)
>> at
>> org.apache.flink.table.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:111)
>> at
>> org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:46)
>> at
>> org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
>> at
>> 

Re: Convert flink table with field of type RAW to datastream

2020-06-18 Thread YI
Hi Jark,

Thank you for your suggestion. My current problem is that there are quite a few
data types. All these data types are defined upstream, over which I have no control.
I don't think I can easily change the type information of a specific field. Can
I? Things become nasty when there are so many `java.util.Date` fields I need to change.

The reason I want to use the Flink Table API is that it allows me to easily join
several tables. As an alternative, I think I can use the stream join operator.
My only complaint is that it becomes tedious when I want to join more than once.
I think I need to define all the intermediate data types.

Best,
Yi

‐‐‐ Original Message ‐‐‐
On Thursday, June 18, 2020 12:11 PM, Jark Wu  wrote:

> Hi YI,
>
> Flink doesn't have a TypeInformation for `java.util.Date`, but only 
> SqlTimeTypeInfo.DATE for `java.sql.Date`.
> That's why the TypeInformation.of(java.util.Date) is being recognized as a 
> RAW type.
>
> To resolve your problem, I think in `TypeInformation.of(..)` you should use a 
> concrete type for `java.util.Date`, e.g. `java.sql.Timestamp`, 
> `java.sql.Date`, `java.sql.Time`.
>
> Best,
> Jark
>
> On Thu, 18 Jun 2020 at 10:32, YI  wrote:
>
>> Hi all,
>>
>> I am using Flink to process external data. The source format is JSON, and
>> the underlying data types are defined in an external library.
>> I generated the table schema with `TableSchema.fromTypeInfo` and
>> `TypeInformation.of[_]`. From what I read, this method is deprecated,
>> but I didn't find any alternatives. Manually tweaking the table schema is not
>> viable as there are simply too many types.
>>
>> One of the fields in the source type is `java.util.Date`. I tried to convert
>> the obtained table to a datastream with Table.toAppendStream.
>> When I ran 
>> `tEnv.from("rawEvent").select('_isComplete).toAppendStream[(Boolean)].print()`,
>>  the following exception occurred.
>>
>> Exception in thread "main" org.apache.flink.table.api.TableException: Type 
>> is not supported: Date
>> at 
>> org.apache.flink.table.calcite.FlinkTypeFactory$.org$apache$flink$table$calcite$FlinkTypeFactory$$typeInfoToSqlTypeName(FlinkTypeFactory.scala:350)
>> at 
>> org.apache.flink.table.calcite.FlinkTypeFactory.createTypeFromTypeInfo(FlinkTypeFactory.scala:63)
>> at 
>> org.apache.flink.table.calcite.FlinkTypeFactory.$anonfun$buildLogicalRowType$1(FlinkTypeFactory.scala:201)
>> at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>> at 
>> org.apache.flink.table.calcite.FlinkTypeFactory.buildLogicalRowType(FlinkTypeFactory.scala:198)
>> at 
>> org.apache.flink.table.plan.schema.TableSourceTable.getRowType(TableSourceTable.scala:96)
>> at 
>> org.apache.calcite.prepare.CalciteCatalogReader.getTable(CalciteCatalogReader.java:131)
>> at 
>> org.apache.calcite.prepare.CalciteCatalogReader.getTableForMember(CalciteCatalogReader.java:228)
>> at 
>> org.apache.calcite.prepare.CalciteCatalogReader.getTableForMember(CalciteCatalogReader.java:84)
>> at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1068)
>> at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1094)
>> at 
>> org.apache.flink.table.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:268)
>> at 
>> org.apache.flink.table.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:134)
>> at 
>> org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
>> at 
>> org.apache.flink.table.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:131)
>> at 
>> org.apache.flink.table.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:111)
>> at 
>> org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:91)
>> at 
>> org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
>> at 
>> org.apache.flink.table.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:130)
>> at java.util.Collections$SingletonList.forEach(Collections.java:4824)
>> at 
>> org.apache.flink.table.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:130)
>> at 
>> org.apache.flink.table.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:111)
>> at 
>> org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:46)
>> at 
>> org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
>> at 
>> org.apache.flink.table.calcite.FlinkRelBuilder.tableOperation(FlinkRelBuilder.scala:106)
>> at 
>> org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:390)
>> at 
>> org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:185)
>> at 
>>