06)
... 4 more
在 2021-02-07 11:24:46,"Xingbo Huang" 写道:
>Hi,
>
>你可以看到报错信息的有这么一行
>Caused by: java.sql.SQLSyntaxErrorException: Unknown column 'FULLMV' in
>'field list'
>说你的表没有FULLMV这个字段导致的
>
>Best,
>Xingbo
Statement.executeQuery(ClientPreparedStatement.java:1025)
at
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.open(JdbcRowDataInputFormat.java:206)
... 4 more
在 2021-02-07 10:30:23,"肖越" <18242988...@163.com> 写道:
在写处理脚本的时候,在to_pandas这步经常会报错:java.lang.RuntimeExcept
在写处理脚本的时候,在to_pandas这步经常会报错:java.lang.RuntimeException: Failed to fetch next
result
想寻求大佬帮助,分析一下原因
sql:
'SELECT FULLMV,B_ACTL_AMT,S_ACTL_AMT,PF_ID,SYMBOL_ID
FROM TS_PF_SEC_INFO JOIN TP_GL_DAY ON BIZ_DATE = DAY_ID WHERE PF_ID =
\'103016\' AND SYMBOL_ID = \'201601\' AND CCY_TYPE =
不知道大家有没有遇到这种情况,请求大佬帮忙分析一下。
我在flink中定义了两张源表,分别对应于 Mysql 数据库中的表格,
表 a 有6934行数据;表 b 有11415574行数据;
在关联操作后,进行常规的SELECT WHERE等操作,最后查找符合条件的250条数据。
最后是print() 查找结果操作,每次单机执行都会跑10分钟!
相比于,pyflink1.11 的connector read.query()操作慢了好多,
请问pyflink1.12中是什么操作增加了执行时间,是将query这部分操作放到flink执行了么?
是否有其他的改善方式?
# 定义计算逻辑函数
@udf(input_types=DataTypes.DECIMAL(38,18,True),
result_type=DataTypes.DECIMAL(38,18,True), udf_type="pandas")
def multi_production(yldrate):
yldrate_1 = yldrate + 1
return np.prod(yldrate_1) - 1
调用:env.sql_query('SELECT multi_production(YLDRATE) FROM query_result')
由于官网并未找
抱歉,报错信息理解错误,问题已经解决,感谢大佬。
在 2021-02-03 10:23:32,"肖越" <18242988...@163.com> 写道:
>pyflink1.11 测试udf函数,将表格定义的一列Double类型输入,计算这一列数值的累乘积,
>结果返回一个Double类型数值,已经配置了taskmanager.memory.task.off-heap.size,但没什么用。
>结果print报错:
>Traceback (most recent call last):
> File &quo
抱歉,报错信息理解错误,问题已经解决,感谢大佬。
在 2021-02-03 10:16:38,"肖越" <18242988...@163.com> 写道:
pyflink1.11 测试udf函数,将表格定义的一列Double类型输入,计算这一列数值的累乘积,
结果返回一个Double类型数值,已经配置了taskmanager.memory.task.off-heap.size,但没什么用。
结果print报错:
Traceback (most recent call last):
File "C:*/udtf_
pyflink1.11 测试udf函数,将表格定义的一列Double类型输入,计算这一列数值的累乘积,
结果返回一个Double类型数值,已经配置了taskmanager.memory.task.off-heap.size,但没什么用。
结果print报错:
Traceback (most recent call last):
File "C:*/udtf_test.py", line 42, in
env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE)
FROM query
pyflink1.11 测试udf函数,将表格定义的一列Double类型输入,计算这一列数值的累乘积,
结果返回一个Double类型数值,已经配置了taskmanager.memory.task.off-heap.size,但没什么用。
结果print报错:
Traceback (most recent call last):
File "C:*/udtf_test.py", line 42, in
env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE)
FROM query
通过sql_query执行返回table对象,执行table.to_pandas()报错:
Traceback (most recent call last):
File
"C:/projects/dataService-pyflink_explore/dataService-calculate-code-python/src/test/test_mysql_connector.py",
line 161, in
print(table.to_pandas().head(6))
File
"C:\Users\18242\AppData\Local\Programs\P
目前通过 connector 定义ddl的方式,通过数据库读取数据,方式如下:
CREATE TABLE MyUserTable (
id BIGINT,
name STRING,
age INT,
status BOOLEAN,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
'table-name' = 'users'
);
每次表格获取及定义都需要定义数据类型(如例子中: BI
问题已解决 ~ 数据定义的问题
在 2020-12-30 13:41:16,"肖越" <18242988...@163.com> 写道:
通过connector的方式定义数据:
数据类型定义为:yldrate DECIMAL, pf__id VARCHAR(10), symbol_id VARCHAR(30)
数据库中yldrate数据前几条是这样的:
0.72101337
0.
0.
0.000212493881
0.7871984
通过connector的方式定义数据:
数据类型定义为:yldrate DECIMAL, pf__id VARCHAR(10), symbol_id VARCHAR(30)
数据库中yldrate数据前几条是这样的:
0.72101337
0.
0.
0.000212493881
0.78719845
0.73023505
0.70173309
0.70168385
但是,pyflink读取出来转为pandas后都是:
yldrate
0 0
1 0
谢谢,老师们的指导,根据嘉伟的建议,发现pyflink1.12确实并不支持这个参数~
还是希望官方能够开放这个参数,就目前的工作情景来说,取数据就需要定义整张表,如果数据库更改,代码这边很不便于维护;
从本机的实验结果上看,pyflink内部进行query的效率并不高,正准备放到集群上试试~
在 2020-12-25 09:45:28,"Leonard Xu" 写道:
>Hi, 嘉伟
>
>1.12 应该不支持 read.query, 社区还在讨论是否要开放这个,有些concern, 简单的讲,就如你这个query写的,创建的这张JDBC
>表应该是一个 View
使用DDL 定义connector连接Mysql数据库,想通过发送sql的方式直接获取数据:
source_ddl = """
CREATE TABLE source_table(
yldrate DECIMAL,
pf_id VARCHAR,
symbol_id VARCHAR) WITH(
'connector' = 'jdbc',
'url' = 'jdbc:mysql://ip/db',
'dr
connector 从数据库读取整张表格,执行:
env.sql_query("select a , b, c from table1 left join table2 on a = d where b =
'103' and c = '203' and e = 'AC' and a between 20160701 and 20170307 order a")
其中表 a 的数据量很大,能有1千万条,但匹配出来的数据只有250条,本机执行要10分钟~
了解到 flink 1.11存在where子句不会先过滤数据,请问flink1.12 仍存在这个问题么?怎么优化呢?
connector 从数据库读取整张表格,执行:
env.sql_query("select a , b, c from table1 left join table2 on a = d where b =
'103' and c = '203' and e = 'AC' and a between 20160701 and 20170307 order by
biz_date")
其中表 a 的数据量很大,能有1千万条,但匹配出来的数据只有250条,本机执行要10分钟!
了解到 flink 1.11存在where子句不会先过滤数据,请问flink1.12 仍存在这个问题么?怎么优化呢?
您好,感谢您的回复。
是的,目前是通过connector定义了整张表的字段来获取的数据,
connector中是否有字段支持,设置sql语句直接获取数据库检索后返回的数据呢?
现在这种方式,如若数据库表字段变更,以后很难维护啊~
在 2020-12-23 14:36:20,"Wei Zhong" 写道:
>你好,
>
>pyflink需要通过声明jdbc connector的方式来从数据库中获取数据。
>
>> 在 2020年12月22日,17:40,肖越 <18242988...@163.com> 写道:
例如:pandas.read_sql()的用法,直接返回源数据,pyflink小白,蹲大佬的答复。
通过sql进行左连接查询,sql语句为:
sql = ''' Insert into print_sink select a.id, a.pf_id, b.symbol_id from a \
left join b on b.day_id = a.biz_date where a.ccy_type = 'AC' and \
a.pf_id = '1030100122' and b.symbol_id = '2030004042' and a.biz_date
between '20160701' and '20170307' '''
table_
在脚本中定义了两个源数据 ddl,但是第二就总会报缺option的问题,pyflink小白,求大神解答?
#DDL定义
source_ddl2 = """CREATE TABLE ts_pf_sec_yldrate (id DECIMAL,pf_id VARCHAR,\
symbol_id VARCHAR,biz_date VARCHAR,\
ccy_type VARCHAR,cur_id_d VARCHAR,yldrate DECIMAL,\
is_valid DECIMAL,time_mark TIMESTAMP) WITH (
由失误操作使用了企业邮箱订阅,目前无法取消订阅,向user-zh-unsubscribe发送多封邮件也无效?请问官方有解决办法么?
:238)
at java.lang.Thread.run(Thread.java:748)
在 2020-12-17 14:57:36,"Dian Fu" 写道:
>发一下完整的异常信息?
>
>> 在 2020年12月17日,上午11:53,肖越 <18242988...@163.com> 写道:
>>
>> 好的,非常感谢您的帮
最近在尝试 pyflink 功能,只查到了定义connector 的输出方法,例如:
sink_ddl = '''
CREATE TABLE print_sink (
ID DOUBLE,
NAME STRING
) WITH (
'connector' = 'print'
)
'''
每次都要事先定义好要输出的表格格式,是否有更加方便
jects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html
>
><https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html>
>[2]
>https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors
请问 可以连接oracle数据库么?自己尝试了定义connector,但报错findAndCreateTableSource failed
,并没有找到pyflink关于oracle connector的定义示例代码
在 2020-12-17 10:16:13,"Dian Fu" 写道:
>感谢Xingbo的回复,稍微补充一点:所有Table & SQL支持的connector都可以用在PyFlink中。
>
>redis的connector没有直接在Flink的代码库里提供,这里有一个,应该也可以用:https://github.com/apache/
谢谢您的帮助,所以没有办法通过定义connector的方式获取连接是么? 有没有其他办法呢?
在 2020-12-17 09:55:08,"Leonard Xu" 写道:
>目前 JDBC connector 只支持 MySQL, Pg和Derby(一般测试用)这几种dialect, Oracle还不支持。
>
>祝好,
>Leonard
>
>> 在 2020年12月17日,09:47,肖越 <18242988...@163.com> 写道:
>>
>>
pyflink小白,测试pyflink1.12功能,想通过定义connector从oracle数据库中获取数据
通过如下方式定义:
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env = StreamTableEnvironment \
.create(env, environment_settings=EnvironmentSettings
.new_instance()
.use_bli
28 matches
Mail list logo