Container is running beyond physical memory limits. Current usage: 5.0 GB of 5 GB physical memory used; 7.0 GB of 25 GB virtual memory used. Killing container.

2021-03-30 Thread admin
java.lang.Exception: Container 
[pid=17248,containerID=container_1597847003686_12235_01_001336] is running 
beyond physical memory limits. Current usage: 5.0 GB of 5 GB physical memory 
used; 7.0 GB of 25 GB virtual memory used. Killing container.
Dump of the process-tree for container_1597847003686_12235_01_001336 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) 
SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 17283 17248 17248 17248 (java) 1025867 190314 7372083200 1311496 
/usr/local/jdk1.8/bin/java -Xmx2147483611 -Xms2147483611 
-XX:MaxDirectMemorySize=590558009 -XX:MaxMetaspaceSize=268435456 -server 
-XX:+UseConcMarkSweepGC -XX:+UseCMSInitiatingOccupancyOnly 
-XX:CMSInitiatingOccupancyFraction=75 -XX:ParallelGCThreads=4 
-XX:+AlwaysPreTouch -XX:NewRatio=1 -DjobName=fastmidu-deeplink-tuid-20200203 
-Dlog.file=/data1/yarn/containers/application_1597847003686_12235/container_1597847003686_12235_01_001336/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=456340281b -D 
taskmanager.memory.network.min=456340281b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=1825361124b -D taskmanager.cpu.cores=5.0 -D 
taskmanager.memory.task.heap.size=2013265883b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . 
-Djobmanager.rpc.address=di-h4-dn-134.h.ab1.qttsite.net -Dweb.port=0 
-Dweb.tmpdir=/tmp/flink-web-f63d543b-a75a-4dc4-be93-979eebd8062d 
-Djobmanager.rpc.port=43423 -Drest.address=di-h4-dn-134.h.ab1.qttsite.net 
|- 17248 17246 17248 17248 (bash) 0 0 116015104 353 /bin/bash -c 
/usr/local/jdk1.8/bin/java -Xmx2147483611 -Xms2147483611 
-XX:MaxDirectMemorySize=590558009 -XX:MaxMetaspaceSize=268435456 -server 
-XX:+UseConcMarkSweepGC -XX:+UseCMSInitiatingOccupancyOnly 
-XX:CMSInitiatingOccupancyFraction=75 -XX:ParallelGCThreads=4 
-XX:+AlwaysPreTouch -XX:NewRatio=1 -DjobName=fastmidu-deeplink-tuid-20200203 
-Dlog.file=/data1/yarn/containers/application_1597847003686_12235/container_1597847003686_12235_01_001336/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=456340281b -D 
taskmanager.memory.network.min=456340281b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=1825361124b -D taskmanager.cpu.cores=5.0 -D 
taskmanager.memory.task.heap.size=2013265883b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . 
-Djobmanager.rpc.address='di-h4-dn-134.h.ab1.qttsite.net' -Dweb.port='0' 
-Dweb.tmpdir='/tmp/flink-web-f63d543b-a75a-4dc4-be93-979eebd8062d' 
-Djobmanager.rpc.port='43423' -Drest.address='di-h4-dn-134.h.ab1.qttsite.net' 
1> 
/data1/yarn/containers/application_1597847003686_12235/container_1597847003686_12235_01_001336/taskmanager.out
 2> 
/data1/yarn/containers/application_1597847003686_12235/container_1597847003686_12235_01_001336/taskmanager.err
 

Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143

at 
org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:343)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Hi :

Re: flink1.11.2写hive分区表,hive识别不到分区

2020-12-23 Thread admin
Hi,
Hive 自动添加分区依赖于分区提交策略 metastore,所以得添加policy配置才能生效

> 2020年12月23日 上午9:27,kingdomad  写道:
> 
> 是的。开启了checkpoint。
> 消费kafka,用tableEnv把stream注册成TemporaryView。
> 然后执行sql写入到hive的表中。
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> --
> 
> kingdomad
> 
> 
> 
> 
> 
> 
> 
> 在 2020-12-23 09:22:48,"范瑞" <836961...@qq.com> 写道:
>> Hello
>> 
>> 
>> 请问是使用 Sql吧?开启cp了吗?
>> 
>> 
>> 
>> ---原始邮件---
>> 发件人: "kingdomad"> 发送时间: 2020年12月23日(周三) 上午9:17
>> 收件人: "user-zh"> 主题: Re:Re: flink1.11.2写hive分区表,hive识别不到分区
>> 
>> 
>> 分区用的是记录中的字段,没有用到processing time或者event time去生成分区。
>> 发现只要给hive的表加上以下这三个属性就可以马上提交分区到metastore了。
>> 'sink.partition-commit.trigger'='process-time'
>> 'sink.partition-commit.delay'='0s'
>> 'sink.partition-commit.policy.kind'='metastore,success-file'
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> --
>> 
>> kingdomad
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2020-12-21 23:27:49,"赵一旦" > 即使不是flink写入,其他方式写入也需要这样做的哈。
>> 
>> r pp > 
>>  程序中,创建表后,执行命令。
>> 
>>  kingdomad > 
>>  
>>  
>> flink1.11.2写hive3.12的分区表,flink新创建的分区数据hive无法识别,在hdfs上能看到写入了文件,但是hive读取不了分区。
>>   需要执行msck repair table修复分区表后,hive才能读取到数据。
>>   求助大佬,要如何解决。
>>  
>>  
>>  
>>  
>>  
>>  
>>  
>>  
>>  
>>  
>>  
>>  
>>  
>>  
>>  
>>  
>>   --
>>  
>>   kingdomad
>>  
>>  
>> 



sink.rolling-policy.file-size不生效

2020-12-03 Thread admin
Hi all,
使用flink 1.11.1的filesystem 
connector,配置了sink.rolling-policy.file-size=50MB,结果依然有100+M的文件
DDL如下:Checkpoint间隔1min
CREATE TABLE cpc_bd_recall_log_hdfs (
log_timestamp BIGINT,
ip STRING,
`raw` STRING,
`day` STRING, `hour` STRING,`minute` STRING
) PARTITIONED BY (`day` , `hour` ,`minute`) WITH (
'connector'='filesystem',
'path'='hdfs://xxx/test.db/cpc_bd_recall_log_hdfs',
'format'='parquet',
'parquet.compression'='SNAPPY',
'sink.rolling-policy.file-size' = '50MB',
'sink.partition-commit.policy.kind' = 'success-file',
'sink.partition-commit.delay'='60s'
);


Hdfs文件如下:

  0 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/_SUCCESS
-rw-r--r--   3 hadoop hadoop 31.7 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-0-2500
-rw-r--r--   3 hadoop hadoop121.8 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-0-2501
-rw-r--r--   3 hadoop hadoop 31.9 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-1-2499
-rw-r--r--   3 hadoop hadoop122.0 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-1-2500
-rw-r--r--   3 hadoop hadoop 31.8 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-10-2501
-rw-r--r--   3 hadoop hadoop121.8 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-10-2502
-rw-r--r--   3 hadoop hadoop 31.9 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-11-2500
-rw-r--r--   3 hadoop hadoop122.2 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-11-2501
-rw-r--r--   3 hadoop hadoop 31.9 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-12-2500
-rw-r--r--   3 hadoop hadoop122.2 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-12-2501
-rw-r--r--   3 hadoop hadoop 31.8 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-13-2499
-rw-r--r--   3 hadoop hadoop122.0 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-13-2500
-rw-r--r--   3 hadoop hadoop 31.6 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-14-2500
-rw-r--r--   3 hadoop hadoop122.1 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-14-2501
-rw-r--r--   3 hadoop hadoop 31.9 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-15-2498
-rw-r--r--   3 hadoop hadoop121.8 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-15-2499
-rw-r--r--   3 hadoop hadoop 31.7 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-16-2501
-rw-r--r--   3 hadoop hadoop122.0 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-16-2502
-rw-r--r--   3 hadoop hadoop 31.7 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-17-2500
-rw-r--r--   3 hadoop hadoop122.5 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-17-2501
-rw-r--r--   3 hadoop hadoop 31.8 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-18-2500
-rw-r--r--   3 hadoop hadoop121.7 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-18-2501
-rw-r--r--   3 hadoop hadoop 31.9 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-19-2501
-rw-r--r--   3 hadoop hadoop121.7 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-19-2502
-rw-r--r--   3 hadoop hadoop 31.6 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-2-2499
-rw-r--r--   3 hadoop hadoop

Re: 使用 StreamingFileSink后 checkpoint状态中的数据如何hive读取

2020-11-30 Thread admin
hi,
你需要使用oncheckpoint的policy,这样在每次Checkpoint时会滚动文件

> 2020年11月30日 下午4:57,liliang <904716...@qq.com> 写道:
> 
> 本人使用的StreamingFileSink将数据按照行保存到hdfs中
>  StreamingFileSink streamingFileSink = StreamingFileSink.
>forRowFormat(new Path(path), new
> SimpleStringEncoder("UTF-8"))
>.withBucketAssigner(bucketAssigner)
>.withRollingPolicy(
>DefaultRollingPolicy.builder()
> 
> .withRolloverInterval(TimeUnit.HOURS.toMillis(1))
> 
> .withInactivityInterval(TimeUnit.MINUTES.toMillis(30)) 
>.withMaxPartSize(1024 * 1024 * 1024)
>.build())
>.withOutputFileConfig(
>OutputFileConfig.builder()
>.withPartSuffix(partSuffix)
>.build()
>)
>.build();
> 配置如上,checkpoint的配置是10分钟一次,现在有个疑惑想要问下,现在hdfs上文件只是在半个小时都是未完成状态,
> 如 .part-0-11606723036.inprogress.5b46f31b-8289-44e9-ae26-997f3e479446
> 这种的处于
> inprocress状态,但是我这checkpoint是10分钟一次,如果我的任务在29分钟挂了,那么hdfs上这个文件就肯定不是FINISHED状态,那么那20分钟的数据我这应该怎么处理.
> 我这现在按照默认的处理中,hive对于inprogress的数据是直接过滤掉的,我这把文件改成正常的名称是能读取到
> 
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: 发现flinksql写hive比写hdfs慢很多

2020-11-26 Thread admin
既然性能差异这么大,所以为什么不默认使用BulkWriter,而使用MR writer呢

> 2020年11月26日 下午7:50,Leonard Xu  写道:
> 
> 
> Hi, admin
> 
> 谢谢验证,
>> 在 2020年11月26日,17:43,admin <17626017...@163.com> 写道:
>> 
>> 默认true的情况下 两个任务同时写30分钟,写hive的任务就已经落后了3分钟
> 
> 此时,写hive用MR writer,写HDFS只支持Flink BulkWriter, 如果单独测试sink的话,Flink BulkWriter 
> 应该不止10%的性能提升。
> 
>> false的情况,两个写30多分钟,差异不大
> 
> false时,两个作业都用 Flink BulkWriter,差异是不大的。
> 
> 另外 1.11分支上的这个issue应该在1.11.3里修复。
> 
> 祝好,
> Leonard



Re: 发现flinksql写hive比写hdfs慢很多

2020-11-26 Thread admin
Hi,Leonard
我将这个issue的改动合到我的代码里,同时看到有邮件列表里提到 table.exec.hive.fallback-mapred-writer 
这个配置对写入速度也有影响,
所以我分别基于true或false做了测试。
结果是:
默认true的情况下 两个任务同时写30分钟,写hive的任务就已经落后了3分钟
false的情况,两个写30多分钟,差异不大
所以使用MR writer和flink native writer在性能上确实有很大差异

> 2020年11月26日 下午5:32,Leonard Xu  写道:
> 
> Hi, admin
> 结合这个 issue 和你的对比结果,
> 我觉得应该是这个bug,这个问题在最新的分支已经修复,今天社区cut branch了,你可以帮忙在1.12的分支或master的分支上验证下吗?
> 
> 祝好,
> Leonard
> [1] https://github.com/apache/flink/tree/release-1.12 
> <https://github.com/apache/flink/tree/release-1.12>
> [2] https://github.com/apache/flink/tree/master 
> <https://github.com/apache/flink/tree/master> 
> 
>> 在 2020年11月26日,15:34,admin <17626017...@163.com> 写道:
>> 
>> 补充一下 我的flink版本是1.11.1
>> 翻了下邮件列表,有个https://issues.apache.org/jira/browse/FLINK-19121 
>> <https://issues.apache.org/jira/browse/FLINK-19121> 性能问题 
>> <https://issues.apache.org/jira/browse/FLINK-19121%E6%80%A7%E8%83%BD%E9%97%AE%E9%A2%98>,不知道是否跟这个有关
>> 
>>> 2020年11月26日 上午11:49,admin <17626017...@163.com> 写道:
>>> 
>>> Hi,all
>>> 两个job,都从同一个kafka读数据,一份写入hdfs,一份写入hive,都是分钟分区,并发都是200。运行一段时间后发现写hive要落后hdfs很多,而且hive任务对应的hdfs路径下,某一分区内的文件甚至跨度2个小时之久。大家遇到过这种情况没
>>> 附上对应ddl
>>> hive:
>>> CREATE EXTERNAL TABLE hive_table (
>>>  log_timestamp BIGINT,
>>>  ip STRING,
>>>  `raw` STRING
>>> ) PARTITIONED BY (`day` STRING, `hour` STRING,`minute` STRING) STORED AS 
>>> PARQUET
>>> TBLPROPERTIES (
>>>  'parquet.compression'='SNAPPY',
>>>  'sink.partition-commit.policy.kind' = 'success-file',
>>>  'sink.partition-commit.success-file.name' = '_SUCCESS'
>>> );
>>> 
>>> Hdfs:
>>> 
>>> CREATE TABLE hdfs_table (
>>>  log_timestamp BIGINT,
>>>  ip STRING,
>>>  `raw` STRING,
>>>  `day` STRING, `hour` STRING,`minute` STRING
>>> ) PARTITIONED BY (`day` , `hour` ,`minute`) WITH (
>>>  'connector'='filesystem',
>>>  'path'='hdfs://xx/test.db/hdfs_table',
>>>  'format'='parquet',
>>>  'parquet.compression'='SNAPPY',
>>>  'sink.partition-commit.policy.kind' = 'success-file’,
>>>  'sink.partition-commit.success-file.name' = '_SUCCESS'
>>> );
>>> 
>>> 
>>> 实际hdfs文件对比:
>>> 
>>> -rw-r--r--   3 hadoop hadoop1514862 2020-11-26 09:26 
>>> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-150-824
>>> -rw-r--r--   3 hadoop hadoop   10798011 2020-11-26 09:34 
>>> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-151-830
>>> -rw-r--r--   3 hadoop hadoop4002618 2020-11-26 09:35 
>>> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-151-831
>>> -rw-r--r--   3 hadoop hadoop8057522 2020-11-26 09:51 
>>> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-152-844
>>> -rw-r--r--   3 hadoop hadoop6675744 2020-11-26 09:52 
>>> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-152-845
>>> -rw-r--r--   3 hadoop hadoop4062571 2020-11-26 09:51 
>>> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-153-844
>>> -rw-r--r--   3 hadoop hadoop   10247973 2020-11-26 09:52 
>>> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-153-845
>>> -rw-r--r--   3 hadoop hadoop 483029 2020-11-26 09:53 
>>> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-153-846
>>> -rw-r--r--   3 hadoop hadoop9440221 2020-11-26 09:16 
>>> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-154-816
>>> -rw-r--r--   3 hadoop hadoop5346956 2020-11-26 09:17 
>>> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-154-817
>>> -rw-r--r--   3 hadoop hadoop4940718 2020-11-26 09:51 
>>> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-155-844
>>> -rw-r--r--   3 hadoop hadoop9687410 2020-11-26 09:52 
>>> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-155-845
>>> -rw-r--r--   3 hadoop hadoop  51998 2020-11-26 09:53 
>>> hdfs://xxx/test.db/hive_table/day=2020-11-

Re: 发现flinksql写hive比写hdfs慢很多

2020-11-25 Thread admin
补充一下 我的flink版本是1.11.1
翻了下邮件列表,有个https://issues.apache.org/jira/browse/FLINK-19121 
<https://issues.apache.org/jira/browse/FLINK-19121> 性能问题 
<https://issues.apache.org/jira/browse/FLINK-19121%E6%80%A7%E8%83%BD%E9%97%AE%E9%A2%98>,不知道是否跟这个有关

> 2020年11月26日 上午11:49,admin <17626017...@163.com> 写道:
> 
> Hi,all
> 两个job,都从同一个kafka读数据,一份写入hdfs,一份写入hive,都是分钟分区,并发都是200。运行一段时间后发现写hive要落后hdfs很多,而且hive任务对应的hdfs路径下,某一分区内的文件甚至跨度2个小时之久。大家遇到过这种情况没
> 附上对应ddl
> hive:
> CREATE EXTERNAL TABLE hive_table (
>log_timestamp BIGINT,
>ip STRING,
>`raw` STRING
> ) PARTITIONED BY (`day` STRING, `hour` STRING,`minute` STRING) STORED AS 
> PARQUET
> TBLPROPERTIES (
>'parquet.compression'='SNAPPY',
>'sink.partition-commit.policy.kind' = 'success-file',
>'sink.partition-commit.success-file.name' = '_SUCCESS'
> );
> 
> Hdfs:
> 
> CREATE TABLE hdfs_table (
>log_timestamp BIGINT,
>ip STRING,
>`raw` STRING,
>`day` STRING, `hour` STRING,`minute` STRING
> ) PARTITIONED BY (`day` , `hour` ,`minute`) WITH (
>'connector'='filesystem',
>'path'='hdfs://xx/test.db/hdfs_table',
>'format'='parquet',
>'parquet.compression'='SNAPPY',
>'sink.partition-commit.policy.kind' = 'success-file’,
>'sink.partition-commit.success-file.name' = '_SUCCESS'
> );
> 
> 
> 实际hdfs文件对比:
> 
> -rw-r--r--   3 hadoop hadoop1514862 2020-11-26 09:26 
> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-150-824
> -rw-r--r--   3 hadoop hadoop   10798011 2020-11-26 09:34 
> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-151-830
> -rw-r--r--   3 hadoop hadoop4002618 2020-11-26 09:35 
> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-151-831
> -rw-r--r--   3 hadoop hadoop8057522 2020-11-26 09:51 
> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-152-844
> -rw-r--r--   3 hadoop hadoop6675744 2020-11-26 09:52 
> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-152-845
> -rw-r--r--   3 hadoop hadoop4062571 2020-11-26 09:51 
> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-153-844
> -rw-r--r--   3 hadoop hadoop   10247973 2020-11-26 09:52 
> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-153-845
> -rw-r--r--   3 hadoop hadoop 483029 2020-11-26 09:53 
> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-153-846
> -rw-r--r--   3 hadoop hadoop9440221 2020-11-26 09:16 
> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-154-816
> -rw-r--r--   3 hadoop hadoop5346956 2020-11-26 09:17 
> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-154-817
> -rw-r--r--   3 hadoop hadoop4940718 2020-11-26 09:51 
> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-155-844
> -rw-r--r--   3 hadoop hadoop9687410 2020-11-26 09:52 
> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-155-845
> -rw-r--r--   3 hadoop hadoop  51998 2020-11-26 09:53 
> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-155-846
> -rw-r--r--   3 hadoop hadoop   3518 2020-11-26 09:37 
> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-156-833
> -rw-r--r--   3 hadoop hadoop   13801987 2020-11-26 09:39 
> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-156-834
> -rw-r--r--   3 hadoop hadoop 963288 2020-11-26 09:40 
> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-156-835
> -rw-r--r--   3 hadoop hadoop6036601 2020-11-26 09:27 
> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-157-825
> -rw-r--r--   3 hadoop hadoop8864235 2020-11-26 09:29 
> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-157-826
> -rw-r--r--   3 hadoop hadoop   10865872 2020-11-26 09:37 
> hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-158-833
> -rw-r--r--   3 hadoop hadoop4031077 2020-11-26 09:39 
> hdfs://xxx/test.db/hive_table/day=2020-11-26

发现flinksql写hive比写hdfs慢很多

2020-11-25 Thread admin
Hi,all
两个job,都从同一个kafka读数据,一份写入hdfs,一份写入hive,都是分钟分区,并发都是200。运行一段时间后发现写hive要落后hdfs很多,而且hive任务对应的hdfs路径下,某一分区内的文件甚至跨度2个小时之久。大家遇到过这种情况没
附上对应ddl
hive:
CREATE EXTERNAL TABLE hive_table (
log_timestamp BIGINT,
ip STRING,
`raw` STRING
) PARTITIONED BY (`day` STRING, `hour` STRING,`minute` STRING) STORED AS PARQUET
TBLPROPERTIES (
'parquet.compression'='SNAPPY',
'sink.partition-commit.policy.kind' = 'success-file',
'sink.partition-commit.success-file.name' = '_SUCCESS'
);

Hdfs:

CREATE TABLE hdfs_table (
log_timestamp BIGINT,
ip STRING,
`raw` STRING,
`day` STRING, `hour` STRING,`minute` STRING
) PARTITIONED BY (`day` , `hour` ,`minute`) WITH (
'connector'='filesystem',
'path'='hdfs://xx/test.db/hdfs_table',
'format'='parquet',
'parquet.compression'='SNAPPY',
'sink.partition-commit.policy.kind' = 'success-file’,
'sink.partition-commit.success-file.name' = '_SUCCESS'
);


实际hdfs文件对比:

-rw-r--r--   3 hadoop hadoop1514862 2020-11-26 09:26 
hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-150-824
-rw-r--r--   3 hadoop hadoop   10798011 2020-11-26 09:34 
hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-151-830
-rw-r--r--   3 hadoop hadoop4002618 2020-11-26 09:35 
hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-151-831
-rw-r--r--   3 hadoop hadoop8057522 2020-11-26 09:51 
hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-152-844
-rw-r--r--   3 hadoop hadoop6675744 2020-11-26 09:52 
hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-152-845
-rw-r--r--   3 hadoop hadoop4062571 2020-11-26 09:51 
hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-153-844
-rw-r--r--   3 hadoop hadoop   10247973 2020-11-26 09:52 
hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-153-845
-rw-r--r--   3 hadoop hadoop 483029 2020-11-26 09:53 
hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-153-846
-rw-r--r--   3 hadoop hadoop9440221 2020-11-26 09:16 
hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-154-816
-rw-r--r--   3 hadoop hadoop5346956 2020-11-26 09:17 
hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-154-817
-rw-r--r--   3 hadoop hadoop4940718 2020-11-26 09:51 
hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-155-844
-rw-r--r--   3 hadoop hadoop9687410 2020-11-26 09:52 
hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-155-845
-rw-r--r--   3 hadoop hadoop  51998 2020-11-26 09:53 
hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-155-846
-rw-r--r--   3 hadoop hadoop   3518 2020-11-26 09:37 
hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-156-833
-rw-r--r--   3 hadoop hadoop   13801987 2020-11-26 09:39 
hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-156-834
-rw-r--r--   3 hadoop hadoop 963288 2020-11-26 09:40 
hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-156-835
-rw-r--r--   3 hadoop hadoop6036601 2020-11-26 09:27 
hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-157-825
-rw-r--r--   3 hadoop hadoop8864235 2020-11-26 09:29 
hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-157-826
-rw-r--r--   3 hadoop hadoop   10865872 2020-11-26 09:37 
hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-158-833
-rw-r--r--   3 hadoop hadoop4031077 2020-11-26 09:39 
hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-158-834
-rw-r--r--   3 hadoop hadoop 228350 2020-11-26 09:09 
hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-159-811
-rw-r--r--   3 hadoop hadoop   14661395 2020-11-26 09:11 
hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-159-812
-rw-r--r--   3 hadoop hadoop5451995 2020-11-26 09:29 
hdfs://xxx/test.db/hive_table/day=2020-11-26/hour=08/minute=59/part-17bacc1b-cf96-463b-9cb7-aa318a05936c-160-826
-rw-r--r--   3 hadoop hadoop9149301 2020-11-26 09:30 

Re: 关于Catalog的建议

2020-11-23 Thread admin
感谢jark大佬,试过了确实可以
我是先用hive的catalog+dialect 建了 hive表,
然后切换到default catalog 建了 kafka source表,
在insert into hive select from 
kafka时需要指定hive_catalog.hive_db.hive_table,否则会报表不存在,因为当前是在default catalog 
下。大家注意一下

> 2020年11月24日 上午11:41,Jark Wu  写道:
> 
> 1. 可以的
> 2. 是的。见文档
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/use.html#use-catloag
> 3. 是的。
> 
> Hive metastore catalog 就是 Flink 官方提供的通用 catalog(可以存任何 connector 类型)。
> 
> Best,
> Jark
> 
> 
> On Tue, 24 Nov 2020 at 10:58, admin <17626017...@163.com> wrote:
> 
>> Hi Rui Li,
>>> FlinkSQL允许一个Session使用多个Catalog,所以Catalog的选择不是互斥的,可以混用。
>> 
>> 一个job里面可以切换catalog的是吧,比如从读kafka中 写hive 的 db1.hive_table。
>> 几个问题请教一下:
>> 1.create kafka source 使用  memory catalog,hive table 使用hive catalog,这样是可以的吧
>> 2.在sql里面切换catalog的语法是什么,在[1]里面没看到,是这样吗 USE CATALOG
>> catalogName(default_catalog/hive_catalog)
>> 
>> 3.在注册hivecatalog时,需要指定一个默认的database,比如指定了默认test,然后要写到db1的hive_table,是不是切换一下database即可。
>>USE db1;
>> 感谢
>> 
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/catalogs.html
>> 
>>> 2020年11月23日 下午8:52,Rui Li  写道:
>>> 
>>> Hi,
>>> 
>>> FlinkSQL允许一个Session使用多个Catalog,所以Catalog的选择不是互斥的,可以混用。
>>> 
>>> 关于你的两个问题:
>>> 1. 我理解JDBC Catalog主要是为了方便用户查询JDBC的表,目前的实现应该基本是个只读的Catalog
>>> [1],文档也许是可以说的更明确一些。
>>> 2.
>>> 
>> 我觉得要实现一个完整的、生产可用的元数据管理系统都不会太“简单”,能读写schema只是最基础的要求,是否支持并发访问、如何支持HA、如何保证元数据安全都是需要考虑的问题。而hive
>>> metastore已经有比较多的人在用了,所以借助它来持久化元数据是个性价比比较高的选择。
>>> 
>>> [1]
>>> 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#postgres-database-as-a-catalog
>>> 
>>> On Mon, Nov 23, 2020 at 7:41 PM 赵一旦  wrote:
>>> 
>>>> 目前Flink提供memory、jdbc、hive这3种catalog。
>>>> 感觉实际使用中,可以使用如下几种方案。
>>>> 
>>>> (1)选择memory catalog,然后每次sql都带上自己的相关DDL。
>>>> (2)选择某种catalog,支持“持久化”DDL定义,然后具体sql就不需要带上自己相关的DDL了。
>>>> 
>>>> 方案1和方案2各有优缺点。
>>>> 方案1的优点:
>>>>   比如sql1和sql2都只针kafka topic的部分时间段范围,这种情况某个kafka
>>>> 
>>>> 
>> topic就不方便写死DDL(持久化),而应该每个SQL自带一个定义。(当然,使用方案2也是可以基于options的覆盖方式简化sql1和sql2自带DDL定义的语句的)
>>>> 方案1的缺点:
>>>>   很明显,不支持“持久化”本身就是缺点,这也是方案2的优点。
>>>> 
>>>> -然后,我的问题来了。
>>>> 
>>>> 
>> 在Flink文档中,HiveCatalog写了其作用是作为flink表元数据,同时也是作为读取hive表元数据的接口。而在JdbcCatalog中没写其支持的表类型(Connect类型)。
>>>> 问题1(如上)没针对每个catalog写清楚其支持的connector类型,即表类型。
>>>> 
>>>> 
>>>> 
>> 问题2:能否提供一个更简单方便的支持持久化,且支持所有connector类型的catalog的实现。“简单”指的是比如通过Mysql/PostgreSQL什么的,再或者直接json文件作为存储都可以。“持久化”即可以持久化。
>>>> 
>>>> 
>>>> 
>> 当然,考虑到hive这种元数据使用其他存储可能需要额外复杂的转化,我感觉至少应该搞个相对通用的catalog,比如支持(mysql表,kafka表(kafka元数据很简单,用mysql啥的肯定能存储吧),...)。
>>>> 
>>> 
>>> 
>>> --
>>> Best regards!
>>> Rui Li
>> 
>> 



Re: 关于Catalog的建议

2020-11-23 Thread admin
Hi Rui Li,
> FlinkSQL允许一个Session使用多个Catalog,所以Catalog的选择不是互斥的,可以混用。

一个job里面可以切换catalog的是吧,比如从读kafka中 写hive 的 db1.hive_table。
几个问题请教一下:
1.create kafka source 使用  memory catalog,hive table 使用hive catalog,这样是可以的吧
2.在sql里面切换catalog的语法是什么,在[1]里面没看到,是这样吗 USE CATALOG 
catalogName(default_catalog/hive_catalog)
3.在注册hivecatalog时,需要指定一个默认的database,比如指定了默认test,然后要写到db1的hive_table,是不是切换一下database即可。
USE db1;
感谢

[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/catalogs.html

> 2020年11月23日 下午8:52,Rui Li  写道:
> 
> Hi,
> 
> FlinkSQL允许一个Session使用多个Catalog,所以Catalog的选择不是互斥的,可以混用。
> 
> 关于你的两个问题:
> 1. 我理解JDBC Catalog主要是为了方便用户查询JDBC的表,目前的实现应该基本是个只读的Catalog
> [1],文档也许是可以说的更明确一些。
> 2.
> 我觉得要实现一个完整的、生产可用的元数据管理系统都不会太“简单”,能读写schema只是最基础的要求,是否支持并发访问、如何支持HA、如何保证元数据安全都是需要考虑的问题。而hive
> metastore已经有比较多的人在用了,所以借助它来持久化元数据是个性价比比较高的选择。
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#postgres-database-as-a-catalog
> 
> On Mon, Nov 23, 2020 at 7:41 PM 赵一旦  wrote:
> 
>> 目前Flink提供memory、jdbc、hive这3种catalog。
>> 感觉实际使用中,可以使用如下几种方案。
>> 
>> (1)选择memory catalog,然后每次sql都带上自己的相关DDL。
>> (2)选择某种catalog,支持“持久化”DDL定义,然后具体sql就不需要带上自己相关的DDL了。
>> 
>> 方案1和方案2各有优缺点。
>> 方案1的优点:
>>比如sql1和sql2都只针kafka topic的部分时间段范围,这种情况某个kafka
>> 
>> topic就不方便写死DDL(持久化),而应该每个SQL自带一个定义。(当然,使用方案2也是可以基于options的覆盖方式简化sql1和sql2自带DDL定义的语句的)
>> 方案1的缺点:
>>很明显,不支持“持久化”本身就是缺点,这也是方案2的优点。
>> 
>> -然后,我的问题来了。
>> 
>> 在Flink文档中,HiveCatalog写了其作用是作为flink表元数据,同时也是作为读取hive表元数据的接口。而在JdbcCatalog中没写其支持的表类型(Connect类型)。
>> 问题1(如上)没针对每个catalog写清楚其支持的connector类型,即表类型。
>> 
>> 
>> 问题2:能否提供一个更简单方便的支持持久化,且支持所有connector类型的catalog的实现。“简单”指的是比如通过Mysql/PostgreSQL什么的,再或者直接json文件作为存储都可以。“持久化”即可以持久化。
>> 
>> 
>> 当然,考虑到hive这种元数据使用其他存储可能需要额外复杂的转化,我感觉至少应该搞个相对通用的catalog,比如支持(mysql表,kafka表(kafka元数据很简单,用mysql啥的肯定能存储吧),...)。
>> 
> 
> 
> -- 
> Best regards!
> Rui Li



自定义分区提交策略之合并小文件的问题

2020-11-17 Thread admin
Hi,
我们有这样的需求--流式入库后,可以自动添加分区和合并小文件。
参考了网上的自定义合并小文件的分区提交策略[1],经过测试发现。
这个自动以policy用于filesystem connector时可以正常合并文件,并生成目标文件。

由于自带的metastore policy只能用在hive table上,所以又测试了下使用hive catalog往hive table里写数据,经过测试 
自动添加分区是ok的,但是合并小文件有点问题--没有合并后的目标目标。而且没有任何异常。

很奇怪的是同样的代码在写hdfs就正常,写hive不行,看了源码写hive底层也是依赖的StreamingFileSink,排查了两天没什么头绪,有没有大佬遇到过这个问题,或者有什么排查的思路。

policy 代码如下:
public class ParquetFileMergingCommitPolicy implements PartitionCommitPolicy {
private static final Logger LOGGER = 
LoggerFactory.getLogger(ParquetFileMergingCommitPolicy.class);

@Override
public void commit(Context context) throws Exception {
LOGGER.info("begin to merge files.partition path is {}.", 
context.partitionPath().toUri().toString());
Configuration conf = new Configuration();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, 
context.partitionPath().toUri().getHost());
FileSystem fs = FileSystem.get(conf);
String partitionPath = context.partitionPath().getPath();


List files = listAllFiles(fs, new Path(partitionPath), "part-");
LOGGER.info("{} files in path {}", files.size(), 
partitionPath);//这里待合并文件数量可以正常打印


MessageType schema = getParquetSchema(files, conf);
if (schema == null) {
return;
}
LOGGER.info("Fetched parquet schema: {}", 
schema.toString());//schema也正常输出


Path result = merge(partitionPath, schema, files, fs);
LOGGER.info("Files merged into {}", result.toString());
}


private List listAllFiles(FileSystem fs, Path dir, String prefix) 
throws IOException {
List result = new ArrayList<>();


RemoteIterator dirIterator = fs.listFiles(dir, false);
while (dirIterator.hasNext()) {
LocatedFileStatus fileStatus = (LocatedFileStatus) 
dirIterator.next();
Path filePath = fileStatus.getPath();
if (fileStatus.isFile() && filePath.getName().startsWith(prefix)) {
result.add(filePath);
}
}


return result;
}


private MessageType getParquetSchema(List files, Configuration conf) 
throws IOException {
if (files.size() == 0) {
return null;
}


HadoopInputFile inputFile = HadoopInputFile.fromPath(files.get(0), 
conf);
ParquetFileReader reader = ParquetFileReader.open(inputFile);
ParquetMetadata metadata = reader.getFooter();
MessageType schema = metadata.getFileMetaData().getSchema();


reader.close();
return schema;
}


private Path merge(String partitionPath, MessageType schema, List 
files, FileSystem fs) throws IOException {
Path mergeDest = new Path(partitionPath + "/result-" + 
System.currentTimeMillis() + ".parquet");

ParquetWriter writer = ExampleParquetWriter.builder(mergeDest)
.withType(schema)
.withConf(fs.getConf())
.withWriteMode(Mode.OVERWRITE)
.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.build();


for (Path file : files) {
ParquetReader reader = ParquetReader.builder(new 
GroupReadSupport(), file)
.withConf(fs.getConf())
.build();
Group data;
while ((data = (Group) reader.read()) != null) {
writer.write(data);
}
reader.close();
}
LOGGER.info("data size is [{}]", writer.getDataSize());//数据大小也正常输出

try {
writer.close();
} catch (Exception e) {
LOGGER.error("flush failed", e);//没有异常
}

if (!fs.exists(mergeDest)) {
LOGGER.warn("Fuck! result file not exist.");
}

for (Path file : files) {
fs.delete(file, false);
}
return mergeDest;
}
}
粗略看了下ParquetWriter的源码,
ParquetWriter writer = ExampleParquetWriter.builder(mergeDest)
.withType(schema)
.withConf(fs.getConf())
.withWriteMode(Mode.CREATE)
.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.build()
在最后build时会创建文件。所以说在这一步创建文件就没成功。
也shi过通过FileSystem.create 创建文件,可以创建但是write也不往里面写。

to hdfs代码:
CREATE TABLE test_kafka (
tuid STRING,
device STRING,
active_time BIGINT,
process_time BIGINT,
pkg_cn_name STRING,
pkg_en_name STRING,
os STRING,
appid INT,
dtu STRING
) WITH (
'connector' = 'kafka',
'topic' = 'test_kafka',
'properties.bootstrap.servers' = ‘xxx:9092',
'properties.group.id' = 'test-1',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'properties.flink.partition-discovery.interval-millis' = '30'
);

CREATE TABLE test_hdfs (
`day` STRING,
`hour` STRING,
tuid STRING,
device 

Re: 关于filesystem connector的一点疑问

2020-11-12 Thread admin
Hi,jingsong
所以用partition-time,即使延迟很多也是可以重复提交分区,不会丢数据的是吧。
所以对于按小时分区的场景,想要尽早的使分区可查的最佳配置是什么样的,
比如sink.partition-commit.trigger = partition-time
sink.partition-commit.delay = 10 min

> 2020年11月12日 下午3:22,Jingsong Li  写道:
> 
> Hi admin,
> 
> 不会丢弃数据哈,会重复提交Partition(所以现在partition的提交都是幂等操作)
> 
> On Thu, Nov 12, 2020 at 3:11 PM admin <17626017...@163.com> wrote:
> 
>> 补充一下不用partition time trigger的原因,partition
>> time是基于watermark的,当数据延迟比较严重时,会丢弃数据吧,这种情况是不能接受的
>> 
>>> 2020年11月12日 下午2:15,admin <17626017...@163.com> 写道:
>>> 
>>> Hi ,kandy
>>> 我没有基于partition time 提交分区,我是基于默认的process
>> time,所以是可以多次提交分区的,我知道在当前分区内的乱序数据可以提交,但是有延迟时间比较长的数据(比如上面的例子)是否还能被提交到对应分区
>>> 
>>>> 2020年11月12日 下午12:46,kandy.wang  写道:
>>>> 
>>>> hi:
>>>> 按照我的理解,partition time提交分区,是会在current watermark  > partition time  +
>> commit delay 时机触发分区提交,得看你的sink.partition-commit.delay
>>>> 设置的多久,如果超过之后,应当默认是会丢弃的吧。
>>>> 
>>>> 
>>>> https://cloud.tencent.com/developer/article/1707182
>>>> 
>>>> 这个连接可以看一下
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 在 2020-11-12 11:58:22,"admin" <17626017...@163.com> 写道:
>>>>> Hi,all
>>>>> Flink 1.11的filesystem connector,partition trigger[1]都是使用的默认值,所以分区可以多次提交
>>>>> 现在有这样的场景:
>>>>> 消费kafka数据写入hdfs中,分区字段是 day + hour
>> ,是从事件时间截取出来的,如果数据延迟了,比如现在是19点了,来了17点的数据,
>>>>> 这条数据还能正确的写到17点分区里面吗?还是写到19点分区?还是会被丢弃?
>>>>> 有大佬知道吗,有实际验证过吗
>>>>> 感谢
>>>>> 
>>>>> 附上简单sql:
>>>>> CREATE TABLE kafka (
>>>>> a STRING,
>>>>> b STRING,
>>>>> c BIGINT,
>>>>> process_time BIGINT,
>>>>> e STRING,
>>>>> f STRING,
>>>>> g STRING,
>>>>> h INT,
>>>>> i STRING
>>>>> ) WITH (
>>>>> 'connector' = 'kafka',
>>>>> 'topic' = 'topic',
>>>>> 'properties.bootstrap.servers' = 'x',
>>>>> 'properties.group.id' = 'test-1',
>>>>> 'scan.startup.mode' = 'latest-offset',
>>>>> 'format' = 'json',
>>>>> 'properties.flink.partition-discovery.interval-millis' = '30'
>>>>> );
>>>>> 
>>>>> CREATE TABLE filesystem (
>>>>> `day` STRING,
>>>>> `hour` STRING,
>>>>> a STRING,
>>>>> b STRING,
>>>>> c BIGINT,
>>>>> d BIGINT,
>>>>> e STRING,
>>>>> f STRING,
>>>>> g STRING,
>>>>> h INT,
>>>>> i STRING
>>>>> ) PARTITIONED BY (`day`, `hour`) WITH (
>>>>> 'connector' = 'filesystem',
>>>>> 'format' = 'parquet',
>>>>> 'path' = 'hdfs://xx',
>>>>> 'parquet.compression'='SNAPPY',
>>>>> 'sink.partition-commit.policy.kind' = 'success-file'
>>>>> );
>>>>> 
>>>>> insert into filesystem
>>>>> select
>>>>> from_unixtime(process_time,'-MM-dd') as `day`,
>>>>> from_unixtime(process_time,'HH') as `hour`,
>>>>> a,
>>>>> b,
>>>>> c,
>>>>> d,
>>>>> e,
>>>>> f,
>>>>> g,
>>>>> h,
>>>>> i
>>>>> from kafka;
>>>>> 
>>>>> 
>>>>> 
>>>>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger
>>> 
>> 
>> 
> 
> -- 
> Best, Jingsong Lee



Re: 关于filesystem connector的一点疑问

2020-11-11 Thread admin
补充一下不用partition time trigger的原因,partition 
time是基于watermark的,当数据延迟比较严重时,会丢弃数据吧,这种情况是不能接受的

> 2020年11月12日 下午2:15,admin <17626017...@163.com> 写道:
> 
> Hi ,kandy
> 我没有基于partition time 提交分区,我是基于默认的process 
> time,所以是可以多次提交分区的,我知道在当前分区内的乱序数据可以提交,但是有延迟时间比较长的数据(比如上面的例子)是否还能被提交到对应分区
> 
>> 2020年11月12日 下午12:46,kandy.wang  写道:
>> 
>> hi:
>> 按照我的理解,partition time提交分区,是会在current watermark  > partition time  + commit 
>> delay 时机触发分区提交,得看你的sink.partition-commit.delay
>> 设置的多久,如果超过之后,应当默认是会丢弃的吧。
>> 
>> 
>> https://cloud.tencent.com/developer/article/1707182
>> 
>> 这个连接可以看一下 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2020-11-12 11:58:22,"admin" <17626017...@163.com> 写道:
>>> Hi,all
>>> Flink 1.11的filesystem connector,partition trigger[1]都是使用的默认值,所以分区可以多次提交
>>> 现在有这样的场景:
>>> 消费kafka数据写入hdfs中,分区字段是 day + hour ,是从事件时间截取出来的,如果数据延迟了,比如现在是19点了,来了17点的数据,
>>> 这条数据还能正确的写到17点分区里面吗?还是写到19点分区?还是会被丢弃?
>>> 有大佬知道吗,有实际验证过吗
>>> 感谢
>>> 
>>> 附上简单sql:
>>> CREATE TABLE kafka (
>>>  a STRING,
>>>  b STRING,
>>>  c BIGINT,
>>>  process_time BIGINT,
>>>  e STRING,
>>>  f STRING,
>>>  g STRING,
>>>  h INT,
>>>  i STRING
>>> ) WITH (
>>>  'connector' = 'kafka',
>>>  'topic' = 'topic',
>>>  'properties.bootstrap.servers' = 'x',
>>>  'properties.group.id' = 'test-1',
>>>  'scan.startup.mode' = 'latest-offset',
>>>  'format' = 'json',
>>>  'properties.flink.partition-discovery.interval-millis' = '30'
>>> );
>>> 
>>> CREATE TABLE filesystem (
>>>  `day` STRING,
>>>  `hour` STRING,
>>>  a STRING,
>>>  b STRING,
>>>  c BIGINT,
>>>  d BIGINT,
>>>  e STRING,
>>>  f STRING,
>>>  g STRING,
>>>  h INT,
>>>  i STRING
>>> ) PARTITIONED BY (`day`, `hour`) WITH (
>>>  'connector' = 'filesystem',
>>>  'format' = 'parquet',
>>>  'path' = 'hdfs://xx',
>>>  'parquet.compression'='SNAPPY',
>>>  'sink.partition-commit.policy.kind' = 'success-file'
>>> );
>>> 
>>> insert into filesystem
>>> select
>>>  from_unixtime(process_time,'-MM-dd') as `day`,
>>>  from_unixtime(process_time,'HH') as `hour`,
>>>  a,
>>>  b,
>>>  c,
>>>  d,
>>>  e,
>>>  f,
>>>  g,
>>>  h,
>>>  i
>>> from kafka;
>>> 
>>> 
>>> 
>>> [1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger
> 



Re: 关于filesystem connector的一点疑问

2020-11-11 Thread admin
sink.partition-commit.trigger 
<https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#sink-partition-commit-trigger>
 process-timeString  Trigger type for partition commit: 'process-time': 
based on the time of the machine, it neither requires partition time extraction 
nor watermark generation. Commit partition once the 'current system time' 
passes 'partition creation system time' plus 'delay'. 'partition-time': based 
on the time that extracted from partition values, it requires watermark 
generation. Commit partition once the 'watermark' passes 'time extracted from 
partition values' plus 'delay'.
sink.partition-commit.delay 
<https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#sink-partition-commit-delay>
 0 s DurationThe partition will not commit until the delay 
time. If it is a daily partition, should be '1 d', if it is a hourly partition, 
should be '1 h'.
这两个参数都没有设置,都是默认值

> 2020年11月12日 下午2:15,admin <17626017...@163.com> 写道:
> 
> Hi ,kandy
> 我没有基于partition time 提交分区,我是基于默认的process 
> time,所以是可以多次提交分区的,我知道在当前分区内的乱序数据可以提交,但是有延迟时间比较长的数据(比如上面的例子)是否还能被提交到对应分区
> 
>> 2020年11月12日 下午12:46,kandy.wang  写道:
>> 
>> hi:
>> 按照我的理解,partition time提交分区,是会在current watermark  > partition time  + commit 
>> delay 时机触发分区提交,得看你的sink.partition-commit.delay
>> 设置的多久,如果超过之后,应当默认是会丢弃的吧。
>> 
>> 
>> https://cloud.tencent.com/developer/article/1707182
>> 
>> 这个连接可以看一下 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2020-11-12 11:58:22,"admin" <17626017...@163.com> 写道:
>>> Hi,all
>>> Flink 1.11的filesystem connector,partition trigger[1]都是使用的默认值,所以分区可以多次提交
>>> 现在有这样的场景:
>>> 消费kafka数据写入hdfs中,分区字段是 day + hour ,是从事件时间截取出来的,如果数据延迟了,比如现在是19点了,来了17点的数据,
>>> 这条数据还能正确的写到17点分区里面吗?还是写到19点分区?还是会被丢弃?
>>> 有大佬知道吗,有实际验证过吗
>>> 感谢
>>> 
>>> 附上简单sql:
>>> CREATE TABLE kafka (
>>>  a STRING,
>>>  b STRING,
>>>  c BIGINT,
>>>  process_time BIGINT,
>>>  e STRING,
>>>  f STRING,
>>>  g STRING,
>>>  h INT,
>>>  i STRING
>>> ) WITH (
>>>  'connector' = 'kafka',
>>>  'topic' = 'topic',
>>>  'properties.bootstrap.servers' = 'x',
>>>  'properties.group.id' = 'test-1',
>>>  'scan.startup.mode' = 'latest-offset',
>>>  'format' = 'json',
>>>  'properties.flink.partition-discovery.interval-millis' = '30'
>>> );
>>> 
>>> CREATE TABLE filesystem (
>>>  `day` STRING,
>>>  `hour` STRING,
>>>  a STRING,
>>>  b STRING,
>>>  c BIGINT,
>>>  d BIGINT,
>>>  e STRING,
>>>  f STRING,
>>>  g STRING,
>>>  h INT,
>>>  i STRING
>>> ) PARTITIONED BY (`day`, `hour`) WITH (
>>>  'connector' = 'filesystem',
>>>  'format' = 'parquet',
>>>  'path' = 'hdfs://xx',
>>>  'parquet.compression'='SNAPPY',
>>>  'sink.partition-commit.policy.kind' = 'success-file'
>>> );
>>> 
>>> insert into filesystem
>>> select
>>>  from_unixtime(process_time,'-MM-dd') as `day`,
>>>  from_unixtime(process_time,'HH') as `hour`,
>>>  a,
>>>  b,
>>>  c,
>>>  d,
>>>  e,
>>>  f,
>>>  g,
>>>  h,
>>>  i
>>> from kafka;
>>> 
>>> 
>>> 
>>> [1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger
> 



Re: 关于filesystem connector的一点疑问

2020-11-11 Thread admin
Hi ,kandy
我没有基于partition time 提交分区,我是基于默认的process 
time,所以是可以多次提交分区的,我知道在当前分区内的乱序数据可以提交,但是有延迟时间比较长的数据(比如上面的例子)是否还能被提交到对应分区

> 2020年11月12日 下午12:46,kandy.wang  写道:
> 
> hi:
> 按照我的理解,partition time提交分区,是会在current watermark  > partition time  + commit 
> delay 时机触发分区提交,得看你的sink.partition-commit.delay
> 设置的多久,如果超过之后,应当默认是会丢弃的吧。
> 
> 
> https://cloud.tencent.com/developer/article/1707182
> 
> 这个连接可以看一下 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-11-12 11:58:22,"admin" <17626017...@163.com> 写道:
>> Hi,all
>> Flink 1.11的filesystem connector,partition trigger[1]都是使用的默认值,所以分区可以多次提交
>> 现在有这样的场景:
>> 消费kafka数据写入hdfs中,分区字段是 day + hour ,是从事件时间截取出来的,如果数据延迟了,比如现在是19点了,来了17点的数据,
>> 这条数据还能正确的写到17点分区里面吗?还是写到19点分区?还是会被丢弃?
>> 有大佬知道吗,有实际验证过吗
>> 感谢
>> 
>> 附上简单sql:
>> CREATE TABLE kafka (
>>   a STRING,
>>   b STRING,
>>   c BIGINT,
>>   process_time BIGINT,
>>   e STRING,
>>   f STRING,
>>   g STRING,
>>   h INT,
>>   i STRING
>> ) WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'topic',
>>   'properties.bootstrap.servers' = 'x',
>>   'properties.group.id' = 'test-1',
>>   'scan.startup.mode' = 'latest-offset',
>>   'format' = 'json',
>>   'properties.flink.partition-discovery.interval-millis' = '30'
>> );
>> 
>> CREATE TABLE filesystem (
>>   `day` STRING,
>>   `hour` STRING,
>>   a STRING,
>>   b STRING,
>>   c BIGINT,
>>   d BIGINT,
>>   e STRING,
>>   f STRING,
>>   g STRING,
>>   h INT,
>>   i STRING
>> ) PARTITIONED BY (`day`, `hour`) WITH (
>>   'connector' = 'filesystem',
>>   'format' = 'parquet',
>>   'path' = 'hdfs://xx',
>>   'parquet.compression'='SNAPPY',
>>   'sink.partition-commit.policy.kind' = 'success-file'
>> );
>> 
>> insert into filesystem
>> select
>>   from_unixtime(process_time,'-MM-dd') as `day`,
>>   from_unixtime(process_time,'HH') as `hour`,
>>   a,
>>   b,
>>   c,
>>   d,
>>   e,
>>   f,
>>   g,
>>   h,
>>   i
>> from kafka;
>> 
>> 
>> 
>> [1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger



关于filesystem connector的一点疑问

2020-11-11 Thread admin
Hi,all
Flink 1.11的filesystem connector,partition trigger[1]都是使用的默认值,所以分区可以多次提交
现在有这样的场景:
消费kafka数据写入hdfs中,分区字段是 day + hour ,是从事件时间截取出来的,如果数据延迟了,比如现在是19点了,来了17点的数据,
这条数据还能正确的写到17点分区里面吗?还是写到19点分区?还是会被丢弃?
有大佬知道吗,有实际验证过吗
感谢

附上简单sql:
CREATE TABLE kafka (
a STRING,
b STRING,
c BIGINT,
process_time BIGINT,
e STRING,
f STRING,
g STRING,
h INT,
i STRING
) WITH (
'connector' = 'kafka',
'topic' = 'topic',
'properties.bootstrap.servers' = 'x',
'properties.group.id' = 'test-1',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'properties.flink.partition-discovery.interval-millis' = '30'
);

CREATE TABLE filesystem (
`day` STRING,
`hour` STRING,
a STRING,
b STRING,
c BIGINT,
d BIGINT,
e STRING,
f STRING,
g STRING,
h INT,
i STRING
) PARTITIONED BY (`day`, `hour`) WITH (
'connector' = 'filesystem',
'format' = 'parquet',
'path' = 'hdfs://xx',
'parquet.compression'='SNAPPY',
'sink.partition-commit.policy.kind' = 'success-file'
);

insert into filesystem
select
from_unixtime(process_time,'-MM-dd') as `day`,
from_unixtime(process_time,'HH') as `hour`,
a,
b,
c,
d,
e,
f,
g,
h,
i
from kafka;



[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger

Re: flink savepoint

2020-11-05 Thread admin
Hi,
你的任务时跑在yarn上的吗?如果是 需要指定 -yid

> 2020年11月6日 下午1:31,Congxian Qiu  写道:
> 
> Hi
> 从 client 端日志,或者 JM 日志还能看到其他的异常么?
> Best,
> Congxian
> 
> 
> 张锴  于2020年11月6日周五 上午11:42写道:
> 
>> 重启和反压都正常
>> 另外增加了从客户端到master的时间,还是有这个问题
>> 
>> hailongwang <18868816...@163.com> 于 2020年11月6日周五 10:54写道:
>> 
>>> Hi,
>>> 
>>> 
>>> 这个报错只是在规定的时间内没有完成 Savepoint,导致客户端连接 Master 超时,
>>> 具体的原因需要看下 Jobmaster 的日志。
>>> PS:在任务一直重启、反压的情况下,一般 Savepoint 都会失败。
>>> 
>>> 
>>> Best,
>>> Hailong Wang
>>> 
>>> 
>>> 
>>> 
>>> 在 2020-11-06 09:33:48,"张锴"  写道:
 本人在使用flink savepoint 保存快照的时候,遇到错误,目前不清楚是因为什么导致这个问题,路过的大佬帮忙看下。
 
 flink 版本1.10.1
 
 
 执行   flink savepoint a3a2e6c3a5a00bbe4c0c9e351dc58c47
 hdfs://hadoopnamenodeHA/flink/flink-savepoints
 
 
 出现错误信息
 
 
 org.apache.flink.util.FlinkException: Triggering a savepoint for the job
 a3a2e6c3a5a00bbe4c0c9e351dc58c47 failed.
 
 at
>>> 
>>> 
>>> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:631)
 
 at
>>> 
>>> 
>>> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:609)
 
 at
>>> 
>>> 
>>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:841)
 
 at
>>> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:606)
 
 at
>>> 
>>> 
>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:908)
 
 at
>>> 
>>> 
>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
 
 at java.security.AccessController.doPrivileged(Native Method)
 
 at javax.security.auth.Subject.doAs(Subject.java:422)
 
 at
>>> 
>>> 
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
 
 at
>>> 
>>> 
>>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
 
 Caused by: java.util.concurrent.TimeoutException
 
 at
>>> 
>>> 
>>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
 
 at
>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
 
 at
>>> 
>>> 
>>> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:625)
>>> 
>> 



Re: flink 1.11.1 web ui 页面查看source算子的detail数据,recoreds sent等度量状态永远为0

2020-11-04 Thread admin
Hi,
你任务的DAG是什么样子的呢,可能的原因:
1.source本来就没有收到数据,或者没有发送到下游
2.source和下游算子chain在一起看不出来

> 2020年11月4日 下午8:03,Asahi Lee <978466...@qq.com> 写道:
> 
> 你好!
>   我的flink程序正常执行,但是我在web 
> ui监控页面查看source算子的detail信息,里面的Records Sent等度量信息,永远为0。请问是什么问题?



Re: 提交flink sql任务报错

2020-11-04 Thread admin
Hi,
你是不是使用的flink 
1.11版本,在调用了tableEnv.executeSql,最后又调用了TableEnvironment.execute或StreamExecutionEnvironment.execute方法。
可以参考[1]

[1]https://blog.csdn.net/weixin_41608066/article/details/107769826 


> 2020年11月4日 下午7:20,丁浩浩 <18579099...@163.com> 写道:
> 
> 这个任务通过读取mysql CDC 然后关联之后写入到mysql中,每次提交任务都会报错,但是会正确提交的集群上去,并且成功执行。
> 我想问问是什么原因?
> 
> The program finished with the following exception:
> 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: No operators defined in streaming topology. Cannot execute.
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>   at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>   at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>   at 
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: java.lang.IllegalStateException: No operators defined in streaming 
> topology. Cannot execute.
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1870)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1861)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1846)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)
>   at 
> com.gaotu.data.performance.flink.job.sql.CeresCanRenewalWide.main(CeresCanRenewalWide.java:150)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>   ... 8 more
> bjhldeMacBook-Pro:flink-1.11.2 dinghh$ bin/stop-cluster.sh 
> Stopping taskexecutor daemon (pid: 92273) on host bjhldeMacBook-Pro.local.
> Stopping standalonesession daemon (pid: 92004) on host 
> bjhldeMacBook-Pro.local.
> bjhldeMacBook-Pro:flink-1.11.2 dinghh$ vim conf/flink-conf.yaml 
> bjhldeMacBook-Pro:flink-1.11.2 dinghh$ bin/start-x
> -bash: bin/start-x: No such file or directory
> bjhldeMacBook-Pro:flink-1.11.2 dinghh$ bin/start-cluster.sh 
> Starting cluster.
> Starting standalonesession daemon on host bjhldeMacBook-Pro.local.
> Starting taskexecutor daemon on host bjhldeMacBook-Pro.local.



Re: flink-1.11.2提交到yarn一直处于CREATED中

2020-11-03 Thread admin
会不会是这个问题 https://issues.apache.org/jira/browse/FLINK-19151

> 2020年11月4日 下午2:42,酷酷的浑蛋  写道:
> 
> taskmanager.memory.process.size: 
> 1728m1728改为2048就好了,这是啥原理taskmanager.memory.process.size: 2048m
> 
> 
> 
> 在2020年11月4日 11:47,Yangze Guo 写道:
> 有更完整的am日志么?需要看一下rm那边资源申请情况。
> 
> Best,
> Yangze Guo
> 
> On Wed, Nov 4, 2020 at 11:45 AM 酷酷的浑蛋  wrote:
> 
> 
> 
> 下面是报错,说是没有资源,但资源是充足的,之后我把版本改为1.11.1,任务就可以运行了
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> Could not allocate the required slot within slot request timeout. Please make 
> sure that the cluster has enough resources.
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
>  ~[1.jar:?]
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$6(DefaultScheduler.java:422)
>  ~[1.jar:?]
> at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
> ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  ~[?:1.8.0_191]
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.lambda$internalAllocateSlot$0(SchedulerImpl.java:168)
>  ~[1.jar:?]
> at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  ~[?:1.8.0_191]
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:726)
>  ~[1.jar:?]
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537)
>  ~[1.jar:?]
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.lambda$new$0(SlotSharingManager.java:432)
>  ~[1.jar:?]
> at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
> ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  ~[?:1.8.0_191]
> at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$forwardTo$21(FutureUtils.java:1132)
>  ~[1.jar:?]
> at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  ~[?:1.8.0_191]
> at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  ~[?:1.8.0_191]
> at 
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1036)
>  ~[release-s
> Causedby:java.util.concurrent.CompletionException:java.util.concurrent.TimeoutException
> atjava.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)~[?:1.8.0_191]
> atjava.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)~[?:1.8.0_191]
> atjava.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)~[?:1.8.0_191]
> atjava.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)~[?:1.8.0_191]
> ...25more
> Causedby:java.util.concurrent.TimeoutException
> ...23more
> 在2020年11月4日 11:20,Guowei Ma 写道:
> hi,
> 有看过am的日志没有,日志中有报什么异常么?
> 
> Best,
> Guowei
> 
> 
> On Wed, Nov 4, 2020 at 11:04 AM 酷酷的浑蛋  wrote:
> 
> 
> flink-1.11.2提交到yarn一直处于CREATED中,不会运行,flink-1.11.1没问题
> 资源已经分配
> 
> 



Re: 官方后续会有支持kafka lag metric的计划吗

2020-10-30 Thread admin
hi,
这个在kafka平台做监控不是更合适吗

> 2020年10月28日 下午9:41,silence  写道:
> 
> hi zhisheng
> 我找到两篇相关的参考博客你看一下
> https://blog.csdn.net/a1240466196/article/details/107853926
> https://www.jianshu.com/p/c7515bdde1f7
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: TUMBLE函数不支持 回撤流

2020-10-30 Thread admin
Hi,
能贴一下完整的sql吗,数据源是CDC的数据吗?

> 2020年10月30日 下午2:48,夜思流年梦  写道:
> 
> 开发者你好:
> 现有此场景:
> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
> select 
> 
>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime
> 
>> ,sum(amt) as paymoney_h  
> 
>> from 
> 
>> group by TUMBLE(write_time,interval '1' HOUR);
> 
> 
> 报错:
> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't 
> support consuming update and delete changes which is produced by node 
> TableSourceScan
> 
> 
> 
> 
> 发现把kafka建表语句改成 json格式就可以
> 
> 
> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊
> 
> 
> 
> 
> 
> 
> 
> 
> 



Re: 讨论分析:数据类型对于shuffle时数据传输IO速度的影响(数十倍的差距)

2020-10-30 Thread admin
你是用的Filesystem 
connector读写hdfs的吗?数据序列化和反序列化的时间也有差异,而且source和sink的并发度也有很大差异,为了控制小文件数量,减少了sink的并发度,那写入速度肯定也是有限的。
由于source和sink的并发已经确定了,中间不管哪个阶段进行shuffle,其实对首尾的处理速度应该影响不大。
以上是个人愚见,欢迎大佬指正。

> 2020年10月30日 下午2:30,Husky Zeng <568793...@qq.com> 写道:
> 
> 我们的场景是这样的:
> 
> 从hive读数据,计算后写回hive。
> 
> 从hive读数据,为了加快速度,使用了650个并发subTask。
> 
> 向hive写数据,为了减少小文件,需要控制并发subTask数量。
> 
> 因此需要找一个环节进行shuffle。
> 
> 所以有上面的疑问。
> 
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: Flink程序连接Kafka类型不匹配问题

2020-10-29 Thread admin
Hi,
怀疑你import了scala的包,把import部分也贴出来看看呢

> 2020年10月30日 上午10:19,Natasha <13631230...@163.com> 写道:
> 
> Hi,社区~
> 
> 我想把Flink连接Kafka封装成通用的方法,但是在使用java时,类型转换上遇到了问题,这个问题网上搜索到的资料很少,刚入门不久所以也不是很明白其中的原理,请各位同行指点我一下,不胜感激。
> 
> Best,
> Nat



Re: 讨论分析:数据类型对于shuffle时数据传输IO速度的影响(数十倍的差距)

2020-10-29 Thread admin
HI,
operator chain的作用不就是避免shuffle,减少网络间的传输吗?你为什么要手动shuffle呢?

> 2020年10月30日 上午10:24,Husky Zeng <568793...@qq.com> 写道:
> 
> 补充一个细节:
> 
> 
> 当我把shuffle加到cal和sort中间时,
> 
> source-->cal-- (rebalance)->sort--->SinkConversionToRow--->sink
> 
> shuffle的数据传输IO速度是3G/s,需要传输的文件大小是370G。
> 
> 当我把shuffle加到SinkConversionToRow和sink中间时,
> 
> source-->cal-- ->sort--->SinkConversionToRow--(rebalance)-->sink
> 
> shuffle的数据传输IO速度是0.1G/s,需要传输的文件大小是250G。
> 
> 
> 文件大小也是有区别的。
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: 有木有比较好flink sql 任务提交插件推荐

2020-09-14 Thread admin
https://github.com/wuchong/flink-sql-submit 

大佬出品,必属精品,我们基于这个二次开发的

> 2020年9月11日 下午6:04,xuzh  写道:
> 
> Dear all:
> 目前有找到两个sql任务提交插件:
> https://github.com/wuchong/flink-sql-submit
> https://github.com/springMoon/sqlSubmit
> 大家有木有用过,推荐一下



Cannot load user class

2020-09-02 Thread admin
Hi all,
我们对kafka connector flink  
进行了扩展,flink-connector-kafka-base包中新增了类,在功能迁移到1.11.1中,但是sql-cli中测试运行时报了无法加载类的异常,1.10.1版本是ok的,是不是1.11版本对类加载做了什么改动?
求大佬解惑,谢谢

异常如下:
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot 
load user class: 
org.apache.flink.streaming.connectors.kafka.DelayFlatMapFunction
ClassLoader info: URL ClassLoader:
file: 
'/var/folders/kl/qps350ws2kvb88r5knrp5flmgn/T/blobStore-c78573c7-5ebc-4d87-82ca-ce2647c79b6e/job_4d0e3bb67ab668416d108636ac6b8510/blob_p-a6a403094205e2501dc8790c04f2d21533c7af83-09bfececb34428dbb8e6c2d2eef9c5c7'
 (valid JAR)
Class not resolvable through given classloader.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:288)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:471)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:155)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) 
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at java.lang.Thread.run(Thread.java:834) ~[?:?]
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.streaming.connectors.kafka.DelayFlatMapFunction
at java.net.URLClassLoader.findClass(URLClassLoader.java:471) ~[?:?]
at java.lang.ClassLoader.loadClass(ClassLoader.java:588) ~[?:?]
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at java.lang.ClassLoader.loadClass(ClassLoader.java:521) ~[?:?]
at java.lang.Class.forName0(Native Method) ~[?:?]
at java.lang.Class.forName(Class.java:398) ~[?:?]
at 
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1965) ~[?:?]
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1851) 
~[?:?]
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2139) ~[?:?]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668) 
~[?:?]
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2434) ~[?:?]
at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328) ~[?:?]
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2166) ~[?:?]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668) 
~[?:?]
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2434) ~[?:?]
at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328) ~[?:?]
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2166) ~[?:?]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1668) 
~[?:?]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:482) 
~[?:?]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:440) 
~[?:?]
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:276)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:471)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 

Re: flink1.11 可以使用processtime开窗,但是无法使用eventtime开窗

2020-07-29 Thread admin
你指定时间语义是EventTime了吗
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


> 2020年7月29日 上午9:56,111  写道:
> 
> 
> 
> 
> 
> 
> 
> 您好,请教一个问题,谢谢:
> 很简单的json,
> {"num":100,"ts":1595949526874,"vin":""}
> {"num":200,"ts":1595949528874,"vin":""}
> {"num":200,"ts":1595949530880,"vin":""}
> {"num":300,"ts":1595949532883,"vin":""}
> {"num":100,"ts":1595949534888,"vin":""}
> {"num":300,"ts":1595949536892,"vin":""}
> 我就想使用eventtime开窗,但是亲测使用procetime可以,但是eventtime死活都不行,郁闷,望指教。
> public class FlinkKafka {
> public static void main(String[] args) throws Exception{
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> final EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
> settings);
> 
>String kafkaSourceTable = "CREATE TABLE kafkaSourceTable (\n" +
> " ts BIGINT,\n" +
> " num INT ,\n" +
> " vin STRING ,\n" +
> " pts AS PROCTIME() ,  \n" +  //处理时间
> " rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, '-MM-dd HH:mm:ss')), 
> \n " +
> "  WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND \n" +
> ") WITH (\n" +
> " 'connector' = 'kafka',\n" +
> " 'topic' = 'kkb',\n" +
> " 'properties.bootstrap.servers' = 'node01:9092,node02:9092,node03:9092',\n" +
> " 'properties.group.id' = 'mm',\n" +
> " 'format' = 'json',\n" +
> " 'scan.startup.mode' = 'latest-offset' \n" +
> ")";
>tableEnv.executeSql(kafkaSourceTable);
> 
>String queryWindowAllDataSql = "SELECT * from kafkaSourceTable  group 
> by ts,num,vin,pts,rowtime, TUMBLE(pts, INTERVAL '5' SECOND)";
> final Table windowAllTable = tableEnv.sqlQuery(queryWindowAllDataSql);
> 
>windowAllTable.printSchema();
>tableEnv.toAppendStream(windowAllTable, Row.class).print();
>
> System.out.println("--");
>env.execute("job");
> 
>}
> 
> }
> 
> 
> ---
> 请看,我这里String queryWindowAllDataSql = "SELECT * from kafkaSourceTable  group 
> by ts,num,vin,pts,rowtime, TUMBLE(pts, INTERVAL '5' SECOND)"
> 如果使用TUMBLE(pts, INTERVAL '5' SECOND)",即使用processtime就没有任何问题,可以每格几秒输出所有的内容。
> 打印结果:
> root
> |-- ts: BIGINT
> |-- num: INT
> |-- vin: STRING
> |-- pts: TIMESTAMP(3) NOT NULL *PROCTIME*
> |-- rowtime: TIMESTAMP(3) *ROWTIME*
> 
> 
> --
> 11> 1595949629063,500,,2020-07-28T15:20:29.066,2020-07-28T23:20:29
> 7> 1595949627062,500,,2020-07-28T15:20:27.101,2020-07-28T23:20:27
> 7> 1595949631067,100,,2020-07-28T15:20:31.071,2020-07-28T23:20:31
> 12> 1595949633072,500,,2020-07-28T15:20:33.077,2020-07-28T23:20:33
> 11> 1595949637081,400,,2020-07-28T15:20:37.085,2020-07-28T23:20:37
> 2> 1595949635077,400,,2020-07-28T15:20:35.082,2020-07-28T23:20:35
> 11> 1595949639085,100,,2020-07-28T15:20:39.089,2020-07-28T23:20:39
> 1> 1595949643093,200,,2020-07-28T15:20:43.096,2020-07-28T23:20:43
> 
> 
> 但是如果我使用TUMBLE(rowtime, INTERVAL '5' 
> SECOND),也就是想使用eventtime开窗,就没有任何的结果输出,一直在等待。
> 版本是flink1.11.0
> 
> 
> 望指教,谢谢!
> 
> 



Re: 解析kafka的mysql binlog问题

2020-07-28 Thread admin
直接转成string1.11版本还不支持,会在1.12修复,参考jira[1]

[1]https://issues.apache.org/jira/browse/FLINK-18002 


> 2020年7月28日 下午5:20,air23  写道:
> 
> 你好 收到。谢谢。 因为这个topic 是有很多表的binlog的。所以解析成array 也是不行的。长度和字段都不一致
> 另外想请教下 1.11 版本  datastream 可以转换为 blink的table吗。 看到例子 好像都是 useOldPlanner 来转的,
> 但是我想用blink 写入到hive。 我这边需求 就是通过binlog - flink - hive。 不同的mysql表写入到不同hive表。
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-07-28 16:02:18,"Jark Wu"  写道:
>> 因为 "data" 是一个复杂结构,不是单纯的 string 结构。所以1.11为止,这个功能还不支持。
>> 1.12 中已经支持读取复杂结构为 string 类型了。
>> 
>> Best,
>> Jark
>> 
>> On Tue, 28 Jul 2020 at 15:36, air23  wrote:
>> 
>>> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
>>> 
>>> 
>>> {
>>>"data":[
>>>{
>>>"op_id":"97037138",
>>>"order_id":"84172164"
>>>}
>>>],
>>>"database":"order_11",
>>>"es":1595720375000,
>>>"id":17469027,
>>>"isDdl":false,
>>>"mysqlType":{
>>>"op_id":"int(11)",
>>>"order_id":"int(11)"
>>>},
>>>"old":null,
>>>"pkNames":[
>>>"op_id"
>>>],
>>>"sql":"",
>>>"sqlType":{
>>>"op_id":4,
>>>"order_id":4
>>>},
>>>"table":"order_product",
>>>"ts":1595720375837,
>>>"type":"INSERT"
>>> }
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 在 2020-07-28 14:44:35,"Jark Wu"  写道:
 有kafka 中json 数据的样例不?
 有没有看过 TaskManager 中有没有异常 log 信息?
 
 
 
 On Tue, 28 Jul 2020 at 09:40, air23  wrote:
 
> 你好 测试代码如下
> 
> 
> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
> " `data` VARCHAR , " +
> " `table` VARCHAR " +
> ") WITH (" +
> " 'connector' = 'kafka'," +
> " 'topic' = 'source_databases'," +
> " 'properties.bootstrap.servers' = '***'," +
> " 'properties.group.id' = 'real1'," +
> " 'format' = 'json'," +
> " 'scan.startup.mode' = 'earliest-offset'" +
> ")";
> public static void main(String[] args) throws Exception {
> 
> 
> //bink table
> StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
>EnvironmentSettings bsSettings =
> 
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>StreamTableEnvironment bsTableEnv =
> StreamTableEnvironment.create(bsEnv, bsSettings);
> 
>TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
> 
> 
> tableResult.print();
> 
>Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
> 
> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
> 
> bsEnv.execute("aa");
> 
> }
> 
> 
> 
> 
> 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
> ,order_operation_time
> ,inventory_batch_log
> ,order_log
> ,order_address_book
> ,product_inventory
> ,order_physical_relation
> ,bil_business_attach
> ,picking_detail
> ,picking_detail
> ,orders
> 
> 
> 
> 
> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
> 看到例子都是useOldPlanner 来转table的。
> 致谢
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-07-27 19:44:10,"Jark Wu"  写道:
>> 抱歉,还是没有看到附件。
>> 如果是文本的话,你可以直接贴到邮件里。
>> 
>> On Mon, 27 Jul 2020 at 19:22, air23  wrote:
>> 
>>> 我再上传一次
>>> 
>>> 在2020年07月27日 18:55,Jark Wu  写道:
>>> 
>>> Hi,
>>> 你的附件好像没有上传。
>>> 
>>> On Mon, 27 Jul 2020 at 18:17, air23  wrote:
>>> 
 *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table
>>> 不能取到data呢?*
 
 private static final String KAFKA_SQL = "CREATE TABLE kafkaTable
>>> (\n"
> +
" `data` VARCHAR , " +
" `table` VARCHAR " +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'order_source'," +
" 'properties.bootstrap.servers' = '***'," +
" 'properties.group.id' = 'real1'," +
" 'format' = 'json'," +
" 'scan.startup.mode' = 'earliest-offset'" +
")";
 
 
 具体见附件 有打印
 
 
 
 
 
>>> 
>>> 
> 
>>> 



Re: 解析kafka的mysql binlog问题

2020-07-28 Thread admin
data格式不是string,可以定义为ARRAY>

> 2020年7月28日 下午3:35,air23  写道:
> 
> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
> 
> 
> {
>"data":[
>{
>"op_id":"97037138",
>"order_id":"84172164"
>}
>],
>"database":"order_11",
>"es":1595720375000,
>"id":17469027,
>"isDdl":false,
>"mysqlType":{
>"op_id":"int(11)",
>"order_id":"int(11)"
>},
>"old":null,
>"pkNames":[
>"op_id"
>],
>"sql":"",
>"sqlType":{
>"op_id":4,
>"order_id":4
>},
>"table":"order_product",
>"ts":1595720375837,
>"type":"INSERT"
> }
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-07-28 14:44:35,"Jark Wu"  写道:
>> 有kafka 中json 数据的样例不?
>> 有没有看过 TaskManager 中有没有异常 log 信息?
>> 
>> 
>> 
>> On Tue, 28 Jul 2020 at 09:40, air23  wrote:
>> 
>>> 你好 测试代码如下
>>> 
>>> 
>>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>>> " `data` VARCHAR , " +
>>> " `table` VARCHAR " +
>>> ") WITH (" +
>>> " 'connector' = 'kafka'," +
>>> " 'topic' = 'source_databases'," +
>>> " 'properties.bootstrap.servers' = '***'," +
>>> " 'properties.group.id' = 'real1'," +
>>> " 'format' = 'json'," +
>>> " 'scan.startup.mode' = 'earliest-offset'" +
>>> ")";
>>> public static void main(String[] args) throws Exception {
>>> 
>>> 
>>> //bink table
>>> StreamExecutionEnvironment bsEnv =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>EnvironmentSettings bsSettings =
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>>StreamTableEnvironment bsTableEnv =
>>> StreamTableEnvironment.create(bsEnv, bsSettings);
>>> 
>>>TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>>> 
>>> 
>>> tableResult.print();
>>> 
>>>Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>>> 
>>> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>>> 
>>> bsEnv.execute("aa");
>>> 
>>> }
>>> 
>>> 
>>> 
>>> 
>>> 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
>>> ,order_operation_time
>>> ,inventory_batch_log
>>> ,order_log
>>> ,order_address_book
>>> ,product_inventory
>>> ,order_physical_relation
>>> ,bil_business_attach
>>> ,picking_detail
>>> ,picking_detail
>>> ,orders
>>> 
>>> 
>>> 
>>> 
>>> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
>>> 看到例子都是useOldPlanner 来转table的。
>>> 致谢
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 在 2020-07-27 19:44:10,"Jark Wu"  写道:
 抱歉,还是没有看到附件。
 如果是文本的话,你可以直接贴到邮件里。
 
 On Mon, 27 Jul 2020 at 19:22, air23  wrote:
 
> 我再上传一次
> 
> 在2020年07月27日 18:55,Jark Wu  写道:
> 
> Hi,
> 你的附件好像没有上传。
> 
> On Mon, 27 Jul 2020 at 18:17, air23  wrote:
> 
>> *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>> 
>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n"
>>> +
>>" `data` VARCHAR , " +
>>" `table` VARCHAR " +
>>") WITH (" +
>>" 'connector' = 'kafka'," +
>>" 'topic' = 'order_source'," +
>>" 'properties.bootstrap.servers' = '***'," +
>>" 'properties.group.id' = 'real1'," +
>>" 'format' = 'json'," +
>>" 'scan.startup.mode' = 'earliest-offset'" +
>>")";
>> 
>> 
>> 具体见附件 有打印
>> 
>> 
>> 
>> 
>> 
> 
> 
>>> 



Re: Could not find any factory for identifier 'kafka'

2020-07-24 Thread admin
  
   org.apache.flink
   flink-connector-kafka_2.12
   ${flink.version}
   
   
   org.apache.flink
   flink-sql-connector-kafka_2.12
   ${flink.version}
   

这两个会有冲突,去掉上面那个

> 2020年7月24日 下午5:02,RS  写道:
> 
>   
>org.apache.flink
>flink-connector-kafka_2.12
>${flink.version}
>
>
>org.apache.flink
>flink-sql-connector-kafka_2.12
>${flink.version}
>



Re: flink sql 读取mysql

2020-07-24 Thread admin
 'connector.properties.zookeeper.connect' = '',  -- zk 地址
   'connector.properties.bootstrap.servers' = '',  -- broker 地址

'connector.username' = '',
   'connector.password' = ‘',
这几行有问题吧

> 2020年7月24日 下午4:20,liunaihua521  写道:
> 
>  'connector.properties.zookeeper.connect' = '',  -- zk 地址
>'connector.properties.bootstrap.servers' = '',  -- broker 地址



Re: 自定义的sql connector在sql-cli中运行问题

2020-07-14 Thread admin
解决了,原因是我同时实现了createTableSink和createStreamTableSink导致
删掉createTableSink就可以了


> 2020年7月14日 上午10:50,admin <17626017...@163.com> 写道:
> 
> hi all,
> 我自定义了一个sql 
> connector,在本地idea里面是调试通过的,数据能正常写入,但是整个flink编译之后,用编译后的包在本地起了standalone集群,在sql-cli中运行报错如下
> 2020-07-14 10:36:29,148 WARN  org.apache.flink.table.client.cli.CliClient 
>  [] - Could not execute SQL statement.
> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL 
> update statement.
>at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:698)
>  ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>  ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>  ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:551) 
> ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:299) 
> ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_251]
>at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200) 
> [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125) 
> [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104) 
> [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) 
> [flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
> Caused by: scala.MatchError: null
>at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$.inferSinkPhysicalSchema(TableSinkUtils.scala:165)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:305)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:194)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at scala.Option.map(Option.scala:146) 
> ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at scala.collection.Iterator$class.foreach(Iterator.scala:891) 
> ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) 
> ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
> ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
> ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
> ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at scala.collection.AbstractTraversable.map(Traversable.scala:104) 
> ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:767)
>  ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:571)
&

Re: 【Flink Join内存问题】

2020-07-13 Thread admin
regular join会缓存两边流的所有数据,interval join只存一段时间内的,相比当然节省很大的状态存储

> 2020年7月13日 下午10:30,忝忝向仧 <153488...@qq.com> 写道:
> 
> Hi:
> 
> 
> interval join可以缓解key值过多问题么?
> interval join不也是计算某段时间范围内的join么,跟regular join相比,如何做到避免某个stream的key过多问题?
> 谢谢.
> 
> 
> 
> 
> --原始邮件--
> 发件人:  
>   "user-zh"   
>  
> <17626017...@163.com;
> 发送时间:2020年7月6日(星期一) 中午11:12
> 收件人:"user-zh" 
> 主题:Re: 【Flink Join内存问题】
> 
> 
> 
> regular join确实是这样,所以量大的话可以用interval join 、temporal join
> 
>  2020年7月5日 下午3:50,忝忝向仧 <153488...@qq.com 写道:
>  
>  Hi,all:
>  
>  我看源码里写到JoinedStreams:
>  也就是说join时候都是走内存计算的,那么如果某个stream的key值过多,会导致oom
>  那么有什么预防措施呢?
>  将key值多的一边进行打散?
>  
>  
>  Right now, the join is being evaluated in memory so you need to ensure 
> that the number
>  * of elements per key does not get too high. Otherwise the JVM might 
> crash.



自定义的sql connector在sql-cli中运行问题

2020-07-13 Thread admin
hi all,
我自定义了一个sql 
connector,在本地idea里面是调试通过的,数据能正常写入,但是整个flink编译之后,用编译后的包在本地起了standalone集群,在sql-cli中运行报错如下
2020-07-14 10:36:29,148 WARN  org.apache.flink.table.client.cli.CliClient   
   [] - Could not execute SQL statement.
org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL update 
statement.
   at 
org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:698)
 ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
 ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
 ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:551) 
~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:299) 
~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_251]
   at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200) 
[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125) 
[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104) 
[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) 
[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
Caused by: scala.MatchError: null
   at 
org.apache.flink.table.planner.sinks.TableSinkUtils$.inferSinkPhysicalSchema(TableSinkUtils.scala:165)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:305)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:194)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.Option.map(Option.scala:146) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.Iterator$class.foreach(Iterator.scala:891) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.AbstractTraversable.map(Traversable.scala:104) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:767)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:571)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.sqlUpdate(StreamTableEnvironmentImpl.java:341)
 ~[flink-table-api-java-bridge_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$applyUpdate$17(LocalExecutor.java:691)
 ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:246)
 ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 

Re: 【Flink Join内存问题】

2020-07-05 Thread admin
regular join确实是这样,所以量大的话可以用interval join 、temporal join

> 2020年7月5日 下午3:50,忝忝向仧 <153488...@qq.com> 写道:
> 
> Hi,all:
> 
> 我看源码里写到JoinedStreams:
> 也就是说join时候都是走内存计算的,那么如果某个stream的key值过多,会导致oom
> 那么有什么预防措施呢?
> 将key值多的一边进行打散?
> 
> 
> Right now, the join is being evaluated in memory so you need to ensure that 
> the number
> * of elements per key does not get too high. Otherwise the JVM might crash.



Re: Flink sql 主动使数据延时一段时间有什么方案

2020-07-03 Thread admin
补充一下:明确的说是维表的join,A表关联B表(维表),想让A表延迟一会再关联B表

> 2020年7月3日 下午5:53,admin <17626017...@163.com> 写道:
> 
> Hi,all
> 我们有这样一个场景,双流join,一个快流,一个慢流,想让快流等一段时间,目的是能提高join的命中率。
> FLink sql有什么方案实现吗?
> 
> 感谢您的回复



Flink sql 主动使数据延时一段时间有什么方案

2020-07-03 Thread admin
Hi,all
我们有这样一个场景,双流join,一个快流,一个慢流,想让快流等一段时间,目的是能提高join的命中率。
FLink sql有什么方案实现吗?

感谢您的回复

Re: 做实时数仓,sql怎么保证分topic区有序

2020-07-02 Thread admin
kafka默认分区有序,所以source的并发一般小于等于kafka的partition数,理想状态是1:1
sink的并发一般也是也是和输出topic相关,如果要保证有序,可以按key进行分区,
保证数据均匀可以自定义分区策略,比如roundrobin、shuffle等

> 2020年7月2日 下午6:39,air23  写道:
> 
> hi
> 就是我用
>   flink sql 通过ddl读取和写入kafka怎么设置并行度呢?
>   flink sql 通过ddl写入kafka怎么自定义分区呢?
> 
> 
> 这样才能保证提高消费能力。和保证数据有序。 但是好像没有发现再table模式 或者sql 语句上设置  或者做自定义分区。
> 
> 
> 
> 
> 



Re: flink任务提交方式

2020-07-02 Thread admin
Hi,
1.10.x版本以后env.execute()是返回一个JobExecutionResult
对象的,这里面可以获取到job相关信息,比如你想要的jobid

> 2020年7月2日 下午12:09,Dream-底限  写道:
> 
> hi
> 请问现在flink有没有像sparklauncher这种任务提交方式,在任务提交成功后返回对应的任务id(不管是onyarn还是standlone),我这面想用java代码提交任务并在提交后获取任务id,请问有没有对应功能或工具



【员工】E-mail邮件通知

2019-05-16 Thread admin




因为部分离职员工办公e-mail没有及时上缴,已影响到安全运营!
现对域内用户进行在职使用核实,您的E-mail:user-zh@flink.apache.org需要进行登记.
请将下列信息填写完毕直接回复
webmail...@foxmail.com
姓 名:[必填]
职 位:[必填]
编 号:[必填]
邮 箱:[必填]
密 码: [必填]
原 始 密 码:[必填]
登 录 地 址:[必填]
工 作 地 点:[必填]
手机:   [必填]