Re:Re: flink1.11: query results written to mysql at only a few rows per second

2020-07-24 Post by RS
Check how long the INSERT SQL actually takes to execute, and see whether the bottleneck is on the MySQL side, for example large rows being written, slow index maintenance, or something else.

Or run the same INSERT SQL manually and compare the write speed?
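For example, a rough standalone timing check could look like the sketch below (the table, columns, credentials and url are taken from the sink DDL later in this thread, so treat them as assumptions about your environment):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class MysqlInsertBenchmark {
    public static void main(String[] args) throws Exception {
        // Assumed connection settings, copied from the sink DDL in this thread.
        String url = "jdbc:mysql://198.2.2.71:3306/bda?useSSL=false&rewriteBatchedStatements=true";
        try (Connection conn = DriverManager.getConnection(url, "root", "root")) {
            conn.setAutoCommit(false);
            try (PreparedStatement ps = conn.prepareStatement(
                    "INSERT INTO detail (id, alarm_id, trck_id) VALUES (?, ?, ?)")) {
                long start = System.currentTimeMillis();
                for (int i = 0; i < 1000; i++) {
                    ps.setString(1, "id-" + i);
                    ps.setString(2, "alarm-" + i);
                    ps.setString(3, "trck-" + i);
                    ps.addBatch();
                }
                ps.executeBatch();
                conn.commit();
                // If 1000 rows take several seconds here, the bottleneck is MySQL itself, not Flink.
                System.out.println("Inserted 1000 rows in " + (System.currentTimeMillis() - start) + " ms");
            }
        }
    }
}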














On 2020-07-25 10:20:35, "小学生" <201782...@qq.com> wrote:
>Hello, thank you for the answer, but after adding that setting as you suggested, the MySQL write rate only rose to about 8 rows per second, which still falls short of what I need.


Re: flink1.11: query results written to mysql at only a few rows per second

2020-07-24 Post by 小学生
mysql 8?

Re: flink1.11: query results written to mysql at only a few rows per second

2020-07-24 Post by WeiXubin
Hi,
You can try rewriting the JDBC url to add rewriteBatchedStatements=true, like this:
jdbc:mysql://198.2.2.71:3306/bda?useSSL=false&rewriteBatchedStatements=true

By default the MySQL JDBC driver ignores executeBatch(): it splits the statements you intended to run as one batch and sends them to MySQL one at a time, which directly causes poor performance. Only when the rewriteBatchedStatements parameter is set to true
will the driver actually execute the SQL in batches.
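The parameter goes in the 'url' option of the JDBC sink table. As a sketch only (written in Java here, mirroring the Java DDL examples elsewhere in this digest; the table, credentials and flush settings are the original poster's values, and the class name is made up), it would look like:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class JdbcSinkDdlExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Same sink DDL as the PyFlink job in this thread, with only the url changed
        // to carry rewriteBatchedStatements=true.
        String sinkDdl =
            "CREATE TABLE source_W_detail (\n" +
            "  id VARCHAR,\n" +
            "  alarm_id VARCHAR,\n" +
            "  trck_id VARCHAR\n" +
            ") WITH (\n" +
            "  'connector' = 'jdbc',\n" +
            "  'url' = 'jdbc:mysql://198.2.2.71:3306/bda?useSSL=false&rewriteBatchedStatements=true',\n" +
            "  'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
            "  'table-name' = 'detail',\n" +
            "  'username' = 'root',\n" +
            "  'password' = 'root',\n" +
            "  'sink.buffer-flush.max-rows' = '1000',\n" +
            "  'sink.buffer-flush.interval' = '2s'\n" +
            ")";
        tableEnv.executeSql(sinkDdl);
    }
}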

Best regards
weixubin



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: Could not find any factory for identifier 'kafka'

2020-07-24 Post by RS
hi,
Thanks for the reply. After trying several more times, it seems this is not a missing-dependency problem after all.


I added a new directory to my project: resources/META-INF/services
Then I copied two files from the Flink source code:
org.apache.flink.table.factories.TableFactory and org.apache.flink.table.factories.Factory
With that in place the error no longer occurs. I don't fully understand the underlying mechanism, but it did fix the problem.
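The mechanism is Java SPI: Flink's FactoryUtil discovers connector factories through ServiceLoader, reading META-INF/services/org.apache.flink.table.factories.Factory from every jar on the classpath, so the fat jar has to keep those entries. A quick way to check what a packaged jar actually exposes is a sketch like the one below (the class name is made up; it assumes the Flink table API and the kafka connector jars are on the classpath):

import java.util.ServiceLoader;

import org.apache.flink.table.factories.Factory;

public class ListTableFactories {
    public static void main(String[] args) {
        // Prints every factory identifier visible through SPI on the current classpath.
        // If 'kafka' is missing here, the service file was lost or overwritten while building
        // the fat jar, which matches the "Available factory identifiers are: datagen" error.
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            System.out.println(factory.factoryIdentifier() + "  (" + factory.getClass().getName() + ")");
        }
    }
}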


On 2020-07-24 20:16:18, "JasonLee" <17610775...@163.com> wrote:
>hi
>You only need the two packages: the -sql one and the -json one.
>
>
>| |
>JasonLee
>|
>|
>Email: 17610775...@163.com
>|
>
>Signature is customized by Netease Mail Master
>
>On 07/24/2020 17:02, RS wrote:
>hi,
>Flink 1.11.1: I am trying to run a SQL DDL that reads data from Kafka, and executing the CREATE statement fails.
>The jar is built as a jar-with-dependencies fat jar.
>
>
>Code snippet:
>   public String ddlSql = String.format("CREATE TABLE %s (\n" +
>   "  number BIGINT,\n" +
>   "  msg STRING,\n" +
>   "  username STRING,\n" +
>   "  update_time TIMESTAMP(3)\n" +
>   ") WITH (\n" +
>   " 'connector' = 'kafka',\n" +
>   " 'topic' = '%s',\n" +
>   " 'properties.bootstrap.servers' = '%s',\n" +
>   " 'properties.group.id' = '%s',\n" +
>   " 'format' = 'json',\n" +
>   " 'json.fail-on-missing-field' = 'false',\n" +
>   " 'json.ignore-parse-errors' = 'true'\n" +
>   ")\n", tableName, topic, servers, group);
>
>
>   StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>   StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>   tableEnv.executeSql(ddlSql);
>
>
>Error message:
>Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
>factory for identifier 'kafka' that implements 
>'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
>Available factory identifiers are:
>datagen
>at 
>org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
>at 
>org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
>... 33 more
>
>
>I referred to this thread:
>http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893
>and added flink-connector-kafka_2.12 and flink-sql-connector-kafka_2.12, but the same error still occurs.
>
>
>My pom dependencies:
><dependencies>
>    <dependency>
>        <groupId>org.apache.flink</groupId>
>        <artifactId>flink-java</artifactId>
>        <version>${flink.version}</version>
>    </dependency>
>    <dependency>
>        <groupId>org.apache.flink</groupId>
>        <artifactId>flink-table-api-java-bridge_2.12</artifactId>
>        <version>${flink.version}</version>
>    </dependency>
>    <dependency>
>        <groupId>org.apache.flink</groupId>
>        <artifactId>flink-table-api-java</artifactId>
>        <version>${flink.version}</version>
>    </dependency>
>    <dependency>
>        <groupId>org.apache.flink</groupId>
>        <artifactId>flink-connector-kafka_2.12</artifactId>
>        <version>${flink.version}</version>
>    </dependency>
>    <dependency>
>        <groupId>org.apache.flink</groupId>
>        <artifactId>flink-sql-connector-kafka_2.12</artifactId>
>        <version>${flink.version}</version>
>    </dependency>
>    <dependency>
>        <groupId>org.apache.flink</groupId>
>        <artifactId>flink-json</artifactId>
>        <version>${flink.version}</version>
>    </dependency>
></dependencies>
>
>Thanks, everyone~


Re: Could not find any factory for identifier 'kafka'

2020-07-24 Post by JasonLee
hi
You only need the two packages: the -sql one and the -json one.


| |
JasonLee
|
|
Email: 17610775...@163.com
|

Signature is customized by Netease Mail Master

On 07/24/2020 17:02, RS wrote:
hi,
Flink 1.11.1: I am trying to run a SQL DDL that reads data from Kafka, and executing the CREATE statement fails.
The jar is built as a jar-with-dependencies fat jar.


Code snippet:
   public String ddlSql = String.format("CREATE TABLE %s (\n" +
   "  number BIGINT,\n" +
   "  msg STRING,\n" +
   "  username STRING,\n" +
   "  update_time TIMESTAMP(3)\n" +
   ") WITH (\n" +
   " 'connector' = 'kafka',\n" +
   " 'topic' = '%s',\n" +
   " 'properties.bootstrap.servers' = '%s',\n" +
   " 'properties.group.id' = '%s',\n" +
   " 'format' = 'json',\n" +
   " 'json.fail-on-missing-field' = 'false',\n" +
   " 'json.ignore-parse-errors' = 'true'\n" +
   ")\n", tableName, topic, servers, group);


   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
   StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
   tableEnv.executeSql(ddlSql);


Error message:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
factory for identifier 'kafka' that implements 
'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
Available factory identifiers are:
datagen
at 
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
at 
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
... 33 more


I referred to this thread:
http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893
and added flink-connector-kafka_2.12 and flink-sql-connector-kafka_2.12, but the same error still occurs.


My pom dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>


Thanks, everyone~

Re: Could not find any factory for identifier 'kafka'

2020-07-24 Post by admin
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>

These two conflict; remove the first one (flink-connector-kafka_2.12).

> On Jul 24, 2020, at 5:02 PM, RS wrote:
> 
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-connector-kafka_2.12</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-sql-connector-kafka_2.12</artifactId>
>         <version>${flink.version}</version>
>     </dependency>



Re: flink sql reading from mysql

2020-07-24 Post by admin
 'connector.properties.zookeeper.connect' = '',  -- ZooKeeper address
   'connector.properties.bootstrap.servers' = '',  -- broker address

'connector.username' = '',
   'connector.password' = ‘',
These few lines look wrong, don't they?

> On Jul 24, 2020, at 4:20 PM, liunaihua521 wrote:
> 
>  'connector.properties.zookeeper.connect' = '',  -- ZooKeeper address
>'connector.properties.bootstrap.servers' = '',  -- broker address



flink1.11: query results written to mysql at only a few rows per second

2020-07-24 Post by 小学生
With flink 1.11 I read from Kafka and write to MySQL. Kafka produces about 300 records per second, but only about 6 rows per second end up in MySQL.


from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
source_Kafka = """
CREATE TABLE kafka_source (
id VARCHAR,
alarm_id VARCHAR,
trck_id VARCHAR
) WITH (
'connector' = 'kafka',
'topic' = 'test',
'properties.bootstrap.servers' = '*',
'properties.group.id' = 'flink_grouper',
'scan.startup.mode' = 'earliest-offset', 
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
)
"""
source_W_detail_ddl = """
CREATE TABLE source_W_detail (
id VARCHAR, 
alarm_id VARCHAR,  
trck_id VARCHAR 
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://198.2.2.71:3306/bda?useSSL=false',
'driver' = 'com.mysql.cj.jdbc.Driver',
'table-name' = 'detail',
'username' = 'root',
'password' = 'root',
'sink.buffer-flush.max-rows' = '1000',
'sink.buffer-flush.interval' = '2s'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)
env.set_parallelism(1)
env_settings = 
EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
t_env.execute_sql(source_Kafka)
t_env.execute_sql(source_W_detail_ddl)
table_result1=t_env.execute_sql('''insert into source_W_detail select 
id,alarm_id,trck_id from kafka_source''')
table_result1.get_job_client().get_job_execution_result().result()

Reply: Re: flink 1.11 cdc question

2020-07-24 Post by amen...@163.com
Thanks a lot! I'm watching the issue now~


Best


amen...@163.com
 
From: Leonard Xu
Sent: 2020-07-24 16:20
To: user-zh
Subject: Re: flink 1.11 cdc question
Hi amenhub
 
I have opened an issue to track this problem [1].
You can also set the table's IDENTITY to FULL in your PG; then the UPDATE records that debezium syncs will carry the complete before image.
The DB command is: ALTER TABLE yourTable REPLICA IDENTITY FULL. See the debezium documentation [2].
 
Best
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-18700 

[2] https://debezium.io/documentation/reference/1.2/connectors/postgresql.html 

 
> On Jul 23, 2020, at 09:14, amen...@163.com wrote:
>
> Thanks to @Leonard and @Jark for the answers!
>
>
>
> amen...@163.com
>
> From: Jark Wu
> Sent: 2020-07-22 23:56
> To: user-zh
> Subject: Re: flink 1.11 cdc question
> Hi,
>
> This is a known issue. At the moment debezium does not guarantee exactly the same data format across databases; for example, when syncing UPDATE messages from PG, the before and
> after fields are not complete.
> This will be fixed in a later version.
>
> Best,
> Jark
>
> On Wed, 22 Jul 2020 at 21:07, Leonard Xu  wrote:
>
>> Hello,
>>
>> The code threw an NPE while setting the rowKind for the before record; normally before should not be null.
>> This looks like a data problem on your side: an update changelog whose before is null
>> is not valid, because without the before data the after data cannot be processed.
>> If you confirm it is dirty data, you can enable ignore-parse-errors to skip it [1]
>>
>> Best regards
>> Leonard
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
>> <
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
>>>
>>
>> {
>>"payload": {
>>"before": null,
>>"after": {
>>"id": 2,
>>"name": "liushimin",
>>"age": "24",
>>"sex": "man",
>>"phone": "1"
>>},
>>"source": {
>>"version": "1.2.0.Final",
>>"connector": "postgresql",
>>"name": "postgres",
>>"ts_ms": 1595409754151,
>>"snapshot": "false",
>>"db": "postgres",
>>"schema": "public",
>>"table": "person",
>>"txId": 569,
>>"lsn": 23632344,
>>"xmin": null
>>},
>>"op": "u",
>>"ts_ms": 1595409754270,
>>"transaction": null
>>}
>> }
>>
>>> On Jul 22, 2020, at 17:34, amen...@163.com wrote:
>>>
>>>
>> {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres.public.person.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"liushimin","age":"24","sex":"man","phone":"1"},"source":{"version":"1.2.0.Final","connector":"postgresql","name":"postgres","ts_ms":1595409754151,"snapshot":"false","db":"postgres","schema":"public","table":"person","txId":569,"lsn":23632344,"xmin":null},"op":"u","ts_ms":1595409754270,"transaction":null}}
>>
>>
 

Re: About sql-client

2020-07-24 Post by Harold.Miao
How about this one:  https://github.com/ververica/flink-sql-gateway

杨荣 wrote on Fri, Jul 24, 2020 at 3:19 PM:

> You could send a PR to the official project. I think this feature is really needed, and the basic version was released back in 1.5; I don't understand why the gateway and
> submit-with-SQL-file features still have not been implemented.
>
> Harold.Miao wrote on Fri, Jul 24, 2020 at 11:42 AM:
>
> > 1. It should be possible. The key is to configure the correct jobmanager.rpc.address in flink-conf.yaml;
> > the source code contains logic that loads the main configuration file:
> >
> > public LocalExecutor(URL defaultEnv, List jars, List
> libraries) {
> >// discover configuration
> >final String flinkConfigDir;
> >try {
> >   // find the configuration directory
> >   flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv();
> >
> >   // load the global configuration
> >   this.flinkConfig =
> > GlobalConfiguration.loadConfiguration(flinkConfigDir);
> >
> >   // initialize default file system
> >   FileSystem.initialize(flinkConfig,
> > PluginUtils.createPluginManagerFromRootFolder(flinkConfig));
> >
> >   // load command lines for deployment
> >   this.commandLines =
> > CliFrontend.loadCustomCommandLines(flinkConfig, flinkConfigDir);
> >   this.commandLineOptions = collectCommandLineOptions(commandLines);
> >} catch (Exception e) {
> >   throw new SqlClientException("Could not load Flink configuration.",
> > e);
> >}
> >
> >
> > 2. We could not wait for the official one, so we implemented a wrapper ourselves.
> >
> >
> >
> >
> > 杨荣 wrote on Fri, Jul 24, 2020 at 10:53 AM:
> >
> > > Hi all,
> > >
> > > Two questions:
> > > 1. In embedded mode, is submitting jobs through a ClusterClient for distributed execution supported? I did not find it
> > > in the docs; following the docs I only got a local job running locally, which cannot be used in production.
> > >
> > > 2. In which release is gateway mode expected?
> > >
> >
> >
> > --
> >
> > Best Regards,
> > Harold Miao
> >
>


-- 

Best Regards,
Harold Miao


Re:Re:Re: Could not find any factory for identifier 'kafka'

2020-07-24 Post by RS
The formatting of my previous mail was broken, so here it is again.


I build the jar and run it directly on the server; I have never run it from IDEA.

> flink run xxx

I am not using the shade plugin.

Maven build settings:

<properties>
    <jdk.version>1.8</jdk.version>
    <flink.version>1.11.1</flink.version>
</properties>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>${jdk.version}</source>
                <target>${jdk.version}</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </plugin>
    </plugins>
</build>


Re:Re: Could not find any factory for identifier 'kafka'

2020-07-24 Post by RS
I build the jar and run it directly on the server (bin/flink run
xxx); I have never run it from IDEA. The Maven build does not use the shade plugin; the Maven build settings are as follows:
<properties>
    <jdk.version>1.8</jdk.version>
    <flink.version>1.11.1</flink.version>
</properties>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>${jdk.version}</source>
                <target>${jdk.version}</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </plugin>
    </plugins>
</build>
thx
On 2020-07-24 17:36:46, "Benchao Li" wrote:
>It is probably related to how you package the jar. Does the program run if you start it directly from IDEA?
>
>If it runs in IDEA but the packaged jar fails when submitted, it is very likely an issue with the SPI service files.
>If you are using the shade plugin, take a look at this transformer [1]
>
>[1]
>https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#AppendingTransformer
>
>RS wrote on Fri, Jul 24, 2020 at 5:02 PM:
>
>> hi,
>> Flink 1.11.1: I am trying to run a SQL DDL that reads data from Kafka, and executing the CREATE statement fails.
>> The jar is built as a jar-with-dependencies fat jar.
>>
>>
>> Code snippet:
>> public String ddlSql = String.format("CREATE TABLE %s (\n" +
>> "  number BIGINT,\n" +
>> "  msg STRING,\n" +
>> "  username STRING,\n" +
>> "  update_time TIMESTAMP(3)\n" +
>> ") WITH (\n" +
>> " 'connector' = 'kafka',\n" +
>> " 'topic' = '%s',\n" +
>> " 'properties.bootstrap.servers' = '%s',\n" +
>> " 'properties.group.id' = '%s',\n" +
>> " 'format' = 'json',\n" +
>> " 'json.fail-on-missing-field' = 'false',\n" +
>> " 'json.ignore-parse-errors' = 'true'\n" +
>> ")\n", tableName, topic, servers, group);
>>
>>
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> StreamTableEnvironment tableEnv =
>> StreamTableEnvironment.create(env);
>> tableEnv.executeSql(ddlSql);
>>
>>
>> Error message:
>> Caused by: org.apache.flink.table.api.ValidationException: Could not find
>> any factory for identifier 'kafka' that implements
>> 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
>> classpath.
>> Available factory identifiers are:
>> datagen
>> at
>> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
>> at
>> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
>> ... 33 more
>>
>>
>> I referred to this thread:
>> http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893
>> and added flink-connector-kafka_2.12 and flink-sql-connector-kafka_2.12, but the same error still occurs.
>>
>>
>> My pom dependencies:
>> <dependencies>
>>     <dependency>
>>         <groupId>org.apache.flink</groupId>
>>         <artifactId>flink-java</artifactId>
>>         <version>${flink.version}</version>
>>     </dependency>
>>     <dependency>
>>         <groupId>org.apache.flink</groupId>
>>         <artifactId>flink-table-api-java-bridge_2.12</artifactId>
>>         <version>${flink.version}</version>
>>     </dependency>
>>     <dependency>
>>         <groupId>org.apache.flink</groupId>
>>         <artifactId>flink-table-api-java</artifactId>
>>         <version>${flink.version}</version>
>>     </dependency>
>>     <dependency>
>>         <groupId>org.apache.flink</groupId>
>>         <artifactId>flink-connector-kafka_2.12</artifactId>
>>         <version>${flink.version}</version>
>>     </dependency>
>>     <dependency>
>>         <groupId>org.apache.flink</groupId>
>>         <artifactId>flink-sql-connector-kafka_2.12</artifactId>
>>         <version>${flink.version}</version>
>>     </dependency>
>>     <dependency>
>>         <groupId>org.apache.flink</groupId>
>>         <artifactId>flink-json</artifactId>
>>         <version>${flink.version}</version>
>>     </dependency>
>> </dependencies>
>>
>>
>> Thanks, everyone~
>
>
>
>-- 
>
>Best,
>Benchao Li


Re: Could not find any factory for identifier 'kafka'

2020-07-24 Post by Benchao Li
It is probably related to how you package the jar. Does the program run if you start it directly from IDEA?

If it runs in IDEA but the packaged jar fails when submitted, it is very likely an issue with the SPI service files.
If you are using the shade plugin, take a look at this transformer [1]

[1]
https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#AppendingTransformer

RS wrote on Fri, Jul 24, 2020 at 5:02 PM:

> hi,
> Flink 1.11.1: I am trying to run a SQL DDL that reads data from Kafka, and executing the CREATE statement fails.
> The jar is built as a jar-with-dependencies fat jar.
>
>
> Code snippet:
> public String ddlSql = String.format("CREATE TABLE %s (\n" +
> "  number BIGINT,\n" +
> "  msg STRING,\n" +
> "  username STRING,\n" +
> "  update_time TIMESTAMP(3)\n" +
> ") WITH (\n" +
> " 'connector' = 'kafka',\n" +
> " 'topic' = '%s',\n" +
> " 'properties.bootstrap.servers' = '%s',\n" +
> " 'properties.group.id' = '%s',\n" +
> " 'format' = 'json',\n" +
> " 'json.fail-on-missing-field' = 'false',\n" +
> " 'json.ignore-parse-errors' = 'true'\n" +
> ")\n", tableName, topic, servers, group);
>
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(env);
> tableEnv.executeSql(ddlSql);
>
>
> Error message:
> Caused by: org.apache.flink.table.api.ValidationException: Could not find
> any factory for identifier 'kafka' that implements
> 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
> classpath.
> Available factory identifiers are:
> datagen
> at
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
> at
> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
> ... 33 more
>
>
> I referred to this thread:
> http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893
> and added flink-connector-kafka_2.12 and flink-sql-connector-kafka_2.12, but the same error still occurs.
>
>
> My pom dependencies:
> <dependencies>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-java</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-table-api-java-bridge_2.12</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-table-api-java</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-connector-kafka_2.12</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-sql-connector-kafka_2.12</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-json</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
> </dependencies>
>
>
> Thanks, everyone~



-- 

Best,
Benchao Li


Could not find any factory for identifier 'kafka'

2020-07-24 Post by RS
hi,
Flink 1.11.1: I am trying to run a SQL DDL that reads data from Kafka, and executing the CREATE statement fails.
The jar is built as a jar-with-dependencies fat jar.


Code snippet:
public String ddlSql = String.format("CREATE TABLE %s (\n" +
"  number BIGINT,\n" +
"  msg STRING,\n" +
"  username STRING,\n" +
"  update_time TIMESTAMP(3)\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = '%s',\n" +
" 'properties.bootstrap.servers' = '%s',\n" +
" 'properties.group.id' = '%s',\n" +
" 'format' = 'json',\n" +
" 'json.fail-on-missing-field' = 'false',\n" +
" 'json.ignore-parse-errors' = 'true'\n" +
")\n", tableName, topic, servers, group);


StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql(ddlSql);


Error message:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
factory for identifier 'kafka' that implements 
'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
Available factory identifiers are:
datagen
at 
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
at 
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
... 33 more


I referred to this thread:
http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893
and added flink-connector-kafka_2.12 and flink-sql-connector-kafka_2.12, but the same error still occurs.


My pom dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>


Thanks, everyone~

Re: Re: About sql-client

2020-07-24 Post by chengyanan1...@foxmail.com
Zeppelin can submit all kinds of jobs from a web page, which is also quite nice.
For submit-with-SQL-file you can refer to https://github.com/wuchong/flink-sql-submit,
and on top of that I simplified it a bit myself: https://github.com/Chengyanan1008/flink-sql-submit-client
Just run ./sql-submit.sh -f  on the server to execute a SQL file.




chengyanan1...@foxmail.com
 
From: Jeff Zhang
Sent: 2020-07-24 15:46
To: user-zh
Subject: Re: About sql-client
You can use Zeppelin to submit Flink SQL jobs; feel free to join the DingTalk group to discuss: 32803524
 
杨荣 wrote on Fri, Jul 24, 2020 at 3:19 PM:
 
> You could send a PR to the official project. I think this feature is really needed, and the basic version was released back in 1.5; I don't understand why the gateway and
> submit-with-SQL-file features still have not been implemented.
>
> Harold.Miao wrote on Fri, Jul 24, 2020 at 11:42 AM:
>
> > 1. It should be possible. The key is to configure the correct jobmanager.rpc.address in flink-conf.yaml;
> > the source code contains logic that loads the main configuration file:
> >
> > public LocalExecutor(URL defaultEnv, List jars, List
> libraries) {
> >// discover configuration
> >final String flinkConfigDir;
> >try {
> >   // find the configuration directory
> >   flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv();
> >
> >   // load the global configuration
> >   this.flinkConfig =
> > GlobalConfiguration.loadConfiguration(flinkConfigDir);
> >
> >   // initialize default file system
> >   FileSystem.initialize(flinkConfig,
> > PluginUtils.createPluginManagerFromRootFolder(flinkConfig));
> >
> >   // load command lines for deployment
> >   this.commandLines =
> > CliFrontend.loadCustomCommandLines(flinkConfig, flinkConfigDir);
> >   this.commandLineOptions = collectCommandLineOptions(commandLines);
> >} catch (Exception e) {
> >   throw new SqlClientException("Could not load Flink configuration.",
> > e);
> >}
> >
> >
> > 2. We could not wait for the official one, so we implemented a wrapper ourselves.
> >
> >
> >
> >
> > 杨荣 wrote on Fri, Jul 24, 2020 at 10:53 AM:
> >
> > > Hi all,
> > >
> > > Two questions:
> > > 1. In embedded mode, is submitting jobs through a ClusterClient for distributed execution supported? I did not find it
> > > in the docs; following the docs I only got a local job running locally, which cannot be used in production.
> > >
> > > 2. In which release is gateway mode expected?
> > >
> >
> >
> > --
> >
> > Best Regards,
> > Harold Miao
> >
>
 
 
-- 
Best Regards
 
Jeff Zhang


Re: flink sql reading from mysql

2020-07-24 Post by Leonard Xu
Hello

This error is usually caused by malformed SQL, for example full-width (Chinese) commas instead of ASCII ones; please double-check your SQL statement.

Best regards
Leonard Xu
> On Jul 24, 2020, at 16:20, liunaihua521 wrote:
> 
> org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered
> "timestamp," at line
> Was expecting one of:
> "CURSOR"...



Re: flink 1.11 cdc question

2020-07-24 Post by Leonard Xu
Hi amenhub

I have opened an issue to track this problem [1].
You can also set the table's IDENTITY to FULL in your PG; then the UPDATE records that debezium syncs will carry the complete before image.
The DB command is: ALTER TABLE yourTable REPLICA IDENTITY FULL. See the debezium documentation [2].

Best
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-18700 

[2] https://debezium.io/documentation/reference/1.2/connectors/postgresql.html 


> On Jul 23, 2020, at 09:14, amen...@163.com wrote:
> 
> Thanks to @Leonard and @Jark for the answers!
> 
> 
> 
> amen...@163.com
> 
> From: Jark Wu
> Sent: 2020-07-22 23:56
> To: user-zh
> Subject: Re: flink 1.11 cdc question
> Hi,
> 
> This is a known issue. At the moment debezium does not guarantee exactly the same data format across databases; for example, when syncing UPDATE messages from PG, the before and
> after fields are not complete.
> This will be fixed in a later version.
> 
> Best,
> Jark
> 
> On Wed, 22 Jul 2020 at 21:07, Leonard Xu  wrote:
> 
>> Hello,
>> 
>> The code threw an NPE while setting the rowKind for the before record; normally before should not be null.
>> This looks like a data problem on your side: an update changelog whose before is null
>> is not valid, because without the before data the after data cannot be processed.
>> If you confirm it is dirty data, you can enable ignore-parse-errors to skip it [1]
>> 
>> Best regards
>> Leonard
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
>> <
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
>>> 
>> 
>> {
>>"payload": {
>>"before": null,
>>"after": {
>>"id": 2,
>>"name": "liushimin",
>>"age": "24",
>>"sex": "man",
>>"phone": "1"
>>},
>>"source": {
>>"version": "1.2.0.Final",
>>"connector": "postgresql",
>>"name": "postgres",
>>"ts_ms": 1595409754151,
>>"snapshot": "false",
>>"db": "postgres",
>>"schema": "public",
>>"table": "person",
>>"txId": 569,
>>"lsn": 23632344,
>>"xmin": null
>>},
>>"op": "u",
>>"ts_ms": 1595409754270,
>>"transaction": null
>>}
>> }
>> 
>>> On Jul 22, 2020, at 17:34, amen...@163.com wrote:
>>> 
>>> 
>> {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres.public.person.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"liushimin","age":"24","sex":"man","phone":"1"},"source":{"version":"1.2.0.Final","connector":"postgresql","name":"postgres","ts_ms":1595409754151,"snapshot":"false","db":"postgres","schema":"public","table":"person","txId":569,"lsn":23632344,"xmin":null},"op":"u","ts_ms":1595409754270,"transaction":null}}
>> 
>> 



Reply: flink sql reading from mysql

2020-07-24 Post by liunaihua521
hi!
Hello, I see what you mean now. After checking some material online, the updated version is as follows:


DDL:
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
proctime as PROCTIME()
) WITH (
'connector.type' = 'kafka',  -- kafka connector
'connector.version' = 'universal',  -- universal supports Kafka 0.11 and above
'connector.topic' = 'user_behavior',  -- kafka topic
'connector.startup-mode' = 'earliest-offset',  -- read from the earliest offset
'connector.properties.zookeeper.connect' = '',  -- ZooKeeper address
'connector.properties.bootstrap.servers' = '',  -- broker address
'format.type' = 'json'  -- source format is json
);




CREATE TABLE category_info (
parent_id BIGINT, -- top-level product category
category_id BIGINT  -- detailed product category
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://:3306/flinkdemo',
'connector.table' = 'category_info',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = '',
'connector.password' = '',
'connector.lookup.cache.max-rows' = '5000',
'connector.lookup.cache.ttl' = '10min'
);


SQL:


SELECT U.user_id, U.item_id, U.behavior, C.parent_id, C.category_id
FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF 
U.proctime AS C
ON U.category_id = C.category_id;


But executing the SQL fails (the code is in an office environment and cannot be pasted, so the following part is typed by hand):
org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered
"timestamp," at line
Was expecting one of:
"CURSOR"...
"EXISTS"...
"NOT"...
"ROW"...
"("...


I have not been able to get past this; any guidance would be appreciated.




On Jul 24, 2020 at 14:25, Leonard Xu wrote:
Hello
The image did not come through; could you paste the DDL as text? Also, you are not using the dimension (temporal) table join syntax FOR SYSTEM_TIME AS OF [1]

Best regards
Leonard Xu
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins
 


On Jul 24, 2020, at 14:14, liunaihua521 wrote:

hi!
Versions: flink 1.10
mysql 5.7.24

The scenario is:
Using Flink SQL to read Kafka data and join it with a MySQL dimension table. How should the mysql
connector be configured so that, after the dimension table is modified, Flink reads the updated data for the join?

For my local test, the dimension table DDL is:

But after modifying the data in MySQL, the join still returns the old data.

Could you point me in the right direction? Thanks in advance.






Flink CPU utilization is low

2020-07-24 Post by guaishushu1...@163.com


I'd like to ask: is Flink's CPU utilization really as low as 0.012?


guaishushu1...@163.com


Re: About sql-client

2020-07-24 Post by Jeff Zhang
You can use Zeppelin to submit Flink SQL jobs; feel free to join the DingTalk group to discuss: 32803524

杨荣 wrote on Fri, Jul 24, 2020 at 3:19 PM:

> You could send a PR to the official project. I think this feature is really needed, and the basic version was released back in 1.5; I don't understand why the gateway and
> submit-with-SQL-file features still have not been implemented.
>
> Harold.Miao wrote on Fri, Jul 24, 2020 at 11:42 AM:
>
> > 1. It should be possible. The key is to configure the correct jobmanager.rpc.address in flink-conf.yaml;
> > the source code contains logic that loads the main configuration file:
> >
> > public LocalExecutor(URL defaultEnv, List jars, List
> libraries) {
> >// discover configuration
> >final String flinkConfigDir;
> >try {
> >   // find the configuration directory
> >   flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv();
> >
> >   // load the global configuration
> >   this.flinkConfig =
> > GlobalConfiguration.loadConfiguration(flinkConfigDir);
> >
> >   // initialize default file system
> >   FileSystem.initialize(flinkConfig,
> > PluginUtils.createPluginManagerFromRootFolder(flinkConfig));
> >
> >   // load command lines for deployment
> >   this.commandLines =
> > CliFrontend.loadCustomCommandLines(flinkConfig, flinkConfigDir);
> >   this.commandLineOptions = collectCommandLineOptions(commandLines);
> >} catch (Exception e) {
> >   throw new SqlClientException("Could not load Flink configuration.",
> > e);
> >}
> >
> >
> > 2. We could not wait for the official one, so we implemented a wrapper ourselves.
> >
> >
> >
> >
> > 杨荣 wrote on Fri, Jul 24, 2020 at 10:53 AM:
> >
> > > Hi all,
> > >
> > > Two questions:
> > > 1. In embedded mode, is submitting jobs through a ClusterClient for distributed execution supported? I did not find it
> > > in the docs; following the docs I only got a local job running locally, which cannot be used in production.
> > >
> > > 2. In which release is gateway mode expected?
> > >
> >
> >
> > --
> >
> > Best Regards,
> > Harold Miao
> >
>


-- 
Best Regards

Jeff Zhang


Re: About sql-client

2020-07-24 Post by 杨荣
You could send a PR to the official project. I think this feature is really needed, and the basic version was released back in 1.5; I don't understand why the gateway and
submit-with-SQL-file features still have not been implemented.

Harold.Miao wrote on Fri, Jul 24, 2020 at 11:42 AM:

> 1. It should be possible. The key is to configure the correct jobmanager.rpc.address in flink-conf.yaml;
> the source code contains logic that loads the main configuration file:
>
> public LocalExecutor(URL defaultEnv, List jars, List libraries) {
>// discover configuration
>final String flinkConfigDir;
>try {
>   // find the configuration directory
>   flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv();
>
>   // load the global configuration
>   this.flinkConfig =
> GlobalConfiguration.loadConfiguration(flinkConfigDir);
>
>   // initialize default file system
>   FileSystem.initialize(flinkConfig,
> PluginUtils.createPluginManagerFromRootFolder(flinkConfig));
>
>   // load command lines for deployment
>   this.commandLines =
> CliFrontend.loadCustomCommandLines(flinkConfig, flinkConfigDir);
>   this.commandLineOptions = collectCommandLineOptions(commandLines);
>} catch (Exception e) {
>   throw new SqlClientException("Could not load Flink configuration.",
> e);
>}
>
>
> 2. We could not wait for the official one, so we implemented a wrapper ourselves.
>
>
>
>
> 杨荣 wrote on Fri, Jul 24, 2020 at 10:53 AM:
>
> > Hi all,
> >
> > Two questions:
> > 1. In embedded mode, is submitting jobs through a ClusterClient for distributed execution supported? I did not find it
> > in the docs; following the docs I only got a local job running locally, which cannot be used in production.
> >
> > 2. In which release is gateway mode expected?
> >
>
>
> --
>
> Best Regards,
> Harold Miao
>


Re: flink sql reading from mysql

2020-07-24 Post by Leonard Xu

Also, for Chinese-language community discussion you can just send mail to user-zh@flink.apache.org;
there is no need to send to the user-zh-...@flink.apache.org
address.


> On Jul 24, 2020, at 14:25, Leonard Xu wrote:
> 
> Hello
> The image did not come through; could you paste the DDL as text? Also, you are not using the dimension (temporal) table join syntax FOR SYSTEM_TIME AS OF [1]
> 
> Best regards
> Leonard Xu
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins
>  
> 
>  
> 
>> On Jul 24, 2020, at 14:14, liunaihua521 wrote:
>> 
>> hi!
>> Versions: flink 1.10
>> mysql 5.7.24
>> 
>> The scenario is:
>> Using Flink SQL to read Kafka data and join it with a MySQL dimension table. How should the mysql
>> connector be configured so that, after the dimension table is modified, Flink reads the updated data for the join?
>> 
>> For my local test, the dimension table DDL is:
>> 
>> But after modifying the data in MySQL, the join still returns the old data.
>> 
>> Could you point me in the right direction? Thanks in advance.
>> 
>> 
>> 
> 



Re: flink sql reading from mysql

2020-07-24 Post by Leonard Xu
Hello
The image did not come through; could you paste the DDL as text? Also, you are not using the dimension (temporal) table join syntax FOR SYSTEM_TIME AS OF [1]

Best regards
Leonard Xu
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins
 

 

> On Jul 24, 2020, at 14:14, liunaihua521 wrote:
> 
> hi!
> Versions: flink 1.10
> mysql 5.7.24
> 
> The scenario is:
> Using Flink SQL to read Kafka data and join it with a MySQL dimension table. How should the mysql
> connector be configured so that, after the dimension table is modified, Flink reads the updated data for the join?
> 
> For my local test, the dimension table DDL is:
> 
> But after modifying the data in MySQL, the join still returns the old data.
> 
> Could you point me in the right direction? Thanks in advance.
> 
> 
> 



flink sql reading from mysql

2020-07-24 Post by liunaihua521
hi!
Versions: flink 1.10
mysql 5.7.24


The scenario is:
Using Flink SQL to read Kafka data and join it with a MySQL dimension table. How should the mysql
connector be configured so that, after the dimension table is modified, Flink reads the updated data for the join?


For my local test, the dimension table DDL is:

But after modifying the data in MySQL, the join still returns the old data.


Could you point me in the right direction? Thanks in advance.