Re:Re: pyflink1.12 to_pandas error: java.lang.RuntimeException: Failed to fetch next result

2021-02-06 Thread
(AkkaRpcActor.java:286)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: open() failed.Unknown column 
'SYMBOL_CODE' in 'field list'
at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.open(JdbcRowDataInputFormat.java:209)
at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:85)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
Caused by: java.sql.SQLSyntaxErrorException: Unknown column 'SYMBOL_CODE' in 
'field list'
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
at 
com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
at 
com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:975)
at 
com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:1025)
at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.open(JdbcRowDataInputFormat.java:206)
... 4 more

On 2021-02-07 11:24:46, "Xingbo Huang" wrote:
>Hi,
>
>You can see this line in the error message:
>Caused by: java.sql.SQLSyntaxErrorException: Unknown column 'FULLMV' in
>'field list'
>It says the failure is because your table has no column named FULLMV.
>
>Best,
>Xingbo
>
肖越 <18242988...@163.com> wrote on Sun, Feb 7, 2021 at 10:43 AM:
>
>> Adding the code for context.
>> These are the statements being executed:
>> query_table = env.sql_query(sql)
>> query_table.print_schema()
>>
>>
>> @udf(result_type=DataTypes.FLOAT(), func_type="pandas")
>> def udf_test(i):
>> i = i.astype('float')
>> return i
>>
>>
>> result = query_table.select(query_table.PF_ID, query_table.SYMBOL_ID,
>> udf_test(query_table.FULLMV))
>> print(result.to_pandas())
>> Error message:
>> py4j.protocol.Py4JJavaError: An error occurred while calling o86.hasNext.
>> : java.lang.RuntimeException: Failed to fetch next result
>> at
>> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
>> at
>> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:77)
>> at
>> org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:115)
>> at
>> org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:355)
>> at
>> org.apache.flink.table.runtime.arrow.ArrowUtils$1.hasNext(ArrowUtils.java:644)
>> at
>> org.apache.flink.table.runtime.arrow.ArrowUtils$2.hasNext(ArrowUtils.java:666)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>> at

Re:pyflink1.12 to_pandas error: java.lang.RuntimeException: Failed to fetch next result

2021-02-06 Thread
r.java:201)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: open() failed.Unknown column 
'FULLMV' in 'field list'
at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.open(JdbcRowDataInputFormat.java:209)
at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:85)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
Caused by: java.sql.SQLSyntaxErrorException: Unknown column 'FULLMV' in 'field 
list'
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
at 
com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
at 
com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:975)
at 
com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:1025)
at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.open(JdbcRowDataInputFormat.java:206)
... 4 more

On 2021-02-07 10:30:23, "肖越" <18242988...@163.com> wrote:

While writing the processing script, the to_pandas step often fails with: java.lang.RuntimeException: Failed to fetch next result
Hoping someone can help analyze the cause.
sql: 
   'SELECT FULLMV,B_ACTL_AMT,S_ACTL_AMT,PF_ID,SYMBOL_ID 
FROM TS_PF_SEC_INFO JOIN TP_GL_DAY ON BIZ_DATE = DAY_ID WHERE PF_ID = 
\'103016\' AND SYMBOL_ID = \'201601\' AND CCY_TYPE = \'AC\' AND 
BIZ_DATE BETWEEN \'20160306\' AND \'2016\''

pyflink1.12 to_pandas error: java.lang.RuntimeException: Failed to fetch next result

2021-02-06 Thread
While writing the processing script, the to_pandas step often fails with: java.lang.RuntimeException: Failed to fetch next result
Hoping someone can help analyze the cause.
sql: 
   'SELECT FULLMV,B_ACTL_AMT,S_ACTL_AMT,PF_ID,SYMBOL_ID 
FROM TS_PF_SEC_INFO JOIN TP_GL_DAY ON BIZ_DATE = DAY_ID WHERE PF_ID = 
\'103016\' AND SYMBOL_ID = \'201601\' AND CCY_TYPE = \'AC\' AND 
BIZ_DATE BETWEEN \'20160306\' AND \'2016\''

pyflink1.12: running join queries after defining source tables is slow?

2021-02-03 Thread
Has anyone run into this situation? Hoping someone can help analyze it.


I defined two source tables in flink, corresponding to tables in a MySQL database:
table a has 6,934 rows; table b has 11,415,574 rows.
After the join I apply ordinary SELECT/WHERE operations, and only 250 rows match in the end.
The final print() of the result takes 10 minutes every run on a single machine!


This is much slower than pyflink1.11's connector read.query().
What in pyflink1.12 adds to the execution time? Has the query part been moved into flink for execution?
Is there any other way to improve this?
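One way to see where the time goes is to print the plan Flink generates for the join. A minimal sketch, assuming a StreamTableEnvironment named env as in the posted scripts; the table and column names here are placeholders, not the real schema:

# Print the optimized plan for the join query (PyFlink 1.12).
# A scan of the full 11M-row table on the probe side would show up here.
slow_sql = """
    SELECT a.c1, b.c2
    FROM a LEFT JOIN b ON a.k = b.k
    WHERE a.c1 = '103'
"""
print(env.explain_sql(slow_sql))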



Testing a pandas udf fails with: AttributeError: 'decimal.Decimal' object has no attribute 'isnull'

2021-02-02 Thread
# Define the computation function

@udf(input_types=DataTypes.DECIMAL(38,18,True), 
result_type=DataTypes.DECIMAL(38,18,True), udf_type="pandas")

def multi_production(yldrate):

yldrate_1 = yldrate + 1

return np.prod(yldrate_1) - 1


Invocation: env.sql_query('SELECT multi_production(YLDRATE) FROM query_result')
I could not find a more detailed example on the official site: inside a pandas udf, can the data be processed in pandas style?
[Error message]:
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
at 
org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:345)
at 
org.apache.flink.python.AbstractPythonFunctionRunner.finishBundle(AbstractPythonFunctionRunner.java:230)
... 17 more
Caused by: java.lang.RuntimeException: Error received from SDK harness for 
instruction 2: Traceback (most recent call last):
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\sdk_worker.py",
 line 167, in _execute
response = task()
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\sdk_worker.py",
 line 223, in 
lambda: self.create_worker().do_instruction(request), request)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\sdk_worker.py",
 line 352, in do_instruction
request.instruction_id)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\sdk_worker.py",
 line 386, in process_bundle
bundle_processor.process_bundle(instruction_id))
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\bundle_processor.py",
 line 812, in process_bundle
data.transform_id].process_encoded(data.data)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\bundle_processor.py",
 line 205, in process_encoded
self.output(decoded_value)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\operations.py",
 line 304, in output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\operations.py",
 line 178, in receive
self.consumer.process(windowed_value)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\fn_execution\operations.py",
 line 92, in process
self._value_coder_impl.encode_to_stream(self.func(o.value), output_stream, 
True)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\fn_execution\coder_impl.py",
 line 467, in encode_to_stream
self._value_coder.encode_to_stream(value, out, nested)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\fn_execution\coder_impl.py",
 line 438, in encode_to_stream
pandas_to_arrow(self._schema, self._timezone, self._field_types, cols))
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\utils.py",
 line 35, in pandas_to_arrow
schema.types[i]) for i in range(0, len(schema))]
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\utils.py",
 line 35, in 
schema.types[i]) for i in range(0, len(schema))]
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\utils.py",
 line 27, in create_array
return pa.Array.from_pandas(s, mask=s.isnull(), type=t)
AttributeError: 'decimal.Decimal' object has no attribute 'isnull'


at 
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
at 
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:140)
at 
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:249)
at 
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
at 
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
at 
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:297)
at 
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:738)
at 
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at 
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at 
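The tail of the traceback shows pandas_to_arrow calling s.isnull() on a decimal.Decimal: the udf handed back a single scalar, because np.prod collapses the whole column to one value, while a udf_type="pandas" scalar function is expected to take a pandas.Series and return a Series of the same length. A minimal sketch of a length-preserving pandas udf under that reading (a true aggregate over the column would need an aggregate function instead, e.g. the pandas UDAF support added in 1.12):

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.DECIMAL(38, 18)],
     result_type=DataTypes.DECIMAL(38, 18), udf_type="pandas")
def plus_one(yldrate):
    # yldrate arrives as a pandas.Series; return a Series of the same
    # length, not a scalar, so the Arrow serializer can call .isnull() on it.
    return yldrate + 1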

Re:pyflink1.11 printing udf results: Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb

2021-02-02 Thread
Sorry, I misread the error message; the problem has been solved. Thanks for the help.

On 2021-02-03 10:23:32, "肖越" <18242988...@163.com> wrote:
>Testing a udf in pyflink1.11: it takes a Double column of the table as input, computes the cumulative product of the column,
>and returns a single Double value. I already configured taskmanager.memory.task.off-heap.size, but it made no difference.
>Printing the result fails with:
>Traceback (most recent call last):
>  File "C:*/udtf_test.py", line 42, in 
>env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE) 
> FROM query_result')
>  File 
> "C:\\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table_environment.py",
>  line 543, in execute_sql
>return TableResult(self._j_tenv.executeSql(stmt))
>  File 
> "C:\\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py",
>  line 1286, in __call__
>answer, self.gateway_client, self.target_id, self.name)
>  File 
> "C:\\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
>  line 154, in deco
>raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
>pyflink.util.exceptions.TableException: "The configured Task Off-Heap Memory 0 
>bytes is less than the least required Python worker Memory 79 mb. The Task 
>Off-Heap Memory can be configured using the configuration key 
>'taskmanager.memory.task.off-heap.size'."
>
>
>[The code]:
>s_env = StreamExecutionEnvironment.get_execution_environment()
>s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
># s_env.set_parallelism(8)
>env = StreamTableEnvironment.create(s_env,
>
> environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build())
>env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
> '0m')
># Register source tables
>env.execute_sql(get_table_ddl('TP_GL_DAY'))
>env.execute_sql(get_table_ddl('TS_PF_SEC_YLDRATE'))
>
>
># Register the sink table
>out_ddl = '''
>CREATE TABLE print_result (
> yldrate1 DOUBLE
>) WITH (
> 'connector' = 'print'
>)
>'''
>env.execute_sql(out_ddl)
># Define and run the SQL
>log_query = "SELECT BIZ_DATE, YLDRATE, PF_ID, SYMBOL_ID FROM TP_GL_DAY JOIN 
>TS_PF_SEC_YLDRATE ON DAY_ID = BIZ_DATE WHERE PF_ID = '1234' "
>view_table = env.sql_query(log_query)
>env.register_table('query_result', view_table)
>
>
># Define the computation function
>@udf(input_types=DataTypes.DOUBLE(), result_type=DataTypes.DOUBLE(), 
>udf_type="pandas")
>def multi_production(yldrate):
>yldrate_1 = yldrate + 1
>return np.prod(yldrate_1) - 1
>
>
># Register the function
>env.register_function('multi_production', multi_production)
>env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE) 
>FROM query_result')
>query_result.print_schema()
>env.execute('my_udf_job')
>
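For the record, the quoted script pins the off-heap size to '0m', which is exactly what the exception complains about. A sketch of the likely one-line fix, sized to the 79 mb minimum from the error message (the exact value is an assumption, tune it for the job):

# Give the Python worker the Task Off-Heap Memory it asks for (>= 79 mb).
env.get_config().get_configuration().set_string(
    "taskmanager.memory.task.off-heap.size", '80m')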


Re:pyflink1.11 printing udf results: The configured Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb

2021-02-02 Thread
Sorry, I misread the error message; the problem has been solved. Thanks for the help.

On 2021-02-03 10:16:38, "肖越" <18242988...@163.com> wrote:

Testing a udf in pyflink1.11: it takes a Double column of the table as input, computes the cumulative product of the column,
and returns a single Double value. I already configured taskmanager.memory.task.off-heap.size, but it made no difference.
Printing the result fails with:
Traceback (most recent call last):
  File "C:*/udtf_test.py", line 42, in 
env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE) 
FROM query_result')
  File 
"C:\\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table_environment.py",
 line 543, in execute_sql
return TableResult(self._j_tenv.executeSql(stmt))
  File 
"C:\\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py",
 line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File 
"C:\\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
 line 154, in deco
raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: "The configured Task Off-Heap Memory 0 
bytes is less than the least required Python worker Memory 79 mb. The Task 
Off-Heap Memory can be configured using the configuration key 
'taskmanager.memory.task.off-heap.size'."


[The code]:
s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
# s_env.set_parallelism(8)
env = StreamTableEnvironment.create(s_env,

environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build())
env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
 '0m')
# Register source tables
env.execute_sql(get_table_ddl('TP_GL_DAY'))
env.execute_sql(get_table_ddl('TS_PF_SEC_YLDRATE'))


# Register the sink table
out_ddl = '''
CREATE TABLE print_result (
 yldrate1 DOUBLE
) WITH (
 'connector' = 'print'
)
'''
env.execute_sql(out_ddl)
# Define and run the SQL
log_query = "SELECT BIZ_DATE, YLDRATE, PF_ID, SYMBOL_ID FROM TP_GL_DAY JOIN 
TS_PF_SEC_YLDRATE ON DAY_ID = BIZ_DATE WHERE PF_ID = '1234' "
view_table = env.sql_query(log_query)
env.register_table('query_result', view_table)


# Define the computation function
@udf(input_types=DataTypes.DOUBLE(), result_type=DataTypes.DOUBLE(), 
udf_type="pandas")
def multi_production(yldrate):
yldrate_1 = yldrate + 1
return np.prod(yldrate_1) - 1


# Register the function
env.register_function('multi_production', multi_production)
env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE) FROM 
query_result')
query_result.print_schema()
env.execute('my_udf_job')


pyflink1.11 printing udf results: Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb

2021-02-02 Thread
Testing a udf in pyflink1.11: it takes a Double column of the table as input, computes the cumulative product of the column,
and returns a single Double value. I already configured taskmanager.memory.task.off-heap.size, but it made no difference.
Printing the result fails with:
Traceback (most recent call last):
  File "C:*/udtf_test.py", line 42, in 
env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE) 
FROM query_result')
  File 
"C:\\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table_environment.py",
 line 543, in execute_sql
return TableResult(self._j_tenv.executeSql(stmt))
  File 
"C:\\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py",
 line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File 
"C:\\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
 line 154, in deco
raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: "The configured Task Off-Heap Memory 0 
bytes is less than the least required Python worker Memory 79 mb. The Task 
Off-Heap Memory can be configured using the configuration key 
'taskmanager.memory.task.off-heap.size'."


[The code]:
s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
# s_env.set_parallelism(8)
env = StreamTableEnvironment.create(s_env,

environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build())
env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
 '0m')
# Register source tables
env.execute_sql(get_table_ddl('TP_GL_DAY'))
env.execute_sql(get_table_ddl('TS_PF_SEC_YLDRATE'))


# Register the sink table
out_ddl = '''
CREATE TABLE print_result (
 yldrate1 DOUBLE
) WITH (
 'connector' = 'print'
)
'''
env.execute_sql(out_ddl)
# Define and run the SQL
log_query = "SELECT BIZ_DATE, YLDRATE, PF_ID, SYMBOL_ID FROM TP_GL_DAY JOIN 
TS_PF_SEC_YLDRATE ON DAY_ID = BIZ_DATE WHERE PF_ID = '1234' "
view_table = env.sql_query(log_query)
env.register_table('query_result', view_table)


# Define the computation function
@udf(input_types=DataTypes.DOUBLE(), result_type=DataTypes.DOUBLE(), 
udf_type="pandas")
def multi_production(yldrate):
yldrate_1 = yldrate + 1
return np.prod(yldrate_1) - 1


# Register the function
env.register_function('multi_production', multi_production)
env.execute_sql('INSERT INTO print_result SELECT multi_production(YLDRATE) FROM 
query_result')
query_result.print_schema()
env.execute('my_udf_job')

pyflink1.11 table.to_pandas() fails with 'Sort on a non-time-attribute field is not supported.'

2021-01-27 Thread
sql_query returns a Table object, but calling table.to_pandas() on it fails:
Traceback (most recent call last):
  File 
"C:/projects/dataService-pyflink_explore/dataService-calculate-code-python/src/test/test_mysql_connector.py",
 line 161, in 
print(table.to_pandas().head(6))
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table.py",
 line 723, in to_pandas
.collectAsPandasDataFrame(self._j_table, max_arrow_batch_size)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py",
 line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
 line 154, in deco
raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: 'Sort on a non-time-attribute field is 
not supported.'
Why does the conversion fail? print_schema on the table itself is fine.
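No reply is archived here, but the exception comes from the streaming planner, which only supports ORDER BY on a time attribute. If the job is really a bounded read from a database, a batch-mode environment avoids the restriction; a minimal sketch under that assumption:

from pyflink.table import BatchTableEnvironment, EnvironmentSettings

# Batch mode permits sorting on ordinary (non-time-attribute) fields.
settings = EnvironmentSettings.new_instance() \
    .in_batch_mode().use_blink_planner().build()
env = BatchTableEnvironment.create(environment_settings=settings)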

pyflink 1.11: question about reading data via connectors

2021-01-26 Thread
Currently data is read from the database by defining DDL for a connector, like this:
CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
   'table-name' = 'users'
);
Every table definition requires declaring the data types (BIGINT, STRING, INT, BOOLEAN in the example).
Is there another way to read from the database that does not require declaring the types?

Re:pyflink1.11 reading Decimal data from MySQL: precision loss

2020-12-29 Thread
Problem solved ~ it was the data definition.

On 2020-12-30 13:41:16, "肖越" <18242988...@163.com> wrote:

The data is defined via a connector,
with types: yldrate DECIMAL, pf__id VARCHAR(10), symbol_id VARCHAR(30)
The first few yldrate values in the database look like this:
0.72101337
0.
0.
0.000212493881
0.78719845
0.73023505
0.70173309
0.70168385
But after pyflink reads them and converts to pandas, every value is:
 yldrate  
0   0  
1   0  
2   0  
3   0  
4   0  
5   0  
I don't know what causes the precision loss. How should I configure things so the data is read without loss? Newbie here, hoping for a reply. Thanks a lot!
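The "data definition" the resolution above points to is almost certainly the missing precision: in Flink SQL a bare DECIMAL means DECIMAL(10, 0), i.e. scale 0, so every fractional digit is rounded away, which matches the all-zero output. A sketch of the column declared with explicit precision and scale; (38, 18) and the connector options are assumptions:

source_ddl = """
CREATE TABLE ts_pf_sec_yldrate (
    yldrate DECIMAL(38, 18),  -- bare DECIMAL would be DECIMAL(10, 0)
    pf_id VARCHAR(10),
    symbol_id VARCHAR(30)
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://ip:port/db',
    'table-name' = 'ts_pf_sec_yldrate'
)
"""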

pyflink1.11 reading Decimal data from MySQL: precision loss

2020-12-29 Thread
The data is defined via a connector,
with types: yldrate DECIMAL, pf__id VARCHAR(10), symbol_id VARCHAR(30)
The first few yldrate values in the database look like this:
0.72101337
0.
0.
0.000212493881
0.78719845
0.73023505
0.70173309
0.70168385
But after pyflink reads them and converts to pandas, every value is:
 yldrate  
0   0  
1   0  
2   0  
3   0  
4   0  
5   0  
I don't know what causes the precision loss. How should I configure things so the data is read without loss? Newbie here, hoping for a reply. Thanks a lot!

Re:Re: pyflink1.12 error using the connector read.query option

2020-12-24 Thread
Thanks for the guidance. Following 嘉伟's suggestion, I confirmed that pyflink1.12 really does not support this option.

I still hope it can be opened up officially: in our current scenario, fetching data means defining the whole table, which is very hard to maintain on the code side if the database changes.
From my local experiments, running the query inside pyflink is not very efficient; I'm about to try it on a cluster.


On 2020-12-25 09:45:28, "Leonard Xu" wrote:
>Hi 嘉伟,
>
>1.12 should not support read.query; the community is still discussing whether to expose it, and there are some concerns. Briefly: with a query written like yours, the JDBC
>table being created is really a view rather than mapping to a JDBC table, and it can only be used as a source, not as a sink.
>
>Best,
>Leonard
>
>> On Dec 24, 2020, at 19:16, 冯嘉伟 <1425385...@qq.com> wrote:
>> 
>> hi! Try this:
>> 
>> CREATE TABLE source_table(
>>yldrate DECIMAL,
>>pf_id VARCHAR,
>>symbol_id VARCHAR) WITH(
>>'connector' = 'jdbc',
>>'url' = 'jdbc:mysql://ip/db',
>>'driver' = 'com.mysql.cj.jdbc.Driver',
>>'username' = 'xxx',
>>'password' = 'xxx',
>>'table-name' = 'TS_PF_SEC_YLDRATE',
>>'read.query' = 'SELECT YLDRATE, PF_ID, SYMBOL_ID FROM
>> TS_PF_SEC_YLDRATE LEFT JOIN TP_GL_DAY ON DAY_ID = BIZ_DATE WHERE CCY_TYPE =
>> "AC" AND PF_ID = "1030100122" AND SYMBOL_ID = "2030004042" AND BIZ_DATE
>> between "20160701" AND "20170307"'
>>)
>> 
>> 
>> 
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>


pyflink1.12 error using the connector read.query option

2020-12-23 Thread
A connector is defined via DDL to connect to a MySQL database; I want to fetch data directly by sending sql:
source_ddl = """
CREATE TABLE source_table(
yldrate DECIMAL,
pf_id VARCHAR,
symbol_id VARCHAR) WITH(
'connector' = 'jdbc',
'url' = 'jdbc:mysql://ip/db',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'xxx',
'password' = 'xxx',
'table-name' = 'TS_PF_SEC_YLDRATE'
'read.query' = "SELECT YLDRATE, PF_ID, SYMBOL_ID FROM 
TS_PF_SEC_YLDRATE LEFT JOIN TP_GL_DAY ON DAY_ID = BIZ_DATE WHERE CCY_TYPE = 
'AC' AND PF_ID = '1030100122' AND SYMBOL_ID = '2030004042' AND BIZ_DATE between 
'20160701' AND '20170307'"
)
"""
Error message:
File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table_environment.py",
 line 766, in execute_sql
return TableResult(self._j_tenv.executeSql(stmt))
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py",
 line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
 line 147, in deco
return f(*a, **kw)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py",
 line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o6.executeSql.
: org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered 
"=" at line 12, column 30.
Was expecting one of:
"UESCAPE" ...
 ...
")" ...
"," ...

I don't understand the expected-token hint. Why can't "=" appear here? Hoping someone passing by can offer some guidance. Thanks!
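Separate from whether read.query is honored (per the replies above, 1.12 does not support it), the posted DDL has two plain syntax problems: the comma after 'table-name' = 'TS_PF_SEC_YLDRATE' is missing, and the read.query value is wrapped in double quotes, which SQL does not treat as a string literal, hence the parser stopping at "=". A sketch of the same WITH clause in valid syntax (single quotes, inner quotes doubled), leaving the unsupported-option question aside:

source_ddl = """
CREATE TABLE source_table(
    yldrate DECIMAL,
    pf_id VARCHAR,
    symbol_id VARCHAR) WITH(
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://ip/db',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'xxx',
    'password' = 'xxx',
    'table-name' = 'TS_PF_SEC_YLDRATE',
    'read.query' = 'SELECT YLDRATE, PF_ID, SYMBOL_ID FROM TS_PF_SEC_YLDRATE
        LEFT JOIN TP_GL_DAY ON DAY_ID = BIZ_DATE
        WHERE CCY_TYPE = ''AC'' AND PF_ID = ''1030100122''
          AND SYMBOL_ID = ''2030004042''
          AND BIZ_DATE BETWEEN ''20160701'' AND ''20170307'''
)
"""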

pyflink query statements fetch data very slowly; doesn't the WHERE clause filter the data?

2020-12-23 Thread
The connector reads the whole table from the database, then executes:
env.sql_query("select a , b, c from table1 left join table2 on a = d where b = 
'103' and c = '203' and e = 'AC' and a between 20160701 and 20170307 order a")
Table a is very large, around 10 million rows, but only 250 rows match; running this locally takes 10 minutes.
I understand flink 1.11 had the problem that the WHERE clause does not filter the data first. Does flink1.12 still have it? How can this be optimized?

pyflink query statements fetch data very slowly; doesn't the WHERE clause filter the data?

2020-12-23 Thread
The connector reads the whole table from the database, then executes:
env.sql_query("select a , b, c from table1 left join table2 on a = d where b = 
'103' and c = '203' and e = 'AC' and a between 20160701 and 20170307 order by 
biz_date")
Table a is very large, around 10 million rows, but only 250 rows match; running this locally takes 10 minutes!
I understand flink 1.11 had the problem that the WHERE clause does not filter the data first. Does flink1.12 still have it? How can this be optimized?

Re:Re: Does pyflink 1.12 not support fetching data from the database directly via sql? I don't see a related API

2020-12-22 Thread
Hello, thanks for your reply.
Yes, at the moment I fetch the data by defining all of the table's columns in a connector.
Is there a connector option for setting a sql statement and directly getting back the rows the database returns?
With the current approach, if the table's columns change, the code will be very hard to maintain.

On 2020-12-23 14:36:20, "Wei Zhong" wrote:
>Hi,
>
>pyflink needs to fetch data from a database by declaring a jdbc connector.
>
>> On Dec 22, 2020, at 17:40, 肖越 <18242988...@163.com> wrote:
>> 
>> For example, the way pandas.read_sql() works: it returns the source data directly. pyflink newbie here, waiting for an answer.
>


Does pyflink 1.12 not support fetching data from the database directly via sql? I don't see a related API

2020-12-22 Thread
For example, the way pandas.read_sql() works: it returns the source data directly. pyflink newbie here, waiting for an answer.


pyflink1.12: the result of a multi-table join is a TableResult; how to convert it to a Table

2020-12-21 Thread
The left join is done via sql; the statement is:
sql = ''' Insert into print_sink select a.id, a.pf_id, b.symbol_id from  a \
 left join b on b.day_id = a.biz_date where a.ccy_type = 'AC' and \
 a.pf_id = '1030100122' and b.symbol_id = '2030004042' and a.biz_date 
between '20160701' and '20170307' '''


table_result = env.execute_sql(sql)
The result of running env.execute_sql() is a TableResult; how can it be converted to a Table?
Or is there some other way to run the table join directly and get a Table back?
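execute_sql returns a TableResult because INSERT INTO is a statement rather than a query; sql_query is the call that returns a Table. A minimal sketch reusing the posted predicates (table names a and b as in the post):

# sql_query returns a Table; execute_sql is for statements (INSERT / DDL).
select_sql = """
    SELECT a.id, a.pf_id, b.symbol_id
    FROM a LEFT JOIN b ON b.day_id = a.biz_date
    WHERE a.ccy_type = 'AC' AND a.pf_id = '1030100122'
      AND b.symbol_id = '2030004042'
      AND a.biz_date BETWEEN '20160701' AND '20170307'
"""
result_table = env.sql_query(select_sql)          # a Table
result_table.execute_insert("print_sink").wait()  # sink it when needed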

pyflink1.12 connecting to MySQL fails: Missing required options

2020-12-20 Thread
The script defines two source-table DDLs, but the second one always complains about a missing option. pyflink newbie, hoping for an answer.
# DDL definition
source_ddl2 = """CREATE TABLE ts_pf_sec_yldrate (id DECIMAL,pf_id VARCHAR,\

symbol_id VARCHAR,biz_date VARCHAR,\

ccy_type VARCHAR,cur_id_d VARCHAR,yldrate DECIMAL,\

is_valid DECIMAL,time_mark TIMESTAMP) WITH (

'connector' = 'jdbc',

'connector.url' = 'jdbc:mysql://ip:port/db_base',

'connector.table' = 'ts_pf_sec_yldrate',

'table-name' = 'ts_pf_sec_yldrate',

'connector.driver' = 'com.mysql.jdbc.Driver',

'connector.username' = 'xxx',

'connector.password' = 'xxx')

"""
Error message:
Traceback (most recent call last):
  File 
"C:/projects/dataService-calculate-code-python/src/test/test_mysql_connector.py",
 line 67, in 
print(join.to_pandas().head(6))
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table.py",
 line 807, in to_pandas
.collectAsPandasDataFrame(self._j_table, max_arrow_batch_size)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py",
 line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
 line 147, in deco
return f(*a, **kw)
  File 
"C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py",
 line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling 
z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame.
: org.apache.flink.table.api.ValidationException: Unable to create a source for 
reading table 'default_catalog.default_database.ts_pf_sec_yldrate'.


Table options are:


'connector'='jdbc'
'connector.driver'='com.mysql.jdbc.Driver'
'connector.password'='xxx'
'connector.table'='ts_pf_sec_yldrate'
'connector.url'='jdbc:mysql://ip:port/db_base'
'connector.username'='xxx'
'table-name'='ts_pf_sec_yldrate'
at 
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:265)
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:100)
at 
org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:495)
at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1099)
at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1123)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:339)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:149)
at 
org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:146)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
at 
org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92)
at 
org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145)
at java.util.Collections$SingletonList.forEach(Collections.java:4824)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
at 
org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47)
at 
org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145)
at java.util.Arrays$ArrayList.forEach(Arrays.java:3880)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
at 
org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:62)
at 
org.apache.flink.table.operations.JoinQueryOperation.accept(JoinQueryOperation.java:128)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145)
at java.util.Collections$SingletonList.forEach(Collections.java:4824)
at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
at 
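The failing table mixes the legacy 'connector.*' keys with the new 'connector' = 'jdbc' factory, which looks up 'url', 'table-name', 'username' and 'password' and so reports them missing. A sketch of the DDL using only new-style options (address and credentials are placeholders):

source_ddl2 = """CREATE TABLE ts_pf_sec_yldrate (id DECIMAL,pf_id VARCHAR,\
symbol_id VARCHAR,biz_date VARCHAR,\
ccy_type VARCHAR,cur_id_d VARCHAR,yldrate DECIMAL,\
is_valid DECIMAL,time_mark TIMESTAMP) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://ip:port/db_base',
'table-name' = 'ts_pf_sec_yldrate',
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'xxx',
'password' = 'xxx')
"""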

Failed to unsubscribe from the Chinese-language mailing list

2020-12-17 Thread
A corporate mailbox was subscribed by mistake, and now it cannot be unsubscribed; sending multiple emails to user-zh-unsubscribe has had no effect. Is there an official way to resolve this?

Re:Re: Pyflink1.12 trying to connect to oracle data fails with findAndCreateTableSource failed

2020-12-16 Thread
Hello, here is the complete error message:
Traceback (most recent call last):
  File "C:\projects\dataService-calculate-code-python\src\test\test_oracle_connector.py", line 24, in 
"C:\projects\dataService-calculate-code-python\src\\test\\flink_connector-jdbc\\flink-connector-jdbc_2.11-1.12.0.jar")
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\common\configuration.py", line 72, in set_string
add_jars_to_context_class_loader(value.split(";"))
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\utils.py", line 114, in add_jars_to_context_class_loader
jar_urls = [gateway.jvm.java.net.URL(url).toString() for url in jar_urls]
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\utils.py", line 114, in 
jar_urls = [gateway.jvm.java.net.URL(url).toString() for url in jar_urls]
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py", line 1554, in __call__
answer, self._gateway_client, None, self._fqn)
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", line 147, in deco
return f(*a, **kw)
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling None.java.net.URL.
: java.net.MalformedURLException: unknown protocol: c
at java.net.URL.(URL.java:617)
at java.net.URL.(URL.java:507)
at java.net.URL.(URL.java:456)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:238)
at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
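"unknown protocol: c" means the jar path was handed to java.net.URL as a raw Windows path: C:\... is not a URL, and the jars configuration expects file:/// URLs. A sketch of the likely fix, assuming the key being set was pipeline.jars (the path is the one from the traceback, with the scheme and forward slashes added):

# Jar paths must be URLs, not raw filesystem paths.
env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///C:/projects/dataService-calculate-code-python/src/test/"
    "flink_connector-jdbc/flink-connector-jdbc_2.11-1.12.0.jar")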

On 2020-12-17 14:57:36, "Dian Fu" wrote:
>Could you post the complete exception?
>
>> On Dec 17, 2020, at 11:53 AM, 肖越 <18242988...@163.com> wrote:
>> 
>> OK, thank you very much for the help. Following the shared links I explicitly added the jar dependency, and now get: py4j.protocol.Py4JJavaError: An error
>> occurred while calling None.java.net.URL. Is that because there is no corresponding Oracle connector to handle it?
>> 
>> On 2020-12-17 10:44:56, "Dian Fu" wrote:
>>> 1) As Leonard Xu said, the JDBC connector does not support oracle yet.
>>> 2) From the error, you fail before even getting that far. Check the following:
>>>    a. For how to use the JDBC connector, see [1]: e.g. 'connector' = 'jdbc', not 'connector.type' =
>>> 'jdbc', which is the old style.
>>>    b. The JDBC connector jar must be added as an explicit dependency; see [2] for how to add
>>> jar dependencies in PyFlink. For the jars JDBC needs, see [1].
>>> 
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html
>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#download-connector-and-format-jars
>>> 
>>>> On Dec 17, 2020, at 10:06 AM, 肖越 <18242988...@163.com> wrote:
>>>> 
>>>> Thanks for your help. So there is no way to get a connection by defining a connector? Is there any other approach?
>>>> 
>>>>> On 2020-12-17 09:55:08, "Leonard Xu" wrote:
>>>>> The JDBC connector currently only supports the MySQL, Pg and Derby (mostly for testing) dialects; Oracle is not supported yet.
>>>>> 
>>>>> Best,
>>>>> Leonard
>>>>> 
>>>>>> On Dec 17, 2020, at 09:47, 肖越 <18242988...@163.com> wrote:
>>>>>> 
>>>>>> pyflink newbie,

Does pyflink have a convenient way to print, like .print() in the java api?

2020-12-16 Thread
I've been trying pyflink recently, and the only output method I've found is defining a connector, for example:
sink_ddl = '''
CREATE TABLE print_sink (
ID DOUBLE,
NAME STRING
) WITH (
  'connector' = 'print'
)
'''
Having to define the output table schema in advance every time is tedious. Is there a more convenient way to print?
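For what it's worth, 1.12 has a shorter route than declaring a print connector: Table.execute() returns a TableResult whose print() writes the rows to stdout. A small sketch (the query is a placeholder):

# No sink DDL needed: collect the table and print it.
table = env.sql_query("SELECT ID, NAME FROM my_table")
table.execute().print()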

Re:Re: Pyflink1.12 trying to connect to oracle data fails with findAndCreateTableSource failed

2020-12-16 Thread
OK, thank you very much for the help. Following the shared links I explicitly added the jar dependency, and now get: py4j.protocol.Py4JJavaError: An error
occurred while calling None.java.net.URL. Is that because there is no corresponding Oracle connector to handle it?

On 2020-12-17 10:44:56, "Dian Fu" wrote:
>1) As Leonard Xu said, the JDBC connector does not support oracle yet.
>2) From the error, you fail before even getting that far. Check the following:
> a. For how to use the JDBC connector, see [1]: e.g. 'connector' = 'jdbc', not 'connector.type' =
> 'jdbc', which is the old style.
> b. The JDBC connector jar must be added as an explicit dependency; see [2] for how to add
> jar dependencies in PyFlink. For the jars JDBC needs, see [1].
>
>[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html
>[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#download-connector-and-format-jars
>
>> On Dec 17, 2020, at 10:06 AM, 肖越 <18242988...@163.com> wrote:
>> 
>> Thanks for your help. So there is no way to get a connection by defining a connector? Is there any other approach?
>> 
>>> On 2020-12-17 09:55:08, "Leonard Xu" wrote:
>>> The JDBC connector currently only supports the MySQL, Pg and Derby (mostly for testing) dialects; Oracle is not supported yet.
>>> 
>>> Best,
>>> Leonard
>>> 
>>>> On Dec 17, 2020, at 09:47, 肖越 <18242988...@163.com> wrote:
>>>> 
>>>> pyflink newbie, testing pyflink1.12: I want to fetch data from an oracle database by defining a connector.
>>>> Defined as follows:
>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>> env.set_parallelism(1)
>>>> env = StreamTableEnvironment \
>>>>   .create(env, environment_settings=EnvironmentSettings
>>>>   .new_instance()
>>>>   .use_blink_planner().build())
>>>> source_ddl1 = """
>>>>   CREATE TABLE source_table (id BIGINT,pf_id VARCHAR,\
>>>>   tree_id VARCHAR,node_id VARCHAR,biz_date DATE,\
>>>>   ccy_type VARCHAR,cur_id_d VARCHAR,tldrate DOUBLE,\
>>>>   is_valid INT,time_mark TIMESTAMP) WITH (
>>>>   'connector.type' = 'jdbc',
>>>>   'connector.url' = 'jdbc:oracle:thin:@ip:dbnamel',
>>>>   'connector.table' = 'ts_pf_ac_yldrate',
>>>>   'connector.driver' = 'oracle.jdbc.driver.OracleDriver',
>>>>   'connector.username' = 'xxx',
>>>>   'connector.password' = 'xxx')
>>>>   """
>>>> sql = "select pf_id,biz_date from source_table where biz_date='20160701' "
>>>> env.sql_update(source_ddl1)
>>>> table = env.sql_query(sql)
>>>> env.execute("flink_test")
>>>> Error message:
>>>>   raise java_exception
>>>> pyflink.util.exceptions.TableException: findAndCreateTableSource failed.
>>>> at 
>>>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
>>>> at 
>>>> org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:193)
>>>> at 
>>>> org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:94)
>>>> at 
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
>>>> at 
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
>>>> at 
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
>>>> at 
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
>>>> at 
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
>>>> at 
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
>>>> at 
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
>>>> at 
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
>>>> at 
>>>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
>>>> at 
>>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
>>>> at 
>>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
>

Re:Re: Question: does the pyflink sink support a redis connector?

2020-12-16 Thread
May I ask whether it can connect to an oracle database? I tried defining a connector myself but got findAndCreateTableSource failed,
and I could not find any pyflink example code for defining an oracle connector.

On 2020-12-17 10:16:13, "Dian Fu" wrote:
>Thanks to Xingbo for the reply; one small addition: every connector supported by Table & SQL can be used in PyFlink.
>
>A redis connector is not provided directly in the Flink code base, but there is one here that should also work: https://github.com/apache/bahir-flink/tree/master/flink-connector-redis
>
>For how to use connectors in PyFlink, see the docs: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html
>
>> On Dec 17, 2020, at 9:52 AM, Xingbo Huang wrote:
>> 
>> Hi,
>> 
>> As far as I know, flink provides no official support for a redis connector [1]; you need to implement your own redis
>> connector against the interfaces flink provides. For how to implement a custom connector, see the docs [2].
>> 
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sourceSinks.html
>> 
>> Best,
>> Xingbo
>> 
>> 
>>> 消息室 wrote on Thu, Dec 17, 2020 at 9:33 AM:
>> 
>>> Hello:
>>> 
>>> Our project team plans to use pyflink, and I have had the pleasure of reading your blog. May I ask whether the sink in the current pyflink 1.12.0 supports a redis
>>> connector? Thanks!
>>> If not, what approach would you suggest?
>


Re:Re: Pyflink1.12 trying to connect to oracle data fails with findAndCreateTableSource failed

2020-12-16 Thread
Thanks for your help. So there is no way to get a connection by defining a connector? Is there any other approach?

On 2020-12-17 09:55:08, "Leonard Xu" wrote:
>The JDBC connector currently only supports the MySQL, Pg and Derby (mostly for testing) dialects; Oracle is not supported yet.
>
>Best,
>Leonard
>
>> On Dec 17, 2020, at 09:47, 肖越 <18242988...@163.com> wrote:
>> 
>> pyflink newbie, testing pyflink1.12: I want to fetch data from an oracle database by defining a connector.
>> Defined as follows:
>> env = StreamExecutionEnvironment.get_execution_environment()
>> env.set_parallelism(1)
>> env = StreamTableEnvironment \
>>.create(env, environment_settings=EnvironmentSettings
>>.new_instance()
>>.use_blink_planner().build())
>> source_ddl1 = """
>>CREATE TABLE source_table (id BIGINT,pf_id VARCHAR,\
>>tree_id VARCHAR,node_id VARCHAR,biz_date DATE,\
>>ccy_type VARCHAR,cur_id_d VARCHAR,tldrate DOUBLE,\
>>is_valid INT,time_mark TIMESTAMP) WITH (
>>'connector.type' = 'jdbc',
>>'connector.url' = 'jdbc:oracle:thin:@ip:dbnamel',
>>'connector.table' = 'ts_pf_ac_yldrate',
>>'connector.driver' = 'oracle.jdbc.driver.OracleDriver',
>>'connector.username' = 'xxx',
>>'connector.password' = 'xxx')
>>"""
>> sql = "select pf_id,biz_date from source_table where biz_date='20160701' "
>> env.sql_update(source_ddl1)
>> table = env.sql_query(sql)
>> env.execute("flink_test")
>> Error message:
>>raise java_exception
>> pyflink.util.exceptions.TableException: findAndCreateTableSource failed.
>> at 
>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
>> at 
>> org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:193)
>> at 
>> org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:94)
>> at 
>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
>> at 
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
>> at 
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
>> at 
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
>> at 
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
>> at 
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
>> at 
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
>> at 
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
>> at 
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
>> at 
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
>> at 
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
>> at 
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:823)
>> at 
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:795)
>> at 
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
>> at 
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>> at 
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at 
>> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>> at 
>> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>> at 
>> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>> at 
>> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>> at 
>> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>> at java.lang.Thread.run(Thread.java:748)


Pyflink1.12 trying to connect to oracle data fails with findAndCreateTableSource failed

2020-12-16 Thread
pyflink newbie, testing pyflink1.12: I want to fetch data from an oracle database by defining a connector.
Defined as follows:
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env = StreamTableEnvironment \
.create(env, environment_settings=EnvironmentSettings
.new_instance()
.use_blink_planner().build())
source_ddl1 = """
CREATE TABLE source_table (id BIGINT,pf_id VARCHAR,\
tree_id VARCHAR,node_id VARCHAR,biz_date DATE,\
ccy_type VARCHAR,cur_id_d VARCHAR,tldrate DOUBLE,\
is_valid INT,time_mark TIMESTAMP) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:oracle:thin:@ip:dbnamel',
'connector.table' = 'ts_pf_ac_yldrate',
'connector.driver' = 'oracle.jdbc.driver.OracleDriver',
'connector.username' = 'xxx',
'connector.password' = 'xxx')
"""
sql = "select pf_id,biz_date from source_table where biz_date='20160701' "
env.sql_update(source_ddl1)
table = env.sql_query(sql)
env.execute("flink_test")
Error message:
raise java_exception
pyflink.util.exceptions.TableException: findAndCreateTableSource failed.
 at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
 at 
org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:193)
 at 
org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:94)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
 at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
 at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
 at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
 at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:823)
 at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:795)
 at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
 at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
 at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
 at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
 at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
 at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
 at java.lang.Thread.run(Thread.java:748)