回复: flink 1.11运算结果存mysql出错
有新数据进来吗,看起来和这个jira很像 https://issues.apache.org/jira/browse/FLINK-15262 在2020年07月13日 20:38,Leonard Xu 写道: Hi, 简单看了下代码应该没啥问题,alarm_test_g 这个kafka topic里有数据吗?可以检查下是否有脏数据,直接用./bin/kafka-console-consumer.sh 检查下?我有点怀疑这点 Best, Leonard Xu 在 2020年7月13日,20:06,小学生 <201782...@qq.com> 写道: 各位大佬好,请教一个问题flink从Kafka读数,写入mysql,程序没有报错,但是没有写入mysql任何数据。代码如下,是在linux下,直接python *.py执行的。完整代码如下 from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode from pyflink.table import StreamTableEnvironment, EnvironmentSettings source=""" CREATE TABLE kafka_source_tab ( id VARCHAR, alarm_id VARCHAR, trck_id VARCHAR ) WITH ( 'connector' = 'kafka', 'topic' = 'alarm_test_g', 'scan.startup.mode' = 'earliest-offset', 'properties.bootstrap.servers' = '10.2.2.73:2181', 'properties.bootstrap.servers' = '10.2.2.73:9092', 'format' = 'json' ) """ sink=""" CREATE TABLE g_source_tab ( id VARCHAR, alarm_id VARCHAR, trck_id VARCHAR ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false', 'table-name' = 'g', 'username' = 'root', 'password' = '123456t', 'sink.buffer-flush.interval' = '1s' ) """ env = StreamExecutionEnvironment.get_execution_environment() env.set_stream_time_characteristic(TimeCharacteristic.EventTime) env.set_parallelism(1) env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build() t_env = StreamTableEnvironment.create(env, environment_settings=env_settings) t_env.execute_sql(source) t_env.execute_sql(sink) source = t_env.from_path("kafka_source_tab")\ .select("id,alarm_id,trck_id") source.execute_insert("g_source_tab")
Re:mysql sink connection timeout
可以参考这个jira https://issues.apache.org/jira/browse/FLINK-12494 1. Throw execption and let flink runtime handle it; 2. Handle it in OutputFormat; | | Zhonghan Tang | | 13122260...@163.com | 签名由网易邮箱大师定制 On 06/30/2020 11:53,shizk233 wrote: Hi All, 最近使用flink处理数据写入mysql sink,但由于业务场景在晚上没有数据流入,会触发mysql wait timeout限制(默认的8小时)导致连接失效。 即使在mysql url中添加了autoReconnect=true参数,仍会产生相应的异常,具体信息见下。 版本信息: flink 1.10.1 mysql server 5.6.47 mysql Connector/J 5.1.49 请问: 1.flink的jdbc connector是否可以采用连接池模型?如果只使用一个connection,是否可以添加某种心跳机制以保持active? 2.连接失效后是否有可能丢数(因为源码没找到存储record的state field)? 3.在当前版本下,连接失效有什么比较好的解决方案吗? Thanks, Xuhui Mao 异常信息: 2020-06-24 22:39:46,923 ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat - JDBC executeBatch error, retry times = 1 java.sql.SQLException: Could not retrieve transaction read-only status from server at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965) at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898) at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887) at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861) at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:878) at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:874) at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3523) at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3490) at com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1287) at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:954) at org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:70) at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:161) at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:125) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure The last packet successfully received from the server was 10,384,059 milliseconds ago. The last packet sent successfully to the server was 10,384,063 milliseconds ago. at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at com.mysql.jdbc.Util.handleNewInstance(Util.java:403) at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:990) at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3706) at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2506) at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2675) at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2465) at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2439) at com.mysql.jdbc.StatementImpl.executeQuery(StatementImpl.java:1365) at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3515) ... 13 more Caused by: java.net.SocketException: Connection reset at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115) at java.net.SocketOutputStream.write(SocketOutputStream.java:155) at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3688) ... 19 more
Flink/SparkStreaming 性能测试(吞吐/延时)
Hi, 近期我需要做一个 Flink/SparkStreaming 吞吐/延时的性能分析, 我发现网上这方面资料很少, 只有17年美团/15年yahoo 做了一个类似的分析. 问题如下: 1. 简单的读kafka写kafka 要如何记录数据进flink/出flink 的时间? 如果是打时间戳要怎么打? 打的话会不会影响性能? 2. 我想到的场景是: 简单插数etl, 基本的过滤, 窗口. 请问还有没有什么定性定量的方式可以测量框架性能? 美团链接: https://tech.meituan.com/2017/11/17/flink-benchmark.html?spm=a2c6h.13066369.0.0.5e3c1455V4UrXH yahoo: https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at | | Zhonghan Tang | | 13122260...@163.com | 签名由网易邮箱大师定制
回复:flink1.9,如何实时查看kafka消费的挤压量
一般是kafka自带的查看消费组的命令工具可以看 ./kafka-consumer-groups.sh --describe --group test-consumer-group --bootstrap-server | | Zhonghan Tang | | 13122260...@163.com | 签名由网易邮箱大师定制 在2020年06月3日 14:10,guanyq 写道: 请加个问题 1.消费kafka时,是如何实时查看kafka topic的挤压量的?
????????????webui??????job
? | | Zhonghan Tang | | 13122260...@163.com | ?? ??2020??05??9?? 11:20(Jiacheng Jiang)<920334...@qq.com> ?? hi all flink web ui??jobweb ui??job??savepointweb??job??
回复:flink读取kafka数据进程卡死
打个jstack看看呢 | | Zhonghan Tang | | 邮箱:13122260...@163.com | Signature is customized by Netease Mail Master 在2020年04月09日 17:49,Zhefu PENG 写道: Hi all, 最近在使用flink读取kafka读取数据的时候,遇到这么个情况: flink任务会突然在某一个时间点卡死,进程不报错,也不会fail然后退出,只会一直卡在那里也不会kafka数据,查看jobmanager和taskmanager的日志都没有报错信息,只有启动任务时候的信息或者要我设置要print的一些信息。 我的代码逻辑大概是这样:从kafka读取数据,会经过cep进行匹配,然后选取匹配命中的数据,进行聚合操作,以及窗口函数的自定义操作。 配置大概为: 2G的JobManager,2*8G的TaskManager,并行度总共为4 Kafka的数据量大概:每秒5000左右。 主要是查了很多日志没有报错信息,本身job也不会fail,这个情况让我比较挠头。请问有朋友遇到过这种情况吗?或者希望能给到更多的建议。非常谢谢 Best, Zhefu