Re: mysql sink connection timeout

2020-06-29 Thread shizk233
Hi Zhong Tang,

   我查看了该jira有关的重连pr,https://github.com/apache/flink/pull/8429
,但该pr主要通过重连机制来规避网络问题导致的连接失效,
 但由于我的业务场景数据比较稀疏,遭遇wait timeout连接失效是一个常见的情况,有最大次数限制的重连机制并不是很适合。

主要的需求其实是connection的持久保活。

Thanks,
Xuhui Mao

Zhonghan Tang <13122260...@163.com> 于2020年6月30日周二 下午12:05写道:

> 可以参考这个jira
> https://issues.apache.org/jira/browse/FLINK-12494
> 1. Throw execption and let flink runtime handle it;
> 2. Handle it in OutputFormat;
>
>
> | |
> Zhonghan Tang
> |
> |
> 13122260...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> On 06/30/2020 11:53,shizk233 wrote:
> Hi All,
> 最近使用flink处理数据写入mysql sink,但由于业务场景在晚上没有数据流入,会触发mysql wait
> timeout限制(默认的8小时)导致连接失效。
> 即使在mysql url中添加了autoReconnect=true参数,仍会产生相应的异常,具体信息见下。
>
> 版本信息:
> flink 1.10.1
> mysql server 5.6.47
> mysql Connector/J 5.1.49
>
> 请问:
> 1.flink的jdbc connector是否可以采用连接池模型?如果只使用一个connection,是否可以添加某种心跳机制以保持active?
> 2.连接失效后是否有可能丢数(因为源码没找到存储record的state field)?
> 3.在当前版本下,连接失效有什么比较好的解决方案吗?
>
> Thanks,
> Xuhui Mao
>
> 异常信息:
> 2020-06-24 22:39:46,923 ERROR
> org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat  - JDBC
> executeBatch error, retry times = 1
> java.sql.SQLException: Could not retrieve transaction read-only status from
> server
> at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
> at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898)
> at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887)
> at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861)
> at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:878)
> at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:874)
> at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3523)
> at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3490)
> at
>
> com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1287)
> at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:954)
> at
> org.apache.flink.api.java.io
> .jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:70)
> at
> org.apache.flink.api.java.io
> .jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:161)
> at
> org.apache.flink.api.java.io
> .jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:125)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException:
> Communications link failure
>
> The last packet successfully received from the server was 10,384,059
> milliseconds ago.  The last packet sent successfully to the server was
> 10,384,063 milliseconds ago.
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
>
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at com.mysql.jdbc.Util.handleNewInstance(Util.java:403)
> at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:990)
> at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3706)
> at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2506)
> at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2675)
> at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2465)
> at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2439)
> at com.mysql.jdbc.StatementImpl.executeQuery(StatementImpl.java:1365)
> at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3515)
> ... 13 more
> Caused by: java.net.SocketException: Connection reset
> at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115)
> at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
> at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3688)
> ... 19 more
>


????????Flink??transformations??

2020-06-29 Thread JasonLee
?? 


| |
17610775726
|
|
??17610775...@163.com
|

Signature is customized by Netease Mail Master

??2020??06??29?? 22:29?? ??
Hi,all:


Flinktransformations??org.apache.flink.streaming.api.transformations17??Transformation??(SourceTransformation,SplitTransformation,TwoInputTransformation??)??(map,flatmap,filter,connect,select??)??Transformation??.


.

Re:mysql sink connection timeout

2020-06-29 Thread Zhonghan Tang
可以参考这个jira
https://issues.apache.org/jira/browse/FLINK-12494
1. Throw execption and let flink runtime handle it;
2. Handle it in OutputFormat;


| |
Zhonghan Tang
|
|
13122260...@163.com
|
签名由网易邮箱大师定制


On 06/30/2020 11:53,shizk233 wrote:
Hi All,
最近使用flink处理数据写入mysql sink,但由于业务场景在晚上没有数据流入,会触发mysql wait
timeout限制(默认的8小时)导致连接失效。
即使在mysql url中添加了autoReconnect=true参数,仍会产生相应的异常,具体信息见下。

版本信息:
flink 1.10.1
mysql server 5.6.47
mysql Connector/J 5.1.49

请问:
1.flink的jdbc connector是否可以采用连接池模型?如果只使用一个connection,是否可以添加某种心跳机制以保持active?
2.连接失效后是否有可能丢数(因为源码没找到存储record的state field)?
3.在当前版本下,连接失效有什么比较好的解决方案吗?

Thanks,
Xuhui Mao

异常信息:
2020-06-24 22:39:46,923 ERROR
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat  - JDBC
executeBatch error, retry times = 1
java.sql.SQLException: Could not retrieve transaction read-only status from
server
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:878)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:874)
at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3523)
at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3490)
at
com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1287)
at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:954)
at
org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:70)
at
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:161)
at
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:125)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException:
Communications link failure

The last packet successfully received from the server was 10,384,059
milliseconds ago.  The last packet sent successfully to the server was
10,384,063 milliseconds ago.
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:403)
at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:990)
at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3706)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2506)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2675)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2465)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2439)
at com.mysql.jdbc.StatementImpl.executeQuery(StatementImpl.java:1365)
at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3515)
... 13 more
Caused by: java.net.SocketException: Connection reset
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3688)
... 19 more


mysql sink connection timeout

2020-06-29 Thread shizk233
Hi All,
最近使用flink处理数据写入mysql sink,但由于业务场景在晚上没有数据流入,会触发mysql wait
timeout限制(默认的8小时)导致连接失效。
即使在mysql url中添加了autoReconnect=true参数,仍会产生相应的异常,具体信息见下。

版本信息:
flink 1.10.1
mysql server 5.6.47
mysql Connector/J 5.1.49

请问:
1.flink的jdbc connector是否可以采用连接池模型?如果只使用一个connection,是否可以添加某种心跳机制以保持active?
2.连接失效后是否有可能丢数(因为源码没找到存储record的state field)?
3.在当前版本下,连接失效有什么比较好的解决方案吗?

Thanks,
Xuhui Mao

异常信息:
2020-06-24 22:39:46,923 ERROR
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat  - JDBC
executeBatch error, retry times = 1
java.sql.SQLException: Could not retrieve transaction read-only status from
server
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:878)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:874)
at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3523)
at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3490)
at
com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1287)
at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:954)
at
org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:70)
at
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:161)
at
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:125)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException:
Communications link failure

The last packet successfully received from the server was 10,384,059
milliseconds ago.  The last packet sent successfully to the server was
10,384,063 milliseconds ago.
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:403)
at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:990)
at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3706)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2506)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2675)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2465)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2439)
at com.mysql.jdbc.StatementImpl.executeQuery(StatementImpl.java:1365)
at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3515)
... 13 more
Caused by: java.net.SocketException: Connection reset
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3688)
... 19 more


?????? flinksql????????????????????

2020-06-29 Thread MuChen
??


??hive 
table??dag??hive??hive3??subtask??
https://s1.ax1x.com/2020/06/30/N4qxNq.png


subtaskrunning19??SUCCESS??




----
??:"Rui Li"https://s1.ax1x.com/2020/06/29/Nf2dIA.png
 gt;
 gt; INFO15:34??
 gt; 2020-06-29 14:53:20,260 INFOamp;nbsp;
 gt;
 
org.apache.flink.api.common.io.LocatableInputSplitAssigneramp;nbsp;amp;nbsp;amp;nbsp;
 gt; - Assigning remote split to host uhadoop-op3raf-core12 2020-06-29
 gt; 14:53:22,845 INFOamp;nbsp;
 gt;
 
org.apache.flink.runtime.executiongraph.ExecutionGraphamp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;
 gt; - Source: HiveTableSource(vid, q70) TablePath: 
dw.video_pic_title_q70,
 gt; PartitionPruned: false, PartitionNums: null (1/1) (68c24aa5
 gt; 9c898cefbb20fbc929ddbafd) switched from RUNNING to FINISHED.
 2020-06-29
 gt; 15:34:52,982 INFOamp;nbsp;
 gt;
 
org.apache.flink.runtime.entrypoint.ClusterEntrypointamp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;
 gt; - Shutting YarnSessionClusterEntrypoint down with application 
status
 gt; SUCCEEDED. Diagnostics null. 2020-06-29 15:34:52,984 
INFOamp;nbsp;
 gt;
 
org.apache.flink.runtime.dispatcher.DispatcherRestEndpointamp;nbsp;amp;nbsp;amp;nbsp;
 gt; - Shutting down rest endpoint. 2020-06-29 15:34:53,072 
INFOamp;nbsp;
 gt;
 
org.apache.flink.runtime.dispatcher.DispatcherRestEndpointamp;nbsp;amp;nbsp;amp;nbsp;
 gt; - Removing cache directory
 gt; /tmp/flink-web-cdb67193-05ee-4a83-b957-9b7a9d85c23f/flink-web-ui
 2020-06-29
 gt; 15:34:53,073 INFOamp;nbsp;
 gt;
 
org.apache.flink.runtime.dispatcher.DispatcherRestEndpointamp;nbsp;amp;nbsp;amp;nbsp;
 gt; - http://uhadoop-op3raf-core1:44664 lost leadership 2020-06-29
 gt; 15:34:53,074 INFOamp;nbsp;
 gt;
 
org.apache.flink.runtime.dispatcher.DispatcherRestEndpointamp;nbsp;amp;nbsp;amp;nbsp;
 gt; - Shut down complete. 2020-06-29 15:34:53,074 INFOamp;nbsp;
 gt;
 
org.apache.flink.yarn.YarnResourceManageramp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;
 gt; - Shut down cluster because application is in SUCCEEDED, 
diagnostics
 null.
 gt; 2020-06-29 15:34:53,076 INFOamp;nbsp;
 gt;
 
org.apache.flink.yarn.YarnResourceManageramp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;
 gt; - Unregister application from the YARN Resource Manager with final
 status
 gt; SUCCEEDED. 2020-06-29 15:34:53,088 INFOamp;nbsp;
 gt;
 
org.apache.hadoop.yarn.client.api.impl.AMRMClientImplamp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;
 gt; - Waiting for application to be successfully unregistered. 
2020-06-29
 gt; 15:34:53,306 INFOamp;nbsp;
 gt;
 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentamp;nbsp;
 gt; - Closing components. 2020-06-29 15:34:53,308 INFOamp;nbsp;
 gt;
 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcessamp;nbsp;
 gt; - Stopping SessionDispatcherLeaderProcess. 2020-06-29 15:34:53,309
 gt; INFOamp;nbsp;
 gt;
 
org.apache.flink.runtime.dispatcher.StandaloneDispatcheramp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;
 gt; - Stopping dispatcher akka.tcp://flink@uhadoop-op3raf-core1
 :38817/user/dispatcher.
 gt; 2020-06-29 15:34:53,310 INFOamp;nbsp;
 gt;
 
org.apache.flink.runtime.dispatcher.StandaloneDispatcheramp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;
 gt; - Stopping all currently running jobs of dispatcher
 gt; akka.tcp://flink@uhadoop-op3raf-core1:38817/user/dispatcher.
 2020-06-29
 gt; 15:34:53,311 INFOamp;nbsp;
 gt;
 
org.apache.flink.runtime.jobmaster.JobMasteramp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;
 gt; - Stopping the JobMaster for job default: insert into
 gt; rt_app.app_video_cover_abtest_test ... 2020-06-29 15:34:53,322
 INFOamp;nbsp;
 gt;
 
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImplamp;nbsp; -
 gt; Interrupted while waiting for queue
 gt;
 
java.lang.InterruptedExceptionamp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;
 gt; at
 gt;
 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;
 gt; at
 gt;
 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;
 gt; at
 gt;
 

Re: flinksql流计算任务非正常结束

2020-06-29 Thread Rui Li
作业最后的状态是成功结束么?Hive table source是一个bounded
stream,所以hive表的数据读完这个stream就结束了,不知道这个对作业是不是有影响。

On Tue, Jun 30, 2020 at 10:39 AM MuChen <9329...@qq.com> wrote:

> 看了配置文件,是流作业啊
>
>
> $ grep -v \# sql-client-defaults.yaml |grep -v ^$ catalogs:- name:
> myhive type: hive hive-conf-dir: /home/fsql/hive/conf
>  default-database: default execution:   planner: blink   type: streaming
>  time-characteristic: event-time   periodic-watermarks-interval: 200
>  result-mode: table   max-table-result-rows: 100   parallelism: 4
>  max-parallelism: 128   min-idle-state-retention: 0
>  max-idle-state-retention: 0   current-catalog: myhive   current-database:
> default   restart-strategy: type: fixed-delay deployment:
>  response-timeout: 5000   gateway-address: ""   gateway-port: 0
>
>
>
>
> --原始邮件--
> 发件人:"zhisheng" 发送时间:2020年6月30日(星期二) 上午9:05
> 收件人:"user-zh"
> 主题:Re: flinksql流计算任务非正常结束
>
>
>
> 是不是作业是一个批作业呀?
>
> Yichao Yang <1048262...@qq.com 于2020年6月29日周一 下午6:58写道:
>
>  Hi
> 
> 
>  看你的日志你的数据源是hive table?可以看下是否是批作业模式而不是流作业模式。
> 
> 
>  Best,
>  Yichao Yang
> 
> 
> 
> 
>  --nbsp;原始邮件nbsp;--
>  发件人:nbsp;"MuChen"<9329...@qq.comgt;;
>  发送时间:nbsp;2020年6月29日(星期一) 下午4:53
>  收件人:nbsp;"user-zh" 
>  主题:nbsp;flinksql流计算任务非正常结束
> 
> 
> 
>  hi,大家好:
> 
>  我启动了yarn-session:bin/yarn-session.sh -jm 1g -tm 4g -s 4 -qu
> root.flink -nm
>  fsql-cliamp;nbsp; 2amp;gt;amp;amp;1 amp;amp;
> 
>  然后通过sql-client,提交了一个sql:
> 
>  主要逻辑是将一个kafka表和一个hive维表做join,然后将聚合结果写到mysql中。amp;nbsp;
> 
>  运行过程中,经常出现短则几个小时,长则几十个小时后,任务状态变为succeeded的情况,如图:
>  https://s1.ax1x.com/2020/06/29/Nf2dIA.png
> 
>  日志中能看到INFO级别的异常,15:34任务结束时的日志如下:
>  2020-06-29 14:53:20,260 INFOnbsp;
> 
> org.apache.flink.api.common.io.LocatableInputSplitAssignernbsp;nbsp;nbsp;
>  - Assigning remote split to host uhadoop-op3raf-core12 2020-06-29
>  14:53:22,845 INFOnbsp;
> 
> org.apache.flink.runtime.executiongraph.ExecutionGraphnbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
>  - Source: HiveTableSource(vid, q70) TablePath: dw.video_pic_title_q70,
>  PartitionPruned: false, PartitionNums: null (1/1) (68c24aa5
>  9c898cefbb20fbc929ddbafd) switched from RUNNING to FINISHED.
> 2020-06-29
>  15:34:52,982 INFOnbsp;
> 
> org.apache.flink.runtime.entrypoint.ClusterEntrypointnbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
>  - Shutting YarnSessionClusterEntrypoint down with application status
>  SUCCEEDED. Diagnostics null. 2020-06-29 15:34:52,984 INFOnbsp;
> 
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpointnbsp;nbsp;nbsp;
>  - Shutting down rest endpoint. 2020-06-29 15:34:53,072 INFOnbsp;
> 
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpointnbsp;nbsp;nbsp;
>  - Removing cache directory
>  /tmp/flink-web-cdb67193-05ee-4a83-b957-9b7a9d85c23f/flink-web-ui
> 2020-06-29
>  15:34:53,073 INFOnbsp;
> 
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpointnbsp;nbsp;nbsp;
>  - http://uhadoop-op3raf-core1:44664 lost leadership 2020-06-29
>  15:34:53,074 INFOnbsp;
> 
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpointnbsp;nbsp;nbsp;
>  - Shut down complete. 2020-06-29 15:34:53,074 INFOnbsp;
> 
> org.apache.flink.yarn.YarnResourceManagernbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
>  - Shut down cluster because application is in SUCCEEDED, diagnostics
> null.
>  2020-06-29 15:34:53,076 INFOnbsp;
> 
> org.apache.flink.yarn.YarnResourceManagernbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
>  - Unregister application from the YARN Resource Manager with final
> status
>  SUCCEEDED. 2020-06-29 15:34:53,088 INFOnbsp;
> 
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImplnbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
>  - Waiting for application to be successfully unregistered. 2020-06-29
>  15:34:53,306 INFOnbsp;
> 
> org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentnbsp;
>  - Closing components. 2020-06-29 15:34:53,308 INFOnbsp;
> 
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcessnbsp;
>  - Stopping SessionDispatcherLeaderProcess. 2020-06-29 15:34:53,309
>  INFOnbsp;
> 
> org.apache.flink.runtime.dispatcher.StandaloneDispatchernbsp;nbsp;nbsp;nbsp;nbsp;
>  - Stopping dispatcher akka.tcp://flink@uhadoop-op3raf-core1
> :38817/user/dispatcher.
>  2020-06-29 15:34:53,310 INFOnbsp;
> 
> org.apache.flink.runtime.dispatcher.StandaloneDispatchernbsp;nbsp;nbsp;nbsp;nbsp;
>  - Stopping all currently running jobs of dispatcher
>  akka.tcp://flink@uhadoop-op3raf-core1:38817/user/dispatcher.
> 2020-06-29
>  15:34:53,311 INFOnbsp;
> 
> org.apache.flink.runtime.jobmaster.JobMasternbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
>  - Stopping the JobMaster for job default: insert into
>  rt_app.app_video_cover_abtest_test ... 2020-06-29 15:34:53,322
> INFOnbsp;
> 
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImplnbsp; -
>  

?????? flinksql????????????????????

2020-06-29 Thread MuChen



$ grep -v \# sql-client-defaults.yaml |grep -v ^$ catalogs:- name: myhive   
  type: hive hive-conf-dir: /home/fsql/hive/conf default-database: 
default execution:   planner: blink   type: streaming   time-characteristic: 
event-time   periodic-watermarks-interval: 200   result-mode: table   
max-table-result-rows: 100   parallelism: 4   max-parallelism: 128   
min-idle-state-retention: 0   max-idle-state-retention: 0   current-catalog: 
myhive   current-database: default   restart-strategy: type: fixed-delay 
deployment:   response-timeout: 5000   gateway-address: ""   gateway-port: 0




----
??:"zhisheng"https://s1.ax1x.com/2020/06/29/Nf2dIA.png

 INFO15:34??
 2020-06-29 14:53:20,260 INFOnbsp;
 
org.apache.flink.api.common.io.LocatableInputSplitAssignernbsp;nbsp;nbsp;
 - Assigning remote split to host uhadoop-op3raf-core12 2020-06-29
 14:53:22,845 INFOnbsp;
 
org.apache.flink.runtime.executiongraph.ExecutionGraphnbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
 - Source: HiveTableSource(vid, q70) TablePath: dw.video_pic_title_q70,
 PartitionPruned: false, PartitionNums: null (1/1) (68c24aa5
 9c898cefbb20fbc929ddbafd) switched from RUNNING to FINISHED. 2020-06-29
 15:34:52,982 INFOnbsp;
 
org.apache.flink.runtime.entrypoint.ClusterEntrypointnbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
 - Shutting YarnSessionClusterEntrypoint down with application status
 SUCCEEDED. Diagnostics null. 2020-06-29 15:34:52,984 INFOnbsp;
 
org.apache.flink.runtime.dispatcher.DispatcherRestEndpointnbsp;nbsp;nbsp;
 - Shutting down rest endpoint. 2020-06-29 15:34:53,072 INFOnbsp;
 
org.apache.flink.runtime.dispatcher.DispatcherRestEndpointnbsp;nbsp;nbsp;
 - Removing cache directory
 /tmp/flink-web-cdb67193-05ee-4a83-b957-9b7a9d85c23f/flink-web-ui 2020-06-29
 15:34:53,073 INFOnbsp;
 
org.apache.flink.runtime.dispatcher.DispatcherRestEndpointnbsp;nbsp;nbsp;
 - http://uhadoop-op3raf-core1:44664 lost leadership 2020-06-29
 15:34:53,074 INFOnbsp;
 
org.apache.flink.runtime.dispatcher.DispatcherRestEndpointnbsp;nbsp;nbsp;
 - Shut down complete. 2020-06-29 15:34:53,074 INFOnbsp;
 
org.apache.flink.yarn.YarnResourceManagernbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
 - Shut down cluster because application is in SUCCEEDED, diagnostics null.
 2020-06-29 15:34:53,076 INFOnbsp;
 
org.apache.flink.yarn.YarnResourceManagernbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
 - Unregister application from the YARN Resource Manager with final status
 SUCCEEDED. 2020-06-29 15:34:53,088 INFOnbsp;
 
org.apache.hadoop.yarn.client.api.impl.AMRMClientImplnbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
 - Waiting for application to be successfully unregistered. 2020-06-29
 15:34:53,306 INFOnbsp;
 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentnbsp;
 - Closing components. 2020-06-29 15:34:53,308 INFOnbsp;
 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcessnbsp;
 - Stopping SessionDispatcherLeaderProcess. 2020-06-29 15:34:53,309
 INFOnbsp;
 
org.apache.flink.runtime.dispatcher.StandaloneDispatchernbsp;nbsp;nbsp;nbsp;nbsp;
 - Stopping dispatcher 
akka.tcp://flink@uhadoop-op3raf-core1:38817/user/dispatcher.
 2020-06-29 15:34:53,310 INFOnbsp;
 
org.apache.flink.runtime.dispatcher.StandaloneDispatchernbsp;nbsp;nbsp;nbsp;nbsp;
 - Stopping all currently running jobs of dispatcher
 akka.tcp://flink@uhadoop-op3raf-core1:38817/user/dispatcher. 2020-06-29
 15:34:53,311 INFOnbsp;
 
org.apache.flink.runtime.jobmaster.JobMasternbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
 - Stopping the JobMaster for job default: insert into
 rt_app.app_video_cover_abtest_test ... 2020-06-29 15:34:53,322 
INFOnbsp;
 org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImplnbsp; 
-
 Interrupted while waiting for queue
 
java.lang.InterruptedExceptionnbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
 at
 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
 at
 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
 at
 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
 at
 
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:287)
 2020-06-29 15:34:53,324 INFOnbsp;
 
org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxynbsp;
 - Opening proxy : uhadoop-op3raf-core12:2

 nbsp;
 ps:amp;nbsp;

 1. kafka
 2. flink1.10.0
 ??SUCCEEDED

 ??




 

Re: flink sql能否显示地创造一列null行

2020-06-29 Thread naisili Yuan
谢谢各位 ,我也解锁新知识了。社区好热情啊,(●'◡'●)  @Jingsong Li

seeksst  于2020年6月30日周二 上午10:20写道:

> @Jingsong Li 尝试了一下,cast确实可行,解锁新知识,thanks.
>
>
> 原始邮件
> 发件人:lakeshenshenleifight...@gmail.com
> 收件人:user-zhuser...@flink.apache.org
> 发送时间:2020年6月30日(周二) 10:07
> 主题:Re: flink sql能否显示地创造一列null行
>
>
> 或者补齐一个非 Null ,但是又不影响业务逻辑的数值 Jingsong Li jingsongl...@gmail.com
> 于2020年6月30日周二 上午9:58写道:  Hi,   我记得NULL的literal是可以的,不过需要cast成确定的类型,比如 select
> CAST(null AS VARCHAR);  你试试。   Best,  Jingsong   On Tue, Jun 30, 2020 at
> 9:40 AM seeksst seek...@163.com wrote:Hi,
>  按照你的意思是想将两个不同的数据集进行union,但是由于字段不同需要补充NULL。   显示的NULL是不行的,你可以使用更复杂的方式进行对齐:
>  case when 1 = 2 then 1 end as 字段   1永远不可能等于2,又没有else分支,所以结果是会返回null.
>  原始邮件   发件人:naisili yuanyuanlong1...@gmail.com   收件人:
> user-zhuser...@flink.apache.org   发送时间:2020年6月30日(周二) 09:31   主题:flink
> sql能否显示地创造一列null行   由于union的操作,结果集的列数必须一致,我能否加入这一列,语法如下: UNION SELECT
> NULL , aaa, bbb, NULL   FROM () --  Best, Jingsong Lee


Re: Re: Flip-105 can the debezium/canal SQL sink to database directly?

2020-06-29 Thread Jingsong Li
Hi,

Welcome to try 1.11.

There is no direct doc to describe this, but I think these docs can help
you [1][2]

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html

Best,
Jingsong

On Tue, Jun 30, 2020 at 10:25 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

> Thanks Jingsong,
>
> Is there any document or example to this?
> I will build the flink-1.11 package and have a try.
>
> Thanks,
> Lei
>
> --
> wangl...@geekplus.com.cn
>
>
> *From:* Jingsong Li 
> *Date:* 2020-06-30 10:08
> *To:* wangl...@geekplus.com.cn
> *CC:* user 
> *Subject:* Re: Flip-105 can the debezium/canal SQL sink to database
> directly?
> Hi Lei,
>
> INSERT INTO  jdbc_table SELECT * FROM changelog_table;
>
> For Flink 1.11 new connectors, you need to define the primary key for
> jdbc_table (and also your mysql table needs to have the corresponding
> primary key) because changelog_table has the "update", "delete" records.
>
> And then, jdbc sink will:
> - insert or update using "INSERT INTO ... ON DUPLICATE KEY UPDATE ." to
> deal with "insert" and "update" messages.
> - delete to deal with "delete" messages.
>
> So generally speaking, with the primary key, this mysql table will be the
> same to your source database table. (table for generating changelog)
>
> Best,
> Jingsong
>
> On Tue, Jun 30, 2020 at 9:58 AM wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
>>
>> CREATE TABLE my_table (
>>   id BIGINT,
>>   first_name STRING,
>>   last_name STRING,
>>   email STRING
>> ) WITH (
>>  'connector'='kafka',
>>  'topic'='user_topic',
>>  'properties.bootstrap.servers'='localhost:9092',
>>  'scan.startup.mode'='earliest-offset',
>>  'format'='debezium-json'
>> );
>>
>> INSERT INTO  mysql_sink_table SELECT id, first_name, last_name FROM
>>  my_table;
>>
>>
>> What will happen after  i execute the insert sql statement? For the
>> update/delete message from kafka, the corresponding record will be updated
>> or deleted in the mysql_sink_table?
>>
>> INSERT INTO  mysql_sink_table SELECT id, first_name, last_name FROM
>>  my_table;
>>
>>
>> Thanks,
>>
>> Lei
>>
>>
>> --
>> wangl...@geekplus.com.cn
>>
>>
>
> --
> Best, Jingsong Lee
>
>

-- 
Best, Jingsong Lee


Re: flink batch on yarn任务容错

2020-06-29 Thread Jingsong Li
Hi,

1.10后的Flink是支持单task的failover的,(需要batch shuffle和region调度)

所以容错粒度是基于单task。
批作业的Failover模型和流是不一样的。它就是基于单task,如果想要达到较好的容错,可以开更大的并行度,这样单task执行的时间会越短,failover效率也就会越高。

Best,
Jingsong

On Tue, Jun 30, 2020 at 9:41 AM 张波 <173603...@qq.com> wrote:

> hi,zhisheng 使用stream是否可以使任务因为单个tm失败的情况下,只重启这个tm,而非重启整个任务?
> --原始邮件--
> 发件人:zhisheng 发送时间:2020年6月30日(星期二) 上午8:47
> 收件人:user-zh 主题:Re: flink batch on yarn任务容错



-- 
Best, Jingsong Lee


Re: Re: Flip-105 can the debezium/canal SQL sink to database directly?

2020-06-29 Thread wangl...@geekplus.com.cn
Thanks Jingsong,

Is there any document or example to this?
I will build the flink-1.11 package and have a try.

Thanks,
Lei  



wangl...@geekplus.com.cn 
 
From: Jingsong Li
Date: 2020-06-30 10:08
To: wangl...@geekplus.com.cn
CC: user
Subject: Re: Flip-105 can the debezium/canal SQL sink to database directly?
Hi Lei,

INSERT INTO  jdbc_table SELECT * FROM changelog_table;

For Flink 1.11 new connectors, you need to define the primary key for 
jdbc_table (and also your mysql table needs to have the corresponding primary 
key) because changelog_table has the "update", "delete" records.

And then, jdbc sink will:
- insert or update using "INSERT INTO ... ON DUPLICATE KEY UPDATE ." to deal 
with "insert" and "update" messages.
- delete to deal with "delete" messages.

So generally speaking, with the primary key, this mysql table will be the same 
to your source database table. (table for generating changelog)

Best,
Jingsong

On Tue, Jun 30, 2020 at 9:58 AM wangl...@geekplus.com.cn 
 wrote:

CREATE TABLE my_table (
  id BIGINT,
  first_name STRING,
  last_name STRING,
  email STRING
) WITH (
 'connector'='kafka',
 'topic'='user_topic',
 'properties.bootstrap.servers'='localhost:9092',
 'scan.startup.mode'='earliest-offset',
 'format'='debezium-json'
);
INSERT INTO  mysql_sink_table SELECT id, first_name, last_name FROM  my_table;

What will happen after  i execute the insert sql statement? For the 
update/delete message from kafka, the corresponding record will be updated or 
deleted in the mysql_sink_table?
INSERT INTO  mysql_sink_table SELECT id, first_name, last_name FROM  my_table; 

Thanks,
Lei



wangl...@geekplus.com.cn 



-- 
Best, Jingsong Lee


Re: Re: flink 高可用问题

2020-06-29 Thread LakeShen
Hi, Tony,

看了一下,你运行在 k8s 上面的任务,job id 是,那如果有多个任务,jobid
如果都是 
的话,如果都是在相同的ZK根目录,不同 k8s 任务 在 zk 上面的信息可能会有影响。目前我们这边是每个k8s 任务,在不同的 zk 路径下面。

第二点的话,你的任务是否能够正常运行起来?还是说任务正常运行起来,只是 checkpoint 有问题。

目前 k8s 上,JobManager 的高可用我们也借助 ZK,由于 JobManager 是一个 K8s Job,所以配置 K8s Job
的 restartPolicy 为 OnFailure。

这只是我的一些经验,仅供参考。

Best,
LakeShen

zhisheng  于2020年6月30日周二 上午8:51写道:

> hi,Tony
>
> 你可以把 Checkpoint 间隔时间稍微设置大一些,看起来像是作业启动的时候 Task 还没 Running,就开始执行 Checkpoint
> 了,而 Checkpoint 是要求所有的 Task 是处于 Running 状态的,所以就会丢弃掉那次
> Checkpoint,BT,就算有这个异常应该问题也不大,只要后面你的作业全启动成功了的话,则 Checkpoint 还是会成功的。
>
> Best!
>
> zhisheng
>
> Tony  于2020年6月29日周一 下午8:16写道:
>
> >
> >
> 你好,我的flink运行环境是在k8s中,我先是打开了checkpoint功能,那样是可以用的,task失败了数据还可以恢复,但job失败了就不行了,所以我又配置flink的高可用,在job的yaml文件里设置了动态属性("-Dhigh-availability=zookeeper"),这样job启动时就出现那种警告,功能也不好用了。但如果配置在flink-config文件里的话就可以,不知道为什么?而我就是想用那个动态属性的方式,谢谢大神指点。
> >
> >
> >
> >
> >
> > --
> > 发自我的网易邮箱手机智能版
> > 
> >
> >
> > - Original Message -
> > From: tison 
> > To: user-zh 
> > Sent: Mon, 22 Jun 2020 15:08:04 +0800
> > Subject: Re: flink 高可用问题
> >
> > 你看一下你的 chk 间隔,看起来是作业还没调度起来就开始 chk 所以失败。可能原因资源不足,调度不起来或者调度得慢,你 chk
> > 间隔又小,就这样了。
> >
> > 如果是一直 chk 以这个方式失败,应该看下调度的日志为啥迟迟调不起来
> >
> > Best,
> > tison.
> >
> >
> > Yichao Yang <1048262...@qq.com> 于2020年6月22日周一 上午10:57写道:
> >
> > > Hi
> > >
> > >
> > > 看日志应该只是INFO,而不是错误,你的job是做不了checkpoint吗?
> > >
> > >
> > > Best,
> > > Yichao Yang
> > >
> > >
> > >
> > >
> > > --原始邮件--
> > > 发件人:"Tony" > > 发送时间:2020年6月22日(星期一) 上午10:54
> > > 收件人:"user-zh" > >
> > > 主题:flink 高可用问题
> > >
> > >
> > >
> > > 你好。
> > >
> > >
> > > 我按着官方文档配置了flink的高可用(flink-conf.yaml)如下:
> > > high-availability:zookeeper
> > > high-availability.zookeeper.quorum:master:2181 ,slave1:2181,slave2:2181
> > > high-availability.zookeeper.path.root:/flink
> > > high-availability.cluster-id:/cluster_one
> > > highavailability.storageDir:hdfs://master:9000/flink/ha
> > >
> > >
> > > 我的flink和zookeeper都是在K8s的容器中
> > > job启动出现如下问题:麻烦帮忙看一下,谢谢。
> > > 2020-06-22 02:47:43,884 INFO
> > >
> >
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> > > - Checkpoint triggering task Source:Kafka-Consumer - (Sink: Print
> to
> > > Std. Out, Filter -Query Map - Unwind - Custom Map -
> > filter
> > > - Data Transformation - Filter) (1/1) of job
> > >  is not in state RUNNING but SCHEDULED
> > > instead. Aborting checkpoint.
> >
>


Re: Flip-105 can the debezium/canal SQL sink to database directly?

2020-06-29 Thread Leonard Xu
HI Lei,
Jingsong is wright, you need define a primary key for your sink table.
BTW, Flink use `PRIMARY KEY NOT ENFORCED` to define primary key because Flink 
doesn’t own data and only supports `NOT ENFORCED` mode, it’s a little bit 
different with the primary key  in DB which is default `ENFORCED` , both  
`ENFORCED ` and `NOT ENFORCED` are supported in SQL standard.
You can look up[1][2] for more details.

Best,
Leonard
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
 

[2] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table
 


> 在 2020年6月30日,10:08,Jingsong Li  写道:
> 
> Hi Lei,
> 
> INSERT INTO  jdbc_table SELECT * FROM changelog_table;
> 
> For Flink 1.11 new connectors, you need to define the primary key for 
> jdbc_table (and also your mysql table needs to have the corresponding primary 
> key) because changelog_table has the "update", "delete" records.
> 
> And then, jdbc sink will:
> - insert or update using "INSERT INTO ... ON DUPLICATE KEY UPDATE ." to deal 
> with "insert" and "update" messages.
> - delete to deal with "delete" messages.
> 
> So generally speaking, with the primary key, this mysql table will be the 
> same to your source database table. (table for generating changelog)
> 
> Best,
> Jingsong
> 
> On Tue, Jun 30, 2020 at 9:58 AM wangl...@geekplus.com.cn 
>   > wrote:
> 
> CREATE TABLE my_table (
>   id BIGINT,
>   first_name STRING,
>   last_name STRING,
>   email STRING
> ) WITH (
>  'connector'='kafka',
>  'topic'='user_topic',
>  'properties.bootstrap.servers'='localhost:9092',
>  'scan.startup.mode'='earliest-offset',
>  'format'='debezium-json'
> );
> INSERT INTO  mysql_sink_table SELECT id, first_name, last_name FROM  my_table;
> 
> What will happen after  i execute the insert sql statement? For the 
> update/delete message from kafka, the corresponding record will be updated or 
> deleted in the mysql_sink_table?
> INSERT INTO  mysql_sink_table SELECT id, first_name, last_name FROM  
> my_table; 
> 
> Thanks,
> Lei
> 
> wangl...@geekplus.com.cn  
> 
> 
> 
> -- 
> Best, Jingsong Lee



Re: flink sql能否显示地创造一列null行

2020-06-29 Thread seeksst
@Jingsong Li 尝试了一下,cast确实可行,解锁新知识,thanks.


原始邮件
发件人:lakeshenshenleifight...@gmail.com
收件人:user-zhuser...@flink.apache.org
发送时间:2020年6月30日(周二) 10:07
主题:Re: flink sql能否显示地创造一列null行


或者补齐一个非 Null ,但是又不影响业务逻辑的数值 Jingsong Li jingsongl...@gmail.com 于2020年6月30日周二 
上午9:58写道:  Hi,   我记得NULL的literal是可以的,不过需要cast成确定的类型,比如 select CAST(null AS 
VARCHAR);  你试试。   Best,  Jingsong   On Tue, Jun 30, 2020 at 9:40 AM seeksst 
seek...@163.com wrote:Hi,   按照你的意思是想将两个不同的数据集进行union,但是由于字段不同需要补充NULL。  
 显示的NULL是不行的,你可以使用更复杂的方式进行对齐:   case when 1 = 2 then 1 end as 字段   
1永远不可能等于2,又没有else分支,所以结果是会返回null.   原始邮件   发件人:naisili 
yuanyuanlong1...@gmail.com   收件人:user-zhuser...@flink.apache.org   
发送时间:2020年6月30日(周二) 09:31   主题:flink sql能否显示地创造一列null行   
由于union的操作,结果集的列数必须一致,我能否加入这一列,语法如下: UNION SELECT NULL , aaa, bbb, NULL   FROM 
() --  Best, Jingsong Lee

Re: Flip-105 can the debezium/canal SQL sink to database directly?

2020-06-29 Thread Jingsong Li
Hi Lei,

INSERT INTO  jdbc_table SELECT * FROM changelog_table;

For Flink 1.11 new connectors, you need to define the primary key for
jdbc_table (and also your mysql table needs to have the corresponding
primary key) because changelog_table has the "update", "delete" records.

And then, jdbc sink will:
- insert or update using "INSERT INTO ... ON DUPLICATE KEY UPDATE ." to
deal with "insert" and "update" messages.
- delete to deal with "delete" messages.

So generally speaking, with the primary key, this mysql table will be the
same to your source database table. (table for generating changelog)

Best,
Jingsong

On Tue, Jun 30, 2020 at 9:58 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

>
> CREATE TABLE my_table (
>   id BIGINT,
>   first_name STRING,
>   last_name STRING,
>   email STRING
> ) WITH (
>  'connector'='kafka',
>  'topic'='user_topic',
>  'properties.bootstrap.servers'='localhost:9092',
>  'scan.startup.mode'='earliest-offset',
>  'format'='debezium-json'
> );
>
> INSERT INTO  mysql_sink_table SELECT id, first_name, last_name FROM
>  my_table;
>
>
> What will happen after  i execute the insert sql statement? For the
> update/delete message from kafka, the corresponding record will be updated
> or deleted in the mysql_sink_table?
>
> INSERT INTO  mysql_sink_table SELECT id, first_name, last_name FROM
>  my_table;
>
>
> Thanks,
>
> Lei
>
>
> --
> wangl...@geekplus.com.cn
>
>

-- 
Best, Jingsong Lee


Re: flink sql能否显示地创造一列null行

2020-06-29 Thread LakeShen
或者补齐一个非 Null ,但是又不影响业务逻辑的数值

Jingsong Li  于2020年6月30日周二 上午9:58写道:

> Hi,
>
> 我记得NULL的literal是可以的,不过需要cast成确定的类型,比如 select CAST(null AS VARCHAR);
> 你试试。
>
> Best,
> Jingsong
>
> On Tue, Jun 30, 2020 at 9:40 AM seeksst  wrote:
>
> > Hi,
> >
> >
> >   按照你的意思是想将两个不同的数据集进行union,但是由于字段不同需要补充NULL。
> >   显示的NULL是不行的,你可以使用更复杂的方式进行对齐:
> > case when 1 = 2 then 1 end as 字段
> >   1永远不可能等于2,又没有else分支,所以结果是会返回null.
> >
> >
> > 原始邮件
> > 发件人:naisili yuanyuanlong1...@gmail.com
> > 收件人:user-zhuser...@flink.apache.org
> > 发送时间:2020年6月30日(周二) 09:31
> > 主题:flink sql能否显示地创造一列null行
> >
> >
> > 由于union的操作,结果集的列数必须一致,我能否加入这一列,语法如下: UNION SELECT NULL , aaa, bbb, NULL
> > FROM ()
>
>
>
> --
> Best, Jingsong Lee
>


Re: 回复: 关于拓展 Tuple元组的问题

2020-06-29 Thread LakeShen
哈哈,学习了一波

Jingsong Li  于2020年6月30日周二 上午9:59写道:

> > 用Row 和 Tuple 性能上会有差别吗?
>
> 理论上有细微的差别,
> 但是,基本上性能瓶颈不会在这里。。所以你应该感受不到
>
> Best,
> Jingsong
>
> On Tue, Jun 30, 2020 at 8:51 AM zhisheng  wrote:
>
> > 可以测试一下
> >
> > Tianwang Li  于2020年6月29日周一 下午8:13写道:
> >
> > > >
> > > > 用 Row 啊,支持 Int.MAX 个元素,还能支持 null 值,不香么?
> > > >
> > > 用Row 和 Tuple 性能上会有差别吗?
> > >
> > > Jark Wu  于2020年6月19日周五 下午3:47写道:
> > >
> > > > 用 Row 啊,支持 Int.MAX 个元素,还能支持 null 值,不香么?
> > > >
> > > >
> > > > On Fri, 19 Jun 2020 at 15:42, Weixubin <18925434...@163.com> wrote:
> > > >
> > > > > 感谢你的回答,请问可否举一个参照例子?
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > 在 2020-06-19 15:31:53,"wangweigu...@stevegame.cn" <
> > > > > wangweigu...@stevegame.cn> 写道:
> > > > > >
> > > > > >   多个值组合在一起,当一个复合值使用!
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >发件人: 魏旭斌
> > > > > >发送时间: 2020-06-19 15:01
> > > > > >收件人: user-zh
> > > > > >主题: 关于拓展 Tuple元组的问题
> > > > > >目前Flink 提供了Tuple1 ~
> > Tuple25,在实际开发中不能满足我们的需求,我们希望能有更大的元组,比如构建一个Tuple50。
> > > > > 请问有什么解决的方案? 谢谢
> > > > >
> > > >
> > >
> > >
> > > --
> > > **
> > >  tivanli
> > > **
> > >
> >
>
>
> --
> Best, Jingsong Lee
>


Re: 回复: 关于拓展 Tuple元组的问题

2020-06-29 Thread Jingsong Li
> 用Row 和 Tuple 性能上会有差别吗?

理论上有细微的差别,
但是,基本上性能瓶颈不会在这里。。所以你应该感受不到

Best,
Jingsong

On Tue, Jun 30, 2020 at 8:51 AM zhisheng  wrote:

> 可以测试一下
>
> Tianwang Li  于2020年6月29日周一 下午8:13写道:
>
> > >
> > > 用 Row 啊,支持 Int.MAX 个元素,还能支持 null 值,不香么?
> > >
> > 用Row 和 Tuple 性能上会有差别吗?
> >
> > Jark Wu  于2020年6月19日周五 下午3:47写道:
> >
> > > 用 Row 啊,支持 Int.MAX 个元素,还能支持 null 值,不香么?
> > >
> > >
> > > On Fri, 19 Jun 2020 at 15:42, Weixubin <18925434...@163.com> wrote:
> > >
> > > > 感谢你的回答,请问可否举一个参照例子?
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > 在 2020-06-19 15:31:53,"wangweigu...@stevegame.cn" <
> > > > wangweigu...@stevegame.cn> 写道:
> > > > >
> > > > >   多个值组合在一起,当一个复合值使用!
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >发件人: 魏旭斌
> > > > >发送时间: 2020-06-19 15:01
> > > > >收件人: user-zh
> > > > >主题: 关于拓展 Tuple元组的问题
> > > > >目前Flink 提供了Tuple1 ~
> Tuple25,在实际开发中不能满足我们的需求,我们希望能有更大的元组,比如构建一个Tuple50。
> > > > 请问有什么解决的方案? 谢谢
> > > >
> > >
> >
> >
> > --
> > **
> >  tivanli
> > **
> >
>


-- 
Best, Jingsong Lee


Re: flink sql能否显示地创造一列null行

2020-06-29 Thread Jingsong Li
Hi,

我记得NULL的literal是可以的,不过需要cast成确定的类型,比如 select CAST(null AS VARCHAR);
你试试。

Best,
Jingsong

On Tue, Jun 30, 2020 at 9:40 AM seeksst  wrote:

> Hi,
>
>
>   按照你的意思是想将两个不同的数据集进行union,但是由于字段不同需要补充NULL。
>   显示的NULL是不行的,你可以使用更复杂的方式进行对齐:
> case when 1 = 2 then 1 end as 字段
>   1永远不可能等于2,又没有else分支,所以结果是会返回null.
>
>
> 原始邮件
> 发件人:naisili yuanyuanlong1...@gmail.com
> 收件人:user-zhuser...@flink.apache.org
> 发送时间:2020年6月30日(周二) 09:31
> 主题:flink sql能否显示地创造一列null行
>
>
> 由于union的操作,结果集的列数必须一致,我能否加入这一列,语法如下: UNION SELECT NULL , aaa, bbb, NULL
> FROM ()



-- 
Best, Jingsong Lee


Flip-105 can the debezium/canal SQL sink to database directly?

2020-06-29 Thread wangl...@geekplus.com.cn

CREATE TABLE my_table (
  id BIGINT,
  first_name STRING,
  last_name STRING,
  email STRING
) WITH (
 'connector'='kafka',
 'topic'='user_topic',
 'properties.bootstrap.servers'='localhost:9092',
 'scan.startup.mode'='earliest-offset',
 'format'='debezium-json'
);
INSERT INTO  mysql_sink_table SELECT id, first_name, last_name FROM  my_table;

What will happen after  i execute the insert sql statement? For the 
update/delete message from kafka, the corresponding record will be updated or 
deleted in the mysql_sink_table?
INSERT INTO  mysql_sink_table SELECT id, first_name, last_name FROM  my_table; 

Thanks,
Lei



wangl...@geekplus.com.cn 



Re:【Flink的transformations】

2020-06-29 Thread Roc Marshal
忝忝向仧,你好。
目前Flink文档层面没有类似的映射表归档。
但是在API层面可以观察到返回信息。


Best,
Roc Marshal



在 2020-06-29 22:29:21,"忝忝向仧" <153488...@qq.com> 写道:
>Hi,all:
>
>
>请教下,Flink的应用程序首先都会转为逻辑映射也就是transformations,我看org.apache.flink.streaming.api.transformations包下面目前有17种Transformation类(SourceTransformation,SplitTransformation,TwoInputTransformation等),有没有一个映射关系列表,也就是说应用程序里面哪些算子或者操作(比如map,flatmap,filter,connect,select等)会对应到哪一个Transformation类.
>
>
>谢谢.


?????? flink batch on yarn????????

2020-06-29 Thread ????
hi??zhisheng 
stream??tmtm??? 
----
??:zhisheng

回复:flink sql能否显示地创造一列null行

2020-06-29 Thread seeksst
Hi,


  按照你的意思是想将两个不同的数据集进行union,但是由于字段不同需要补充NULL。
  显示的NULL是不行的,你可以使用更复杂的方式进行对齐:
case when 1 = 2 then 1 end as 字段
  1永远不可能等于2,又没有else分支,所以结果是会返回null.


原始邮件
发件人:naisili yuanyuanlong1...@gmail.com
收件人:user-zhuser...@flink.apache.org
发送时间:2020年6月30日(周二) 09:31
主题:flink sql能否显示地创造一列null行


由于union的操作,结果集的列数必须一致,我能否加入这一列,语法如下: UNION SELECT NULL , aaa, bbb, NULL FROM ()

Re: Flink-1.10.0 source的checkpoint偶尔时间比较长

2020-06-29 Thread Tianwang Li
>
> 偶尔会出现checkpint需要超过30分钟的(出现的频率不高,1~2天1次)

找到原因了, 任务处理延迟比较大,kafka数据过期清理了,导致从last消费(watermark一下子增长了好多个小时),
然后,这个时候需要输出几个小时内的所有窗口(平时一次只输出一个窗口,这时一次要输出30个窗口消耗比较长时间)。

因为是稳定测试任务,没有关注kafka 延迟 导致数据过期到问题。

感谢,zhisheng、LakeShen、Yichao Yang。


Yichao Yang <1048262...@qq.com> 于2020年6月29日周一 下午7:58写道:

> Hi
>
>
>
> 如果任务会经常出现反压的话,可以先解决反压问题,因为任务反压也会影响checkpoint,其次就是可以关注一下你的作业的物理资源指标,比如cpu使用率,内存使用率,gc频率是否特别高,尝试保障物理资源使用率在正常水平。
>
>
> Best,
> Yichao Yang
>
>
>
>
> --原始邮件--
> 发件人:"Tianwang Li" 发送时间:2020年6月28日(星期天) 上午10:17
> 收件人:"user-zh"
> 主题:Flink-1.10.0 source的checkpoint偶尔时间比较长
>
>
>
> 关于Flink checkpoint偶尔会比较长时间的问题。
> 环境与背景:
> 版本:flink1.10.0
> 数据量:每秒约10万左右的记录,数据源是kafka
> 计算逻辑:滑动窗口统计,每个窗口输出的规模大概1~2千万记录。
> 是否有反压:任务经常会出现反压(特别是在窗口输出的时候)。
>
>
> 问题:
>  
> 大部分时候checkpoint都是在1分钟内完成,偶尔会出现checkpint需要超过30分钟的(出现的频率不高,1~2天1次)。
> source的checkpoint消耗的时间比较长。Triggercheckpoint 到Starting
> checkpoint消耗时间比较长。
>
>
> checkpoint情况大致如下:
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 2020-06-24 21:09:53,369 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor  
>- Trigger checkpoint 316@1593004193363 for
> 84dce1ec8aa5a4df2d1758d6e9278693.
>
> 2020-06-24 21:09:58,327 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor  
>- Received heartbeat request from
> e88ea2f790430c9c160e540ef0546d60.
>
> 2020-06-24 21:09:59,266 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor  
>- Received heartbeat request from
> b93d7167db364dfdcbda886944f1482f.
>
> 2020-06-24 21:09:59,686 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  
>  - Memory usage stats: [HEAP: 202/2573/2573 MB, NON HEAP: 111/114/424
> MB (used/committed/max)]
>
> 2020-06-24 21:09:59,686 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  
>  - Direct memory stats: Count: 17403, Total Capacity: 583911423, Used
> Memory: 583911424
>
> 2020-06-24 21:09:59,686 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  
>  - Off-heap pool stats: [Code Cache: 35/35/240 MB
> (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
> [Compressed Class Space: 8/9/88 MB (used/committed/max)]
>
> 2020-06-24 21:09:59,686 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  
>  - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC
> COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
>
> 2020-06-24 21:10:08,346 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor  
>- Received heartbeat request from
> e88ea2f790430c9c160e540ef0546d60.
>
> 2020-06-24 21:10:09,286 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor  
>- Received heartbeat request from
> b93d7167db364dfdcbda886944f1482f.
>
> 2020-06-24 21:10:09,686 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  
>  - Memory usage stats: [HEAP: 557/2573/2573 MB, NON HEAP: 111/114/424
> MB (used/committed/max)]
>
> 2020-06-24 21:10:09,686 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  
>  - Direct memory stats: Count: 17403, Total Capacity: 583911423, Used
> Memory: 583911424
>
> 2020-06-24 21:10:09,686 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  
>  - Off-heap pool stats: [Code Cache: 35/35/240 MB
> (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
> [Compressed Class Space: 8/9/88 MB (used/committed/max)]
>
> 2020-06-24 21:10:09,686 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  
>  - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC
> COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
>
>
>
>
> 省略
>
>
>
>
> 2020-06-24 21:55:39,875 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  
>  - Direct memory stats: Count: 17403, Total Capacity: 583911423, Used
> Memory: 583911424
>
> 2020-06-24 21:55:39,875 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  
>  - Off-heap pool stats: [Code Cache: 35/35/240 MB
> (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
> [Compressed Class Space: 8/9/88 MB (used/committed/max)]
>
> 2020-06-24 21:55:39,876 INFO
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  
>  - Garbage collector stats: [PS Scavenge, GC TIME (ms): 110520, GC
> COUNT: 7083], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
>
> 2020-06-24 21:55:41,721 DEBUG
> org.apache.flink.streaming.runtime.tasks.StreamTask   
>   - Starting checkpoint (316) CHECKPOINT on task Source: Custom
> Source - Map - Filter - Timestamps/Watermarks (4/10)
>
> 2020-06-24 21:55:41,721 DEBUG
> org.apache.flink.runtime.state.AbstractSnapshotStrategy  
>  - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
> {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd,
> checkpointDirectory=hdfs://chk-316,
> sharedStateDirectory=hdfs://shared,
> taskOwnedStateDirectory=hdfs://taskowned,
> metadataFilePath=hdfs://chk-316/_metadata, reference=(default),
> fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in
> 

flink sql能否显示地创造一列null行

2020-06-29 Thread naisili Yuan
由于union的操作,结果集的列数必须一致,我能否加入这一列,语法如下:
UNION
SELECT NULL
,
aaa,
bbb,
NULL
FROM ()


Re: flinksql流计算任务非正常结束

2020-06-29 Thread zhisheng
是不是作业是一个批作业呀?

Yichao Yang <1048262...@qq.com> 于2020年6月29日周一 下午6:58写道:

> Hi
>
>
> 看你的日志你的数据源是hive table?可以看下是否是批作业模式而不是流作业模式。
>
>
> Best,
> Yichao Yang
>
>
>
>
> --原始邮件--
> 发件人:"MuChen"<9329...@qq.com;
> 发送时间:2020年6月29日(星期一) 下午4:53
> 收件人:"user-zh"
> 主题:flinksql流计算任务非正常结束
>
>
>
> hi,大家好:
>
> 我启动了yarn-session:bin/yarn-session.sh -jm 1g -tm 4g -s 4 -qu root.flink -nm
> fsql-clinbsp; 2gt;amp;1 amp;
>
> 然后通过sql-client,提交了一个sql:
>
> 主要逻辑是将一个kafka表和一个hive维表做join,然后将聚合结果写到mysql中。nbsp;
>
> 运行过程中,经常出现短则几个小时,长则几十个小时后,任务状态变为succeeded的情况,如图:
> https://s1.ax1x.com/2020/06/29/Nf2dIA.png
>
> 日志中能看到INFO级别的异常,15:34任务结束时的日志如下:
> 2020-06-29 14:53:20,260 INFO
> org.apache.flink.api.common.io.LocatableInputSplitAssigner
> - Assigning remote split to host uhadoop-op3raf-core12 2020-06-29
> 14:53:22,845 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph
> - Source: HiveTableSource(vid, q70) TablePath: dw.video_pic_title_q70,
> PartitionPruned: false, PartitionNums: null (1/1) (68c24aa5
> 9c898cefbb20fbc929ddbafd) switched from RUNNING to FINISHED. 2020-06-29
> 15:34:52,982 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint
> - Shutting YarnSessionClusterEntrypoint down with application status
> SUCCEEDED. Diagnostics null. 2020-06-29 15:34:52,984 INFO
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint
> - Shutting down rest endpoint. 2020-06-29 15:34:53,072 INFO
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint
> - Removing cache directory
> /tmp/flink-web-cdb67193-05ee-4a83-b957-9b7a9d85c23f/flink-web-ui 2020-06-29
> 15:34:53,073 INFO
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint
> - http://uhadoop-op3raf-core1:44664 lost leadership 2020-06-29
> 15:34:53,074 INFO
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint
> - Shut down complete. 2020-06-29 15:34:53,074 INFO
> org.apache.flink.yarn.YarnResourceManager
> - Shut down cluster because application is in SUCCEEDED, diagnostics null.
> 2020-06-29 15:34:53,076 INFO
> org.apache.flink.yarn.YarnResourceManager
> - Unregister application from the YARN Resource Manager with final status
> SUCCEEDED. 2020-06-29 15:34:53,088 INFO
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl
> - Waiting for application to be successfully unregistered. 2020-06-29
> 15:34:53,306 INFO
> org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent
> - Closing components. 2020-06-29 15:34:53,308 INFO
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess
> - Stopping SessionDispatcherLeaderProcess. 2020-06-29 15:34:53,309
> INFO
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher
> - Stopping dispatcher 
> akka.tcp://flink@uhadoop-op3raf-core1:38817/user/dispatcher.
> 2020-06-29 15:34:53,310 INFO
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher
> - Stopping all currently running jobs of dispatcher
> akka.tcp://flink@uhadoop-op3raf-core1:38817/user/dispatcher. 2020-06-29
> 15:34:53,311 INFO
> org.apache.flink.runtime.jobmaster.JobMaster
> - Stopping the JobMaster for job default: insert into
> rt_app.app_video_cover_abtest_test ... 2020-06-29 15:34:53,322 INFO
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl -
> Interrupted while waiting for queue
> java.lang.InterruptedException
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
> at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> at
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:287)
> 2020-06-29 15:34:53,324 INFO
> org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy
> - Opening proxy : uhadoop-op3raf-core12:2
>
> 
> ps:nbsp;
>
> 1. kafka中一直有数据在写入的
> 2. flink版本1.10.0
> 请问,任务状态为什么会变为SUCCEEDED呢?
>
> 谢谢大家!
>
>
>
>
> 逻辑稍微有些复杂,可以忽略下面的sql代码:
> # -- 提供:5分钟起始时间、vid、vid_group、曝光次数、点击次数、播放次数、有效播放次数 --
> 每5分钟将近5分钟统计结果写入mysql insert into rt_app.app_video_cover_abtest_test
> select begin_time, vid, vid_group, max(dv),
> max(click), max(vv), max(effectivevv) from(
> select t1.begin_time begin_time, t1.u_vid
> vid, t1.u_vid_group vid_group, dv,
> click, vv, if(effectivevv is null,0,effectivevv)
> effectivevv from ( -- dv、click、vv
> select CAST(TUMBLE_START(bjdt,INTERVAL '5' MINUTE)
> AS STRING) begin_time, cast(u_vid as bigint)
> u_vid, u_vid_group,
> sum(if(concat(u_mod,'-',u_ac)='emptylog-video_display' and
> u_c_module='M011',1,0)) dv,
> sum(if(concat(u_mod,'-',u_ac)='emptylog-video_click' and
> u_c_module='M011',1,0)) click,
> sum(if(concat(u_mod,'-',u_ac)='top-hits' and u_f_module='M011',1,0))
> vv FROM rt_ods.ods_applog_vidsplit where u_vid is
> not null and trim(u_vid) not null and trim(u_vid_group) not in ('','-1') and
> ( 

Re: flink读取kafka超时问题

2020-06-29 Thread zhisheng
hi,阿华田

你可以检查一下作业重启的时候,对应 Kafka 集群的 broker
机器上面的监控信息,看看对应时间是否有负载变高的情况,从而验证一下是否因为机器负载变高导致的读取超时?

Best!
zhisheng

Yichao Yang <1048262...@qq.com> 于2020年6月29日周一 下午7:50写道:

> Hi
>
>
> 看报错是说 dercd_seeme-3 partition 读取异常,可以检查下上游kafka的该partition是否有异常。
>
>
> Best,
> Yichao Yang
>
>
>
>
> --原始邮件--
> 发件人:"阿华田" 发送时间:2020年6月29日(星期一) 上午10:36
> 收件人:"user-zh"
> 主题:flink读取kafka超时问题
>
>
>
> Caused by: java.lang.Exception:
> org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired
> before the position for partition dercd_seeme-3 could be determined
> 大佬们flink读取kafka遇到过这个错误没?现在情况是
> 每次重启任务都会出现这个错,但是奇怪的是多试几次任务才能运行起来。这个任务的特点读取得topic较多(6个),数据量比较大。难道是读取得数据量太大给kafka集群的broker造成了很大的负载导致请求超时?
>
>
> | |
> 阿华田
> |
> |
> a15733178...@163.com
> |
> 签名由网易邮箱大师定制


Re: 回复: 关于拓展 Tuple元组的问题

2020-06-29 Thread zhisheng
可以测试一下

Tianwang Li  于2020年6月29日周一 下午8:13写道:

> >
> > 用 Row 啊,支持 Int.MAX 个元素,还能支持 null 值,不香么?
> >
> 用Row 和 Tuple 性能上会有差别吗?
>
> Jark Wu  于2020年6月19日周五 下午3:47写道:
>
> > 用 Row 啊,支持 Int.MAX 个元素,还能支持 null 值,不香么?
> >
> >
> > On Fri, 19 Jun 2020 at 15:42, Weixubin <18925434...@163.com> wrote:
> >
> > > 感谢你的回答,请问可否举一个参照例子?
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > 在 2020-06-19 15:31:53,"wangweigu...@stevegame.cn" <
> > > wangweigu...@stevegame.cn> 写道:
> > > >
> > > >   多个值组合在一起,当一个复合值使用!
> > > >
> > > >
> > > >
> > > >
> > > >发件人: 魏旭斌
> > > >发送时间: 2020-06-19 15:01
> > > >收件人: user-zh
> > > >主题: 关于拓展 Tuple元组的问题
> > > >目前Flink 提供了Tuple1 ~ Tuple25,在实际开发中不能满足我们的需求,我们希望能有更大的元组,比如构建一个Tuple50。
> > > 请问有什么解决的方案? 谢谢
> > >
> >
>
>
> --
> **
>  tivanli
> **
>


Re: Re: flink 高可用问题

2020-06-29 Thread zhisheng
hi,Tony

你可以把 Checkpoint 间隔时间稍微设置大一些,看起来像是作业启动的时候 Task 还没 Running,就开始执行 Checkpoint
了,而 Checkpoint 是要求所有的 Task 是处于 Running 状态的,所以就会丢弃掉那次
Checkpoint,BT,就算有这个异常应该问题也不大,只要后面你的作业全启动成功了的话,则 Checkpoint 还是会成功的。

Best!

zhisheng

Tony  于2020年6月29日周一 下午8:16写道:

>
> 你好,我的flink运行环境是在k8s中,我先是打开了checkpoint功能,那样是可以用的,task失败了数据还可以恢复,但job失败了就不行了,所以我又配置flink的高可用,在job的yaml文件里设置了动态属性("-Dhigh-availability=zookeeper"),这样job启动时就出现那种警告,功能也不好用了。但如果配置在flink-config文件里的话就可以,不知道为什么?而我就是想用那个动态属性的方式,谢谢大神指点。
>
>
>
>
>
> --
> 发自我的网易邮箱手机智能版
> 
>
>
> - Original Message -
> From: tison 
> To: user-zh 
> Sent: Mon, 22 Jun 2020 15:08:04 +0800
> Subject: Re: flink 高可用问题
>
> 你看一下你的 chk 间隔,看起来是作业还没调度起来就开始 chk 所以失败。可能原因资源不足,调度不起来或者调度得慢,你 chk
> 间隔又小,就这样了。
>
> 如果是一直 chk 以这个方式失败,应该看下调度的日志为啥迟迟调不起来
>
> Best,
> tison.
>
>
> Yichao Yang <1048262...@qq.com> 于2020年6月22日周一 上午10:57写道:
>
> > Hi
> >
> >
> > 看日志应该只是INFO,而不是错误,你的job是做不了checkpoint吗?
> >
> >
> > Best,
> > Yichao Yang
> >
> >
> >
> >
> > --原始邮件--
> > 发件人:"Tony" > 发送时间:2020年6月22日(星期一) 上午10:54
> > 收件人:"user-zh" >
> > 主题:flink 高可用问题
> >
> >
> >
> > 你好。
> >
> >
> > 我按着官方文档配置了flink的高可用(flink-conf.yaml)如下:
> > high-availability:zookeeper
> > high-availability.zookeeper.quorum:master:2181 ,slave1:2181,slave2:2181
> > high-availability.zookeeper.path.root:/flink
> > high-availability.cluster-id:/cluster_one
> > highavailability.storageDir:hdfs://master:9000/flink/ha
> >
> >
> > 我的flink和zookeeper都是在K8s的容器中
> > job启动出现如下问题:麻烦帮忙看一下,谢谢。
> > 2020-06-22 02:47:43,884 INFO
> >
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> > - Checkpoint triggering task Source:Kafka-Consumer - (Sink: Print to
> > Std. Out, Filter -Query Map - Unwind - Custom Map -
> filter
> > - Data Transformation - Filter) (1/1) of job
> >  is not in state RUNNING but SCHEDULED
> > instead. Aborting checkpoint.
>


Re: flink batch on yarn任务容错

2020-06-29 Thread zhisheng
hi,张波,

使用 Checkpoint 的方式在遇到错误的时候会 failover,恢复的时候是从上一次完整 Checkpoint
的状态开始恢复,不会让你重新从最开始的数据开始读取计算。

Best !

zhisheng

张波 <173603...@qq.com> 于2020年6月29日周一 下午10:06写道:

> 场景如下:
> flink批处理中,如果出现错误,包括网络及其他原因,导致任务失败,此时会将整个任务重新跑一遍,就算只是其中一个tm出现了问题也是如此。
> 我有一个sink
> es的操作,由于数据量大,将其分拆成一个独立的batch任务,但是只要中间有导致tm挂掉的错误(非任务本身逻辑问题),任务就会从头执行,感觉非常不友好。
> 问题:是否可以用streamsink的方式,使用checkpoint来解决批处理整个重启的问题?或者在10甚至之后的版本有新的解决方式?


Re: 【Flink的transformations】

2020-06-29 Thread zhisheng
应该看名字就可以看出来对应关系的

忝忝向仧 <153488...@qq.com> 于2020年6月29日周一 下午10:29写道:

> Hi,all:
>
>
>
> 请教下,Flink的应用程序首先都会转为逻辑映射也就是transformations,我看org.apache.flink.streaming.api.transformations包下面目前有17种Transformation类(SourceTransformation,SplitTransformation,TwoInputTransformation等),有没有一个映射关系列表,也就是说应用程序里面哪些算子或者操作(比如map,flatmap,filter,connect,select等)会对应到哪一个Transformation类.
>
>
> 谢谢.


Avro from avrohugger still invalid

2020-06-29 Thread Georg Heiler
Older versions of flink were incompatible with the Scala specific record
classes generated from AvroHugger.

https://issues.apache.org/jira/browse/FLINK-12501 Flink 1.10 apparently is
fixing this. I am currently using 1.10.1. However, still experience thus
problem
https://stackoverflow.com/questions/62637009/flink-use-confluent-schema-registry-for-avro-serde
of:

AvroRuntimeException: Not a Specific class

What is still wrong here?

Best,

Georg


??Flink??transformations??

2020-06-29 Thread ????????
Hi,all:


Flinktransformations??org.apache.flink.streaming.api.transformations17??Transformation??(SourceTransformation,SplitTransformation,TwoInputTransformation??)??(map,flatmap,filter,connect,select??)??Transformation??.


.

?????? Blink

2020-06-29 Thread ????
??17610775726??
"org.apache.flink" %% "flink-table-api-scala-bridge" % "1.10.1",
"org.apache.flink" %% "flink-table-planner-blink" % "1.10.1" % "provided",
"org.apache.flink" % "flink-table" % "1.10.1" % "provided",
"flink-table"??
----
??:"17610775726"<17610775...@163.com;
:2020??6??29??(??) 10:09
??:"user-zh"

Re: Blink

2020-06-29 Thread 17610775726
使用row number设置成blink的planner就行了 依赖也只用加blink的




| |
17610775726
|
|
邮箱:17610775...@163.com
|

Signature is customized by Netease Mail Master

On 06/29/2020 17:19, xuhaiLong wrote:


hello,请教下


 "org.apache.flink" %% "flink-table-api-scala-bridge" % "1.10.1",
 "org.apache.flink" %% "flink-table-planner-blink" % "1.10.1" % "provided",
 "org.apache.flink" % "flink-table" % "1.10.1" % "provided",


我在项目中添加了这三个依赖,在idea 中 运行的时候出现异常
`Could not instantiate the executor. Makesure a planner module is on the 
classpath`


而我添加上这个依赖
`"org.apache.flink" %% "flink-table-planner" % "1.10.1" % "provided",` 就可以了


但是我要使用 blink 中的ROW_NUMBER() 函数,是我的引入错了吗?


猜测是我没有正确引入 blink  ?

flink batch on yarn任务容错

2020-06-29 Thread 张波
场景如下:
flink批处理中,如果出现错误,包括网络及其他原因,导致任务失败,此时会将整个任务重新跑一遍,就算只是其中一个tm出现了问题也是如此。
我有一个sink 
es的操作,由于数据量大,将其分拆成一个独立的batch任务,但是只要中间有导致tm挂掉的错误(非任务本身逻辑问题),任务就会从头执行,感觉非常不友好。
问题:是否可以用streamsink的方式,使用checkpoint来解决批处理整个重启的问题?或者在10甚至之后的版本有新的解决方式?

Reading and updating rule-sets from a file

2020-06-29 Thread Lorenzo Nicora
Hi

My streaming job uses a set of rules to process records from a stream.
The rule set is defined in simple flat files, one rule per line.
The rule set can change from time to time. A user will upload a new file
that must replace the old rule set completely.

My problem is with reading and updating the rule set when I have a new one.
I cannot update single rules. I need the whole rule set to validate it and
build the internal representation to broadcast.

I am reading the file with a *ContinuousFileReaderOperator* and
*InputFormat* (via env.readFile(...) and creating the internal
representation of the rule set I then broadcast. I get new files with
processingMode = PROCESS_CONTINUOUSLY

How do I know when I have read ALL the records from a physical file, to
trigger validating and building the new Rule Set?

I've been thinking about a processing-time trigger, waiting a reasonable
time after I read the first rule of a new file, but it does not look safe
if the user, for example, uploads two new files by mistake.

Cheers
Lorenzo


Announcing ApacheCon @Home 2020

2020-06-29 Thread Rich Bowen

Hi, Apache enthusiast!

(You’re receiving this because you’re subscribed to one or more dev or 
user mailing lists for an Apache Software Foundation project.)


The ApacheCon Planners and the Apache Software Foundation are pleased to 
announce that ApacheCon @Home will be held online, September 29th 
through October 1st, 2020. We’ll be featuring content from dozens of our 
projects, as well as content about community, how Apache works, business 
models around Apache software, the legal aspects of open source, and 
many other topics.


Full details about the event, and registration, is available at 
https://apachecon.com/acah2020


Due to the confusion around how and where this event was going to be 
held, and in order to open up to presenters from around the world who 
may previously have been unable or unwilling to travel, we’ve reopened 
the Call For Presentations until July 13th. Submit your talks today at 
https://acna2020.jamhosted.net/


We hope to see you at the event!
Rich Bowen, VP Conferences, The Apache Software Foundation


Re: Flink Kafka connector in Python

2020-06-29 Thread Xingbo Huang
Hi Manas,

Since Flink 1.9, the entire architecture of PyFlink has been redesigned. So
the method described in the link won't work.
But you can use more convenient DDL[1] or descriptor[2] to read kafka data.
Besides, You can refer to the common questions about PyFlink[3]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#run-a-create-statement
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/python/common_questions.html

Best,
Xingbo

Manas Kale  于2020年6月29日周一 下午8:10写道:

> Hi,
> I want to consume and write to Kafak from Flink's python API.
>
> The only way I found to do this was through this
> 
>  question
> on SO where the user essentially copies FlinkKafka connector JARs into the
> Flink runtime's lib/ directory.
>
>- Is this the recommended method to do this? If not, what is?
>- Is there any official documentation for using Kafka with pyFlink? Is
>this officially supported?
>- How does the method described in the link work? Does the Flink
>runtime load and expose all JARs in /lib to the python script? Can I write
>custom operators in Java and use those through python?
>
> Thanks,
> Manas
>


Re:Re: flink 高可用问题

2020-06-29 Thread Tony
你好,我的flink运行环境是在k8s中,我先是打开了checkpoint功能,那样是可以用的,task失败了数据还可以恢复,但job失败了就不行了,所以我又配置flink的高可用,在job的yaml文件里设置了动态属性("-Dhigh-availability=zookeeper"),这样job启动时就出现那种警告,功能也不好用了。但如果配置在flink-config文件里的话就可以,不知道为什么?而我就是想用那个动态属性的方式,谢谢大神指点。





--
发自我的网易邮箱手机智能版



- Original Message -
From: tison 
To: user-zh 
Sent: Mon, 22 Jun 2020 15:08:04 +0800
Subject: Re: flink 高可用问题

你看一下你的 chk 间隔,看起来是作业还没调度起来就开始 chk 所以失败。可能原因资源不足,调度不起来或者调度得慢,你 chk 间隔又小,就这样了。

如果是一直 chk 以这个方式失败,应该看下调度的日志为啥迟迟调不起来

Best,
tison.


Yichao Yang <1048262...@qq.com> 于2020年6月22日周一 上午10:57写道:

> Hi
>
>
> 看日志应该只是INFO,而不是错误,你的job是做不了checkpoint吗?
>
>
> Best,
> Yichao Yang
>
>
>
>
> --原始邮件--
> 发件人:"Tony" 发送时间:2020年6月22日(星期一) 上午10:54
> 收件人:"user-zh"
> 主题:flink 高可用问题
>
>
>
> 你好。
>
>
> 我按着官方文档配置了flink的高可用(flink-conf.yaml)如下:
> high-availability:zookeeper
> high-availability.zookeeper.quorum:master:2181 ,slave1:2181,slave2:2181
> high-availability.zookeeper.path.root:/flink
> high-availability.cluster-id:/cluster_one
> highavailability.storageDir:hdfs://master:9000/flink/ha
>
>
> 我的flink和zookeeper都是在K8s的容器中
> job启动出现如下问题:麻烦帮忙看一下,谢谢。
> 2020-06-22 02:47:43,884 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Checkpoint triggering task Source:Kafka-Consumer - (Sink: Print to
> Std. Out, Filter -Query Map - Unwind - Custom Map - filter
> - Data Transformation - Filter) (1/1) of job
>  is not in state RUNNING but SCHEDULED
> instead. Aborting checkpoint.


Re: 回复: 关于拓展 Tuple元组的问题

2020-06-29 Thread Tianwang Li
>
> 用 Row 啊,支持 Int.MAX 个元素,还能支持 null 值,不香么?
>
用Row 和 Tuple 性能上会有差别吗?

Jark Wu  于2020年6月19日周五 下午3:47写道:

> 用 Row 啊,支持 Int.MAX 个元素,还能支持 null 值,不香么?
>
>
> On Fri, 19 Jun 2020 at 15:42, Weixubin <18925434...@163.com> wrote:
>
> > 感谢你的回答,请问可否举一个参照例子?
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2020-06-19 15:31:53,"wangweigu...@stevegame.cn" <
> > wangweigu...@stevegame.cn> 写道:
> > >
> > >   多个值组合在一起,当一个复合值使用!
> > >
> > >
> > >
> > >
> > >发件人: 魏旭斌
> > >发送时间: 2020-06-19 15:01
> > >收件人: user-zh
> > >主题: 关于拓展 Tuple元组的问题
> > >目前Flink 提供了Tuple1 ~ Tuple25,在实际开发中不能满足我们的需求,我们希望能有更大的元组,比如构建一个Tuple50。
> > 请问有什么解决的方案? 谢谢
> >
>


-- 
**
 tivanli
**


Flink Kafka connector in Python

2020-06-29 Thread Manas Kale
Hi,
I want to consume and write to Kafak from Flink's python API.

The only way I found to do this was through this

question
on SO where the user essentially copies FlinkKafka connector JARs into the
Flink runtime's lib/ directory.

   - Is this the recommended method to do this? If not, what is?
   - Is there any official documentation for using Kafka with pyFlink? Is
   this officially supported?
   - How does the method described in the link work? Does the Flink runtime
   load and expose all JARs in /lib to the python script? Can I write custom
   operators in Java and use those through python?

Thanks,
Manas


??????Flink-1.10.0 source??checkpoint??????????????

2020-06-29 Thread Yichao Yang
Hi


checkpoint??cpugc??


Best,
Yichao Yang




----
??:"Tianwang Li"

??????flink????kafka????????

2020-06-29 Thread Yichao Yang
Hi


?? dercd_seeme-3 partition 
kafkapartition


Best,
Yichao Yang




----
??:"??"

[no subject]

2020-06-29 Thread Georg Heiler
Hi,

I try to use the confluent schema registry in an interactive Flink Scala
shell.

My problem is trying to initialize the serializer from the
ConfluentRegistryAvroDeserializationSchema fails:

```scala
val serializer =
ConfluentRegistryAvroDeserializationSchema.forSpecific[Tweet](classOf[Tweet],
schemaRegistryUrl)
error: type arguments [Tweet] conform to the bounds of none of the
overloaded alternatives of
value forSpecific: [T <: org.apache.avro.specific.SpecificRecord](x$1:
Class[T], x$2: String, x$3:
Int)org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema[T]
 [T <: org.apache.avro.specific.SpecificRecord](x$1: Class[T],
x$2: 
String)org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema[T]
```

please see
https://stackoverflow.com/questions/62637009/flink-use-confluent-schema-registry-for-avro-serde
for details how the shell was set up and which additional JARs were loaded

Best,
Georg


??????flinksql????????????????????

2020-06-29 Thread Yichao Yang
Hi


??hive table


Best,
Yichao Yang




----
??:"MuChen"<9329...@qq.com;
:2020??6??29??(??) 4:53
??:"user-zh"https://s1.ax1x.com/2020/06/29/Nf2dIA.png

INFO15:34??
2020-06-29 14:53:20,260 INFO 
org.apache.flink.api.common.io.LocatableInputSplitAssigner - 
Assigning remote split to host uhadoop-op3raf-core12 2020-06-29 14:53:22,845 
INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph
 - Source: HiveTableSource(vid, q70) TablePath: dw.video_pic_title_q70, 
PartitionPruned: false, PartitionNums: null (1/1) (68c24aa5 
9c898cefbb20fbc929ddbafd) switched from RUNNING to FINISHED. 2020-06-29 
15:34:52,982 INFO 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint
 - Shutting YarnSessionClusterEntrypoint down with application status 
SUCCEEDED. Diagnostics null. 2020-06-29 15:34:52,984 INFO 
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - 
Shutting down rest endpoint. 2020-06-29 15:34:53,072 INFO 
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - 
Removing cache directory 
/tmp/flink-web-cdb67193-05ee-4a83-b957-9b7a9d85c23f/flink-web-ui 2020-06-29 
15:34:53,073 INFO 
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - 
http://uhadoop-op3raf-core1:44664 lost leadership 2020-06-29 15:34:53,074 
INFO 
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - 
Shut down complete. 2020-06-29 15:34:53,074 INFO 
org.apache.flink.yarn.YarnResourceManager
 - Shut down cluster because application is in SUCCEEDED, diagnostics null. 
2020-06-29 15:34:53,076 INFO 
org.apache.flink.yarn.YarnResourceManager
 - Unregister application from the YARN Resource Manager with final status 
SUCCEEDED. 2020-06-29 15:34:53,088 INFO 
org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl
 - Waiting for application to be successfully unregistered. 2020-06-29 
15:34:53,306 INFO 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent
 - Closing components. 2020-06-29 15:34:53,308 INFO 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess 
- Stopping SessionDispatcherLeaderProcess. 2020-06-29 15:34:53,309 INFO 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher
 - Stopping dispatcher 
akka.tcp://flink@uhadoop-op3raf-core1:38817/user/dispatcher. 2020-06-29 
15:34:53,310 INFO 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher
 - Stopping all currently running jobs of dispatcher 
akka.tcp://flink@uhadoop-op3raf-core1:38817/user/dispatcher. 2020-06-29 
15:34:53,311 INFO 
org.apache.flink.runtime.jobmaster.JobMaster
 - Stopping the JobMaster for job default: insert into 
rt_app.app_video_cover_abtest_test ... 2020-06-29 15:34:53,322 INFO 
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl - 
Interrupted while waiting for queue 
java.lang.InterruptedException 
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
 at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
 at 
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:287)
 2020-06-29 15:34:53,324 INFO 
org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - 
Opening proxy : uhadoop-op3raf-core12:2 


ps:nbsp;

1. kafka
2. flink1.10.0
??SUCCEEDED

??




sql??
# -- 
??5??vid??vid_group 
-- ??55mysql insert into 
rt_app.app_video_cover_abtest_test select begin_time, 
vid, vid_group, max(dv), max(click), max(vv), 
max(effectivevv) from( select t1.begin_time 
begin_time, t1.u_vid vid, t1.u_vid_group 
vid_group, dv, click, vv, 
if(effectivevv is null,0,effectivevv) effectivevv from 
( -- dv??click??vv select 
CAST(TUMBLE_START(bjdt,INTERVAL '5' MINUTE) AS STRING) 
begin_time, cast(u_vid as bigint) u_vid, 
u_vid_group, 
sum(if(concat(u_mod,'-',u_ac)='emptylog-video_display' and 
u_c_module='M011',1,0)) dv, 
sum(if(concat(u_mod,'-',u_ac)='emptylog-video_click' and 
u_c_module='M011',1,0)) click, 
sum(if(concat(u_mod,'-',u_ac)='top-hits' and u_f_module='M011',1,0)) 
vv FROM rt_ods.ods_applog_vidsplit where u_vid is not 
null and trim(u_vid)

Re: Timeout when using RockDB to handle large state in a stream app

2020-06-29 Thread Ori Popowski
Hi there,

I'm currently experiencing the exact same issue.

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Heartbeat-of-TaskManager-timed-out-td36228.html

I've found out that GC is causing the problem, but I still haven't managed
to solve this.



On Mon, Jun 29, 2020 at 12:39 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi community,
>
> I am trying to run a stream application with large state in a
> standalone flink cluster [3]. I configured the RocksDB state backend
> and I increased the memory of the Job Manager and Task Manager.
> However, I am still getting the timeout message
> "java.util.concurrent.TimeoutException: Heartbeat of TaskManager with
> id cb1091d792f52ca4743f345790d87dd5 timed out.". I am using Flink
> 1.10.1 and here are the configurations that I changed on the
> flink-conf.yaml. For the "state.checkpoints.dir" I am still using the
> filesystem. I am not sure if I need to use HDFS here since I am
> testing only in one machine.
>
> jobmanager.heap.size: 12g
> taskmanager.memory.process.size: 8g
> state.backend: rocksdb
> state.checkpoints.dir: file:///tmp/flink/state
>
> In the stream application I am using RocksDB as well (full code [3]):
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink/state",
> true));
>
> I have some operators that hold a large state when the load a static
> table on their state. I use them in two aggregate operations [1] and
> [2].
>
> [1]
> https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java#L128
> [2]
> https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java#L199
> [3]
> https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java
>
> Here is my stack trace error:
>
> org.apache.flink.runtime.JobException: Recovery is suppressed by
> NoRestartBackoffTimeStrategy
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
> at
> org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:49)
> at
> org.apache.flink.runtime.executiongraph.ExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(ExecutionGraph.java:1703)
> at
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1252)
> at
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1220)
> at
> org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:955)
> at
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:173)
> at
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:165)
> at
> org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:732)
> at
> org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537)
> at
> org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:149)
> at
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManagerInternal(SlotPoolImpl.java:818)
> at
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManager(SlotPoolImpl.java:777)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:429)
> at
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
> at
> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
> at
> 

Timeout when using RockDB to handle large state in a stream app

2020-06-29 Thread Felipe Gutierrez
Hi community,

I am trying to run a stream application with large state in a
standalone flink cluster [3]. I configured the RocksDB state backend
and I increased the memory of the Job Manager and Task Manager.
However, I am still getting the timeout message
"java.util.concurrent.TimeoutException: Heartbeat of TaskManager with
id cb1091d792f52ca4743f345790d87dd5 timed out.". I am using Flink
1.10.1 and here are the configurations that I changed on the
flink-conf.yaml. For the "state.checkpoints.dir" I am still using the
filesystem. I am not sure if I need to use HDFS here since I am
testing only in one machine.

jobmanager.heap.size: 12g
taskmanager.memory.process.size: 8g
state.backend: rocksdb
state.checkpoints.dir: file:///tmp/flink/state

In the stream application I am using RocksDB as well (full code [3]):
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink/state", true));

I have some operators that hold a large state when the load a static
table on their state. I use them in two aggregate operations [1] and
[2].

[1] 
https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java#L128
[2] 
https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java#L199
[3] 
https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java

Here is my stack trace error:

org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
at 
org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:49)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(ExecutionGraph.java:1703)
at 
org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1252)
at 
org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1220)
at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:955)
at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:173)
at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:165)
at 
org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:732)
at 
org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537)
at 
org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:149)
at 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManagerInternal(SlotPoolImpl.java:818)
at 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManager(SlotPoolImpl.java:777)
at 
org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:429)
at 
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
at 
org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

回复: 如何快速定位拖慢速度的 operator

2020-06-29 Thread aven . wu
如果算子都在一个group里面的话确实在webui上不好看出背压问题,可以将operator chain 拆开。

• StreamExecutionEnvironment.disableOperatorChaining():关闭整个Job的OperatorChain
• 
someStream.filter(...).map(...).startNewChain().map():startNewChain()是指从当前Operator[map]开始一个新的chain,即:两个map会chaining在一起而filter不会(因为startNewChain的存在使得第一次map与filter断开了chain)。
• 
someStream.map(...).disableChaining():disableChaining()是指当前Operator[map]禁用OperatorChain,即:Operator[map]会独自占用一个Task。
• 
someStream.map(...).slotSharingGroup("name"):默认情况下所有Operator的slotGroup都为default,可以通过slotSharingGroup()进行自定义,Flink会将拥有相同slotGroup名称的Operators运行在相同Slot内,不同slotGroup名称的Operators运行在其他Slot内。
希望对你有帮助

Best
Aven

发件人: 徐骁
发送时间: 2020年6月28日 10:16
主题: Re: 如何快速定位拖慢速度的 operator

好的 感谢两位我试试

Sun.Zhu <17626017...@163.com> 于2020年6月25日周四 下午11:19写道:

> 虽然chain在一起,但是可以通过metrics中看出来各个算子的各项指标的
> | |
> Sun.Zhu
> |
> |
> 17626017...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年06月25日 00:51,徐骁 写道:
> 两个方法确实可以, 但是要追踪起来很废时间, 对小白太不友好啊
>



Blink

2020-06-29 Thread xuhaiLong


hello,请教下


  "org.apache.flink" %% "flink-table-api-scala-bridge" % "1.10.1",
  "org.apache.flink" %% "flink-table-planner-blink" % "1.10.1" % "provided",
  "org.apache.flink" % "flink-table" % "1.10.1" % "provided",


我在项目中添加了这三个依赖,在idea 中 运行的时候出现异常
`Could not instantiate the executor. Makesure a planner module is on the 
classpath`


而我添加上这个依赖
`"org.apache.flink" %% "flink-table-planner" % "1.10.1" % "provided",` 就可以了


但是我要使用 blink 中的ROW_NUMBER() 函数,是我的引入错了吗?


猜测是我没有正确引入 blink  ?

flinksql????????????????????

2020-06-29 Thread MuChen
hi,

yarn-session??bin/yarn-session.sh -jm 1g -tm 4g -s 4 -qu root.flink -nm 
fsql-cli 21 

sql-clientsql??

kafkahive??joinmysql

succeeded??https://s1.ax1x.com/2020/06/29/Nf2dIA.png

INFO15:34??
2020-06-29 14:53:20,260 INFO  
org.apache.flink.api.common.io.LocatableInputSplitAssigner- Assigning 
remote split to host uhadoop-op3raf-core12 2020-06-29 14:53:22,845 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: 
HiveTableSource(vid, q70) TablePath: dw.video_pic_title_q70, PartitionPruned: 
false, PartitionNums: null (1/1) (68c24aa5 9c898cefbb20fbc929ddbafd) switched 
from RUNNING to FINISHED. 2020-06-29 15:34:52,982 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Shutting 
YarnSessionClusterEntrypoint down with application status SUCCEEDED. 
Diagnostics null. 2020-06-29 15:34:52,984 INFO  
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint- Shutting down 
rest endpoint. 2020-06-29 15:34:53,072 INFO  
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint- Removing cache 
directory /tmp/flink-web-cdb67193-05ee-4a83-b957-9b7a9d85c23f/flink-web-ui 
2020-06-29 15:34:53,073 INFO  
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint- 
http://uhadoop-op3raf-core1:44664 lost leadership 2020-06-29 15:34:53,074 INFO  
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint- Shut down 
complete. 2020-06-29 15:34:53,074 INFO  
org.apache.flink.yarn.YarnResourceManager - Shut down 
cluster because application is in SUCCEEDED, diagnostics null. 2020-06-29 
15:34:53,076 INFO  org.apache.flink.yarn.YarnResourceManager
 - Unregister application from the YARN Resource Manager with final status 
SUCCEEDED. 2020-06-29 15:34:53,088 INFO  
org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl - Waiting for 
application to be successfully unregistered. 2020-06-29 15:34:53,306 INFO  
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent
  - Closing components. 2020-06-29 15:34:53,308 INFO  
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess  - 
Stopping SessionDispatcherLeaderProcess. 2020-06-29 15:34:53,309 INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Stopping 
dispatcher akka.tcp://flink@uhadoop-op3raf-core1:38817/user/dispatcher. 
2020-06-29 15:34:53,310 INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Stopping all 
currently running jobs of dispatcher 
akka.tcp://flink@uhadoop-op3raf-core1:38817/user/dispatcher. 2020-06-29 
15:34:53,311 INFO  org.apache.flink.runtime.jobmaster.JobMaster 
 - Stopping the JobMaster for job default: insert into 
rt_app.app_video_cover_abtest_test ... 2020-06-29 15:34:53,322 INFO  
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Interrupted 
while waiting for queue java.lang.InterruptedException at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
 at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) 
at 
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:287)
 2020-06-29 15:34:53,324 INFO  
org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  - 
Opening proxy : uhadoop-op3raf-core12:2 

 
ps:

1. kafka
2. flink1.10.0
??SUCCEEDED

??




sql??
#  -- 
??5??vid??vid_group 
-- ??55mysql insert into 
rt_app.app_video_cover_abtest_test  select  begin_time,  vid,  vid_group,  
max(dv),  max(click),  max(vv),  max(effectivevv) from(  select   t1.begin_time 
begin_time,   t1.u_vid vid,   t1.u_vid_group vid_group,   dv,   click,   vv,   
if(effectivevv is null,0,effectivevv) effectivevv  from  (   -- dv??click??vv   
select CAST(TUMBLE_START(bjdt,INTERVAL '5' MINUTE) AS STRING) begin_time,   
 cast(u_vid as bigint) u_vid,u_vid_group,
sum(if(concat(u_mod,'-',u_ac)='emptylog-video_display' and 
u_c_module='M011',1,0)) dv,
sum(if(concat(u_mod,'-',u_ac)='emptylog-video_click' and 
u_c_module='M011',1,0)) click,sum(if(concat(u_mod,'-',u_ac)='top-hits' and 
u_f_module='M011',1,0)) vv   FROM rt_ods.ods_applog_vidsplit   where u_vid is 
not null and trim(u_vid)<''and u_vid_group is not null and 
trim(u_vid_group) not in ('','-1')and (  (concat(u_mod,'-',u_ac) in 

回复:flink1.10 使用 ParquetAvroWriters schema 模式写数据问题

2020-06-29 Thread 夏帅
你好,我试了一下,纯DataStream的方式是可以使用的,具体使用参考`flink-formats\flink-parquet\src\test\java\org\apache\flink\formats\parquet\avro\ParquetStreamingFileSinkITCase`

在Table转DataStream的方式中,我是先将Table转换为DataStream[Row],然后再进行转换生成DataStream[GenericRecord]
dataStream.map(x => {
  ...val fields = new util.ArrayList[Schema.Field]
  fields.add(new Schema.Field("platform", 
create(org.apache.avro.Schema.Type.STRING), "platform", null))
  fields.add(new Schema.Field("event", 
create(org.apache.avro.Schema.Type.STRING), "event", null))
  fields.add(new Schema.Field("dt", create(org.apache.avro.Schema.Type.STRING), 
"dt", null))
  val parquetSinkSchema: Schema = createRecord("pi", "flinkParquetSink",
"flink.parquet", true, fields)
  val record = new 
GenericData.Record(parquetSinkSchema).asInstanceOf[GenericRecord]
  record.put("platform", x.get(0))
  record.put("event", x.get(1))
  record.put("dt", x.get(2))
  record
})



--
发件人:yingbo yang 
发送时间:2020年6月29日(星期一) 10:04
收件人:夏帅 
抄 送:user-zh 
主 题:Re: flink1.10 使用 ParquetAvroWriters schema 模式写数据问题

你好:
可以使用 GenericRecordAvroTypeInfo 这个类型,但是这个类型只适合于 table 中只有一个 字段的情况;否则会出现异常:
代码:
ArrayList fields = new 
ArrayList();
fields.add(new org.apache.avro.Schema.Field("id", 
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING), "id", 
JsonProperties.NULL_VALUE));
fields.add(new org.apache.avro.Schema.Field("time", 
org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING), "time", 
JsonProperties.NULL_VALUE));
org.apache.avro.Schema parquetSinkSchema = 
org.apache.avro.Schema.createRecord("pi", "flinkParquetSink", "flink.parquet", 
true, fields);
String fileSinkPath = "./xxx.text/rs6/";


GenericRecordAvroTypeInfo genericRecordAvroTypeInfo = new 
GenericRecordAvroTypeInfo(parquetSinkSchema);
DataStream testDataStream1 = flinkTableEnv.toAppendStream(test, 
genericRecordAvroTypeInfo);

testDataStream1.print().setParallelism(1);


StreamingFileSink parquetSink = StreamingFileSink.
forBulkFormat(new Path(fileSinkPath),
ParquetAvroWriters.forGenericRecord(parquetSinkSchema))
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.build();
testDataStream1.addSink(parquetSink).setParallelism(1);
flinkTableEnv.execute("ReadFromKafkaConnectorWriteToLocalFileJava");

异常:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/Users/yyb/Software/localRepository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/Users/yyb/Software/localRepository/org/apache/logging/log4j/log4j-slf4j-impl/2.6.2/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
root
 |-- id: STRING
 |-- time: STRING

09:40:35,872 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
 - class org.apache.flink.types.Row does not contain a getter for field fields
09:40:35,874 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
 - class org.apache.flink.types.Row does not contain a setter for field fields
09:40:35,874 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
 - Class class org.apache.flink.types.Row cannot be used as a POJO type because 
not all fields are valid POJO fields, and must be processed as GenericType. 
Please read the Flink documentation on "Data Types & Serialization" for details 
of the effect on performance.
09:40:36,191 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
 - class org.apache.flink.types.Row does not contain a getter for field fields
09:40:36,191 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
 - class org.apache.flink.types.Row does not contain a setter for field fields
09:40:36,191 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
 - Class class org.apache.flink.types.Row cannot be used as a POJO type because 
not all fields are valid POJO fields, and must be processed as GenericType. 
Please read the Flink documentation on "Data Types & Serialization" for details 
of the effect on performance.
Exception in thread "main" org.apache.flink.table.api.TableException: Arity [2] 
of result [[Lorg.apache.flink.api.common.typeinfo.TypeInformation;@2149594a] 
does not match the number[1] of requested type 
[GenericRecord("{"type":"error","name":"pi","namespace":"flink.parquet","doc":"flinkParquetSink","fields":[{"name":"id","type":"string","doc":"id","default":null},{"name":"time","type":"string","doc":"time","default":null}]}")].
 at 
org.apache.flink.table.planner.Conversions$.generateRowConverterFunction(Conversions.scala:66)
 at 
org.apache.flink.table.planner.DataStreamConversions$.getConversionMapper(DataStreamConversions.scala:135)
 at 

Re: Optimal Flink configuration for Standalone cluster.

2020-06-29 Thread Dimitris Vogiatzidakis
>
> It could really be specific to your workload. Some workload may need more
> heap memory while others may need more off-heap.
>
The main 'process' of my project creates a cross product of datasets and
then applies a function to all of them to extract some features.


> Alternatively, you can try to launch multiple TMs on one physical machine,
> to reduce the memory size of each TM process.

If I understand correctly you mean instead of 1 TM with 32 slots, I should
have 4 TMs with 8? Or else i would exceed the amount of total cores and
probably have tasks 'waiting' on other tasks to be completed.


BTW, what kind of workload are you running? Is it streaming or batch?
>
It is Batch. I have dataset of edges and try to extract features , to later
be used for link prediction.

Thank you
-Dimitris Vogiatzidakis

>

On Mon, Jun 29, 2020 at 5:07 AM Xintong Song  wrote:

> Since changing off-heap removes memory from '.task.heap.size' is there a
>> ratio that I should follow for better performance?
>>
> I don't think so. It could really be specific to your workload. Some
> workload may need more heap memory while others may need more off-heap.
>
> Also, my guess (since I am dealing with big datasets) is that the more
>> '.flink.size' I provide the better. Is that correct?
>>
> In most cases, yes. But it is also possible the other way around. Larger
> `.flink.size` usually also means larger JVM heap space, which reduces the
> frequency of GCs but increases the time cost on each GC (espeacially full
> GCs). On the other hand, if the memory is large enough, it could become the
> CPU resource rather than the memory that limits the performance. In such
> cases, increasing memory size won't give you more performance improvement
> but might introduce more GC overheads, thus harm the overall performance.
>
> In this particular cluster, since every Machine has 252 total DRAM and
>> worst case scenario 180GB is free to use, should I just say .flink.size:
>> 180g?
>>
> Not sure about this. I would suggest to avoid large task managers (say
> tens of GBs) unless absolutely necessary. Alternatively, you can try to
> launch multiple TMs on one physical machine, to reduce the memory size of
> each TM process.
>
> BTW, what kind of workload are you running? Is it streaming or batch?
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Jun 29, 2020 at 1:18 AM Dimitris Vogiatzidakis <
> dimitrisvogiatzida...@gmail.com> wrote:
>
>> Hi Xintong,
>> Thank you for the quick response.
>> doing 1), without increasing  'task.off-heap.size'  does not change the
>> issue, but increasing the off-heap alone does.
>> What should the off-heap value size be? Since changing off-heap removes
>> memory from '.task.heap.size' is there a ratio that I should follow for
>> better performance?
>> Also, my guess (since I am dealing with big datasets) is that the more
>> '.flink.size' I provide the better. Is that correct? Or will it add extra
>> 'overhead' that could slow down my computations? In this particular
>> cluster, since every Machine has 252 total DRAM and worst case scenario
>> 180GB is free to use, should I just say .flink.size: 180g?
>>
>> Thank you very much and sorry if i'm asking silly questions.
>> Dimitris Vogiatzidakis
>>
>> On Sun, Jun 28, 2020 at 5:25 AM Xintong Song 
>> wrote:
>>
>>> Hi Dimitris,
>>>
>>> Regarding your questions.
>>> a) For standalone clusters, the recommended way is to use `.flink.size`
>>> rather than `.process.size`. `.process.size` includes JVM metaspace and
>>> overhead in addition to `.flink.size`, which usually do not really matter
>>> for standalone clusters.
>>> b) In case of direct OOMs, you should increase
>>> `taskmanager.memory.task.off-heap.size`. There's no fraction for that.
>>> c) Your understanding is correct. And you can also specify the absolute
>>> network memory size by setting the min and max to the same value.
>>>
>>> Here are my suggestions according to what you described.
>>>
>>>1. Since both off-heap and network memory seems insufficient, I
>>>would suggest to increase `taskmanager.memory.flink.size` to give your 
>>> task
>>>managers more memory in total.
>>>2. If 1) does not work, I would suggest not to set the total memory
>>>(means configure neither `.flink.size` nor `process.size`), but go for 
>>> the
>>>fine grained configuration where explicitly specify the individual memory
>>>components. Flink will automatically add them up to derive the total 
>>> memory.
>>>   1. In addition to `.task.off-heap.size` and `.network.[min|max]`,
>>>   you will also need to set `.task.heap.size` and `managed.size`.
>>>   2. If you don't know how many heap/managed memory to configure,
>>>   you can look for the configuration options in the beginning of the TM 
>>> logs
>>>   (`-Dkey=value`). Those are the values derived from your current
>>>   configuration.
>>>
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Sat, Jun 27, 2020 at 10:56 PM 

Re: Convert sql table with field of type MULITSET to datastream with field of type java.util.Map[T, java.lang.Integer]

2020-06-29 Thread Timo Walther

Hi YI,

not all conversion might be supported in the `toRetractStream` method. 
Unfortunately, the rework of the type system is still in progress. I 
hope we can improve the user experience there quite soon.


Have you tried to use `Row` instead? `toRetractStream[Row]` should work 
for all data types. A subsequent conversion MapFunction can then 
transform the data in your desired representation.


Regards,
Timo


On 28.06.20 09:37, YI wrote:

Hi, all

I am trying to do something like this
```
tEnv
   .sqlQuery("SELECT rawEvent.id, collect(rawEvent.name) FROM rawEvent 
GROUP BY rawEvent.id")

   .toRetractStream[(Long, java.util.Map[String, java.lang.Integer])]
```

An exception is thrown when I ran the above code with the default 
planner setting in 1.10.1. I presume I am using the older planner.


```
Exception in thread "main" org.apache.flink.table.api.TableException: 
Result field does not match requested type. Requested: 
GenericType; Actual: Multiset
at 
org.apache.flink.table.planner.Conversions$.$anonfun$generateRowConverterFunction$2(Conversions.scala:104)
at 
org.apache.flink.table.planner.Conversions$.$anonfun$generateRowConverterFunction$2$adapted(Conversions.scala:98)
at 
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at 
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at 
org.apache.flink.table.planner.Conversions$.generateRowConverterFunction(Conversions.scala:98)
at 
org.apache.flink.table.planner.DataStreamConversions$.getConversionMapperWithChanges(DataStreamConversions.scala:184)
at 
org.apache.flink.table.planner.DataStreamConversions$.convert(DataStreamConversions.scala:90)
at 
org.apache.flink.table.planner.StreamPlanner.translateOptimized(StreamPlanner.scala:413)
at 
org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:402)
at 
org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:185)
at 
org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:117)
at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)

at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:273)
at scala.collection.TraversableLike.map$(TraversableLike.scala:266)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at 
org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:117)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:210)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:127)
at 
org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:146)

at io.redacted.sub.package$.getMatchesWithParent(package.scala:244)
at io.redacted.sub.package$.process(package.scala:156)
at io.redacted.DataAggregator$.main(DataAggregator.scala:15)
at io.redacted.DataAggregator.main(DataAggregator.scala)

Process finished with exit code 1
```

The result type of aggregation function collect is multiset. How do I 
convert it to a `java.util.Map[String, java.lang.Integer]`?


Cheers,
YI




Re: Error reporting for Flink jobs

2020-06-29 Thread Timo Walther

Hi Satyam,

I'm not aware of an API to solve all your problems at once. A common 
pattern for failures in user-code is to catch errors in user-code and 
define a side output for an operator to pipe the errors to dedicated 
sinks. However, such a functionality does not exist in SQL yet. For the 
sink part, it might be useful to look into the StreamingFileSink [1] 
which provides better failure handling guarantees. Flink 1.11 will be 
shipped with a SQL streaming file sink.


Regards,
Timo


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html



On 28.06.20 12:27, Satyam Shekhar wrote:

Hello,

I am using Flink as the query engine for running SQL queries on both 
batch and streaming data. I use the Blink planner in batch and streaming 
mode respectively for the two cases.


In my current setup, I execute the batch queries synchronously via 
StreamTableEnvironment::execute method. The job uses OutputFormat to 
consume results in StreamTableSink and send it to the user. In case 
there is an error/exception in the pipeline (possibly to user code), it 
is not reported to OutputFormat or the Sink. If an error occurs after 
the invocation of the write method on OutputFormat, the implementation 
may falsely assume that the result successful and complete since close 
is called in both success and failure cases. I can work around this, by 
checking for exceptions thrown by the execute method but that adds extra 
latency due to job tear down cost.


A similar problem also exists for streaming jobs. In my setup, streaming 
jobs are executed asynchronously via 
StreamExecuteEnvironment::executeAsync. Since the sink interface has no 
methods to receive errors in the pipeline, the user code has to 
periodically track and manage persistent failures.


Have I missed something in the API? Or Is there some other way to get 
access to error status in user code?


Regards,
Satyam