Re: flink 1.11 computation results fail to be written to mysql

2020-07-13 Thread Zhonghan Tang
Is new data still coming in? This looks very similar to this jira:
https://issues.apache.org/jira/browse/FLINK-15262




On 2020-07-13 20:38, Leonard Xu wrote:
Hi,

I took a quick look at the code and it seems fine. Does the kafka topic alarm_test_g actually have data in it? You could also check for dirty records, for example by consuming the topic directly with ./bin/kafka-console-consumer.sh; that is the point I would suspect first.

Best,
Leonard Xu
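A side note on the DDL in the quoted post below: 'properties.bootstrap.servers' is set twice, once to 10.2.2.73:2181, which is ZooKeeper's default port rather than a Kafka broker. And if dirty data does turn out to be the cause, the 1.11 JSON format can be told to skip malformed records instead of failing. A minimal sketch of the source table with both points addressed, reusing the topic and broker from the post ('json.ignore-parse-errors' is a Flink 1.11 JSON format option):

# A sketch, not the poster's code: a single broker entry, and
# malformed JSON records skipped rather than failing the job.
source = """
CREATE TABLE kafka_source_tab (
    id VARCHAR,
    alarm_id VARCHAR,
    trck_id VARCHAR
) WITH (
    'connector' = 'kafka',
    'topic' = 'alarm_test_g',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = '10.2.2.73:9092',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true'
)
"""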

On 2020-07-13 20:06, 小学生 <201782...@qq.com> wrote:

Hi all, a question: my flink job reads from Kafka and writes to mysql. The program reports no errors, but no data at all is written to mysql. It runs on linux, launched directly with python *.py. The full code is below.


from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

source = """
CREATE TABLE kafka_source_tab (
    id VARCHAR,
    alarm_id VARCHAR,
    trck_id VARCHAR
) WITH (
    'connector' = 'kafka',
    'topic' = 'alarm_test_g',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = '10.2.2.73:2181',
    'properties.bootstrap.servers' = '10.2.2.73:9092',
    'format' = 'json'
)
"""

sink = """
CREATE TABLE g_source_tab (
    id VARCHAR,
    alarm_id VARCHAR,
    trck_id VARCHAR
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false',
    'table-name' = 'g',
    'username' = 'root',
    'password' = '123456t',
    'sink.buffer-flush.interval' = '1s'
)
"""

env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

t_env.execute_sql(source)
t_env.execute_sql(sink)

source = t_env.from_path("kafka_source_tab").select("id,alarm_id,trck_id")
source.execute_insert("g_source_tab")
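One more thing worth checking with a script launched via plain python *.py: in 1.11, execute_insert() submits the job asynchronously and returns a TableResult, so the Python process can exit before the job has written anything. A minimal sketch of blocking until the job finishes, following the 1.11 PyFlink API, where the last line above would become:

# Sketch: keep the script alive until the async insert job terminates.
table_result = source.execute_insert("g_source_tab")
# TableResult exposes the JobClient of the submitted job; waiting on its
# execution result blocks until the job finishes or fails.
table_result.get_job_client().get_job_execution_result().result()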


Re: mysql sink connection timeout

2020-06-29 Thread Zhonghan Tang
You can refer to this jira:
https://issues.apache.org/jira/browse/FLINK-12494
1. Throw an exception and let the flink runtime handle it;
2. Handle it in the OutputFormat;




On 06/30/2020 11:53, shizk233 wrote:
Hi All,
I've recently been using flink to write data into a mysql sink, but the business has no traffic at night, which trips the mysql wait timeout (8 hours by default) and leaves the connection stale. Even with the autoReconnect=true parameter added to the mysql url, the exception shown below is still thrown.

Versions:
flink 1.10.1
mysql server 5.6.47
mysql Connector/J 5.1.49

Questions:
1. Could flink's jdbc connector use a connection-pool model? If it uses only a single connection, could some kind of heartbeat keep it active?
2. Can data be lost once the connection goes stale (I couldn't find a state field storing records in the source code)?
3. On the current version, is there a good workaround for stale connections?

Thanks,
Xuhui Mao

Exception:
2020-06-24 22:39:46,923 ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat - JDBC executeBatch error, retry times = 1
java.sql.SQLException: Could not retrieve transaction read-only status from server
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:878)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:874)
at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3523)
at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3490)
at com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1287)
at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:954)
at org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:70)
at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:161)
at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:125)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet successfully received from the server was 10,384,059 milliseconds ago. The last packet sent successfully to the server was 10,384,063 milliseconds ago.
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:403)
at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:990)
at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3706)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2506)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2675)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2465)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2439)
at com.mysql.jdbc.StatementImpl.executeQuery(StatementImpl.java:1365)
at com.mysql.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:3515)
... 13 more
Caused by: java.net.SocketException: Connection reset
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3688)
... 19 more
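On question 1 above: the 1.10 JDBC connector holds a single plain connection with no built-in heartbeat, so any keepalive has to live outside the connector. Purely to illustrate the heartbeat idea the question asks about (PyMySQL is my choice here, not part of the flink stack, and the connection values are placeholders):

import pymysql

# Hypothetical connection values, for illustration only.
conn = pymysql.connect(host="mysql-host", port=3306, user="user",
                       password="secret", database="db")

def keepalive(conn):
    # ping() re-validates the connection; with reconnect=True it
    # transparently reopens it if the server closed it, e.g. after
    # wait_timeout expired during an idle night.
    conn.ping(reconnect=True)

Running something like this on a timer is the heartbeat idea; whether data can be lost when the connection drops still depends on the sink's retry behaviour.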


Flink/SparkStreaming 性能测试(吞吐/延时)

2020-06-11 Thread Zhonghan Tang
Hi,
I need to do a throughput/latency performance analysis of Flink/SparkStreaming, and I've found very little material on this online; only Meituan in 2017 and Yahoo in 2015 did anything similar. My questions:
1. For a simple read-kafka-write-kafka job, how do I record when data enters and leaves flink? If it's done by stamping timestamps, how should they be stamped, and does the stamping hurt performance?
2. The scenarios I have in mind are simple insert ETL, basic filtering, and windows. Are there other qualitative or quantitative ways to measure framework performance?


Meituan link:
https://tech.meituan.com/2017/11/17/flink-benchmark.html?spm=a2c6h.13066369.0.0.5e3c1455V4UrXH
yahoo:
https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at
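On question 1, the common trick (it is also how the Yahoo benchmark above measures latency) is to embed a wall-clock timestamp in each record at the producer and subtract it after the record leaves the job. A rough sketch with kafka-python; the topic names and the client library are my own choices, not from the thread:

import json, time
from kafka import KafkaProducer, KafkaConsumer

# Producer side: stamp each record with its send time.
producer = KafkaProducer(bootstrap_servers="broker:9092",
                         value_serializer=lambda v: json.dumps(v).encode())
producer.send("bench_in", {"payload": "x", "ts": time.time()})
producer.flush()

# Consumer side, reading the topic the job writes to:
# end-to-end latency = read time - embedded send time.
consumer = KafkaConsumer("bench_out", bootstrap_servers="broker:9092",
                         value_deserializer=lambda v: json.loads(v.decode()),
                         auto_offset_reset="earliest")
for msg in consumer:
    print("latency_ms:", (time.time() - msg.value["ts"]) * 1000)

The stamp itself is only a few bytes of JSON per record, so its overhead can be measured by running the same job with and without the field; the bigger caveat is clock skew when producer and consumer run on different hosts.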





Re: flink1.9, how to monitor the kafka consumer backlog in real time

2020-06-03 Thread Zhonghan Tang
Usually kafka's own consumer-group command-line tool can show it:
./kafka-consumer-groups.sh --describe --group test-consumer-group --bootstrap-server
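If you want the same number programmatically rather than from the CLI: the backlog is the log end offset minus the committed offset, summed over partitions. A small sketch with kafka-python (the group and topic names are placeholders, not from the thread):

from kafka import KafkaConsumer, TopicPartition

# Lag per partition = log end offset - committed offset; the total is
# what kafka-consumer-groups.sh reports in its LAG column.
consumer = KafkaConsumer(bootstrap_servers="broker:9092",
                         group_id="test-consumer-group")
partitions = [TopicPartition("my_topic", p)
              for p in consumer.partitions_for_topic("my_topic")]
end_offsets = consumer.end_offsets(partitions)
lag = sum(end_offsets[tp] - (consumer.committed(tp) or 0) for tp in partitions)
print("total lag:", lag)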




On 2020-06-03 14:10, guanyq wrote:
Let me ask a question:

1. When consuming kafka, how do you monitor a kafka topic's backlog in real time?

Re: [subject garbled in the archive; the surviving fragments are "webui" and "job"]

2020-05-08 Thread Zhonghan Tang
[reply garbled in the archive]

On 2020-05-09 11:20, Jiacheng Jiang <920334...@qq.com> wrote:
hi all
[body garbled in the archive; the surviving fragments mention the flink web ui, a job, and a savepoint]

Re: flink process hangs while reading kafka data

2020-04-09 Thread Zhonghan Tang
Try taking a jstack and have a look.
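A thread dump is taken with the JDK's jps/jstack tools against the stuck TaskManager process. A small helper sketch; the wrapper is mine, while TaskManagerRunner is the Flink 1.x TaskManager entry-point class:

import subprocess

# jps -l prints "<pid> <main class>"; dump the stacks of any TaskManager found.
for line in subprocess.check_output(["jps", "-l"]).decode().splitlines():
    pid, _, main_class = line.partition(" ")
    if "TaskManagerRunner" in main_class:
        dump = subprocess.check_output(["jstack", pid]).decode()
        with open("taskmanager-%s.jstack" % pid, "w") as f:
            f.write(dump)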





On 2020-04-09 17:49, Zhefu PENG wrote:
Hi all,

Recently, while using flink to read data from kafka, I ran into this situation:

The flink job suddenly hangs at some point in time. The process doesn't report errors and doesn't fail and exit; it just sits there and stops consuming kafka data. The jobmanager and taskmanager logs show no errors, only the startup messages and the things I explicitly set to print.

My pipeline logic is roughly: read from kafka, match with CEP, select the matched records, then run aggregations and a custom window function.

The configuration is roughly: a 2G JobManager, 2 x 8G TaskManagers, total parallelism 4.
Kafka volume is about 5000 records per second.

The main problem is that I've combed through many logs without finding any error message, and the job itself never fails, which has me scratching my head. Has anyone run into this? Any further suggestions would be much appreciated.

Best,
Zhefu