关于使用RocksDBStateBackend TTL 配置的问题

2020-04-02 文章 yanggang_it_job
Hi:
   
我们现在启用state.backend.rocksdb.ttl.compaction.filter.enabled进行rocksdb的有效期设置,但效果并不是那么理想。
   同时我也有以下问题想不明白:
   1、如果rocksdb在compact的时候有些state并没有被compact到,是否就意味着就算这些state已经过期也不会被删除?
   2、目前flink的ttl策略只有OnCreateAndWrite和OnReadAndWrite两种策略,是否有那种不需要刷新,到了TTL时间就自动清除。
 否则就会出现state一直在刷新导致永远无法删除,最终导致磁盘打满


   目前我能想到的方案是,另外写一个定时任务根据配置去清除过期state。
   请问大家还有其他更好的方案吗?

Re: Flink 读取 Kafka 多个 Partition 问题,

2020-04-02 文章 LakeShen
Hi Qijun,

看下 kafka 是不是所有分区都有数据呢,或者在这个时间截后:158567040L,后面是不是只有分区3写入数据,个人的想法。

Best,
LakeShen

Qijun Feng  于2020年4月2日周四 下午5:44写道:

> Dear All,
>
> 我的 Kafka cluster 有三个机器,topic 也分了三个 partition, 我用 Flink 读取 Kafka
> 消息的时候,感觉只从一个 partition 读取了东西, 一开始我的 bootstrap.servers 只写了一个地址,
>  现在改成了所有地址,也换了 group.id
>
>
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "10.216.85.201:9092,
> 10.216.77.170:9092,10.216.77.188:9092");
> properties.setProperty("group.id", "behavior-logs-aggregator");
>
> FlinkKafkaConsumer010 kafkaConsumer010 =
>new FlinkKafkaConsumer010("behavior-logs_dev", new
> BehaviorLogDeserializationSchema(), properties);
> kafkaConsumer010.setStartFromTimestamp(158567040L); //2020/04/01
>
> 处理完的数据,写到数据库里,看下了感觉少数据, 从 Log 里看到,也是。。,只有 partition=3, 没有 partiton=1,或者 2
> 的,
>
> 2020-04-02 14:54:58,532 INFO
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
> Consumer subtask 0 creating fetcher with offsets
> {KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}.
>
>
> 是哪里有问题吗?
>
>


Re: 从savepoint不能恢复问题

2020-04-02 文章 LakeShen
Hi ,

这种情况可能是你改变的 Flink SQL 的拓扑结构,导致部分算子的 uid 发生变化,然后在从状态恢复的时候,没有找到算子的状态。
所以在开发 SQL 任务的时候,一般更改 SQL 代码时,不要改变其拓扑结构,SQL 任务上线后,就不要在轻意改了。

Best,
LakeShen

酷酷的浑蛋  于2020年4月2日周四 下午6:31写道:

> 关键我的程序是flink-sql,其它的算子基本都设置过uid了,flink-sql可以设置uid吗,或者说sql中的自动分配的uid怎么查找呢
>
>
> | |
> apache22
> |
> |
> apach...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年4月2日 18:22,Yangze Guo 写道:
> 如果没有显示指定的话,operator id将是一个随机生成的值[1].
> 当从savepoint恢复时,将依据这些id来匹配,如果发生了变化,可能是你修改了你的jobGraph。Flink推荐显示的指定operator
> id字面量。[2]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#assigning-operator-ids
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#faq
>
> Best,
> Yangze Guo
>
> On Thu, Apr 2, 2020 at 6:01 PM 酷酷的浑蛋  wrote:
>
>
>
> Failed to rollback to checkpoint/savepoint
> hdfs://xxx/savepoint-9d5b7a-66c0340f6672. Cannot map checkpoint/savepoint
> state for operator cbc357ccb763df2852fee8c4fc7d55f2 to the new program,
> because the operator is not available in the new program. If you want to
> allow to skip this, you can set the --allowNonRestoredState option on the
> CLI.
>
>
> 像上面这个错误,我怎么根据 operator cbc357ccb763df2852fee8c4fc7d55f2 去找是我程序中的哪个算子不能恢复呢?
> | |
> apache22
> |
> |
> apach...@163.com
> |
> 签名由网易邮箱大师定制
>


Re: Flink SQL1.10使用Mysql作为维表关联,数据量太大导致task manager time out

2020-04-02 文章 godfrey he
Hi Xinghalo,

欢迎向 sql gateway 贡献~

Best,
Godfrey

111  于2020年4月2日周四 上午11:10写道:

> Hi,
> 了解了,那我知道怎么解决了。我这边使用的是sql-gateway,看样子得在sql-gateway里面加一种表定义的语法了。
> 多谢多谢
>
>
> Best,
> Xinghalo
> 在2020年04月2日 10:52,Benchao Li 写道:
> 你写的不是维表join的语法,维表join现在用的是temporal table[1] 的方式来实现的,需要特殊的join语法:
>
> SELECT
> o.amout, o.currency, r.rate, o.amount * r.rateFROM
> Orders AS o
> JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
> ON r.currency = o.currency
>
> 此外,你看的JDBCTableSource是一个普通的bounded source,或者是batch
> source。真正的维表的代码是在JDBCLookupFunction里面的。
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins
>
> 111  于2020年4月2日周四 上午10:33写道:
>
> Hi,
> 试验了下貌似不行,我的sql:
>
>
> select s.*, item.product_name
> from  ( select member_id, uuid, page, cast(SPLIT_INDEX(page,
> '10.pd.item-', 1) as string) as item_id,  `time` from tgs_topic_t1  where
> page like '10.pd.item-%’ ) s
> inner join PROD_ITEM_MALL_ACTIVITY_PRODUCT item on cast(item.id as
> string) = s.item_id
> where s.item_id is not null
>
>
> 看了下代码JDBCTableSource中的实现
> String query = dialect.getSelectFromStatement( options.getTableName(),
> returnType.getFieldNames(), new String[0]);
> 构建sql的时候  conditionFields 是写死的空数组,因此肯定不会有查询条件参与进来。
> 后面真正open()连接查询的时候,也只处理了分区字段,并没有查询条件字段。
> 在2020年04月2日 10:11,Benchao Li 写道:
> Hi,
>
> 能否把你的SQL也发出来呢?
>
>
> 正常来讲,维表关联用的是join的等值条件作为关联的条件去mysql查询,然后如果还有其他的filter,会在关联之后的结果之上在做filter。如果你发现每次都是扫描全表,很有可能是你的维表join的条件写的有问题导致的。
>
> 111  于2020年4月2日周四 上午9:55写道:
>
> Hi,
> 想确认下MySQL JDBC Connector的实现细节,目前了解到的实现是:
> 1 获取查询sql中的字段和表名,拼接成select a, b, c from t
> 2 创建并行输入,如果制定了分区字段,会按照分区字段和并行度切分生成where条件,如where id between xxx and xxx
> 3 执行sql,加载到内存(不确定后面缓存的实现细节)
>
>
>
>
>
> 目前我们遇到的问题是,我们把mysql作为维表(表的量级在一千500万左右,并行度为10),没有指定分区条件(只有一个slot执行查询,其他的都没有查询任务)。
> 结果导致只有一个分区查询数据,查询的sql为select xxx from t(全表查询)。
> 可是现在并行度由于集群的限制,不能调整的过大,如何实现按需查询呢?
>
>
> 之前我们在datastream api中,是使用异步API,根据某个条件单独查询某条数据,所以速度很快;现在需要全部加载过来,资源消耗太大了。
> 不知道有没有什么优雅的解决方案?
>
>
> Best,
> Xinghalo
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: 动态处理字段动态sink

2020-04-02 文章 Lee Djeng
CEP干这活不合适嘛

出发 <573693...@qq.com> 于2020年4月2日周四 下午7:54写道:

> 感谢大佬们的流转sql,我突然想到,不能去规则引擎考虑问题,用sql不就好了
> --原始邮件--
> 发件人:"出发  "<573693...@qq.com;
> 发送时间:2020年4月2日(星期四) 下午4:57
> 收件人:"user-zh"
> 主题:回复: 动态处理字段动态sink
>
>
>
> 动态广播是获得了我配置的规则,就是这个规则如何解析实现,没有太好思路,目前只有查到一个drools。
>
>
>
>
> --nbsp;原始邮件nbsp;--
> 发件人:nbsp;"Jun Zhang" 发送时间:nbsp;2020年4月2日(星期四) 下午4:41
> 收件人:nbsp;"user-zh"
> 主题:nbsp;Re: 动态处理字段动态sink
>
>
>
> hi:
> 不知道广播流能否满足你的需求
>
> 出发 <573693...@qq.comgt; 于2020年4月2日周四 下午4:25写道:
>
> gt;
> gt;
> 老铁们,消费kafka一些数据时候,根据规则,将满足条件的一些字段生成新字段,插入到es或者pg等里面,其中映射字段是动态的,插入字段也是动态的,结合drools来做比较好吗。


?????? ????????????????sink

2020-04-02 文章 ????
sqlsql
----
??:"  "<573693...@qq.com;
:2020??4??2??(??) 4:57
??:"user-zh"

[flink-1.10]有关使用cep功能时times的问题以及pyflink table api的聚合计算操作问题

2020-04-02 文章 Zhefu PENG
Hi all,

最近在做flink相关的使用和开发,现在遇到了两个问题:

1.当使用cep功能时, pattern本身的积累功能有一个Pattern.times()的接口,
当我在这个times中输入一个较大的数字(比如超过100,000及以上的数字)的时候,
启动相关的任务会非常耗时甚至耗时过久。比如我用100,000时, 启动时间一共花了15+min.

启动方式为:flink run thenameofjar.jar
不知道这是使用方式不对还是什么其他问题?

2. 当我们用pyflink的table api时, 会出现聚合计算的错误,代码和错误日志如下:

# 环境声明
exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)

schema声明
return Schema() \
.field('source', DataTypes.STRING()) \
.field('proctime',DataTypes.TIMESTAMP()).proctime()


# 窗口group计算
table3 = table1.window(Tumble.over("1.minutes").on("proctime").alias('w')) \
.group_by('w, source') \
.select("source")

error日志:

Traceback (most recent call last):
  File "/usr/local/python3/lib/python3.6/runpy.py", line 193, in
_run_module_as_main
"__main__", mod_spec)
  File "/usr/local/python3/lib/python3.6/runpy.py", line 85, in _run_code
exec(code, run_globals)
  File "/flink_test/flink_udf_kafka.py", line 65, in 
.select("source")
  File
"/tmp/pyflink/be9fe638-d5c2-4abc-86b7-274c08b4da3e/41da0979-bdf8-420b-9852-f7f1e14e8594pyflink.zip/pyflink/table/table.py",
line 784, in select
  File
"/tmp/pyflink/be9fe638-d5c2-4abc-86b7-274c08b4da3e/41da0979-bdf8-420b-9852-f7f1e14e8594py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
line 1286, in __call__
  File
"/tmp/pyflink/be9fe638-d5c2-4abc-86b7-274c08b4da3e/41da0979-bdf8-420b-9852-f7f1e14e8594pyflink.zip/pyflink/util/exceptions.py",
line 147, in deco
  File
"/tmp/pyflink/be9fe638-d5c2-4abc-86b7-274c08b4da3e/41da0979-bdf8-420b-9852-f7f1e14e8594py4j-0.10.8.1-src.zip/py4j/protocol.py",
line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o196.select.
: org.apache.flink.table.api.ValidationException: A group window expects a
time attribute for grouping in a stream environment.
at
org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:293)
at
org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:278)
at
org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:271)
at
org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:233)
at
org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:243)
at
org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:762)
at
org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:747)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)

org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:87)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 

回复: 从savepoint不能恢复问题

2020-04-02 文章 酷酷的浑蛋
关键我的程序是flink-sql,其它的算子基本都设置过uid了,flink-sql可以设置uid吗,或者说sql中的自动分配的uid怎么查找呢


| |
apache22
|
|
apach...@163.com
|
签名由网易邮箱大师定制
在2020年4月2日 18:22,Yangze Guo 写道:
如果没有显示指定的话,operator id将是一个随机生成的值[1].
当从savepoint恢复时,将依据这些id来匹配,如果发生了变化,可能是你修改了你的jobGraph。Flink推荐显示的指定operator
id字面量。[2]

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#assigning-operator-ids
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#faq

Best,
Yangze Guo

On Thu, Apr 2, 2020 at 6:01 PM 酷酷的浑蛋  wrote:



Failed to rollback to checkpoint/savepoint 
hdfs://xxx/savepoint-9d5b7a-66c0340f6672. Cannot map checkpoint/savepoint state 
for operator cbc357ccb763df2852fee8c4fc7d55f2 to the new program, because the 
operator is not available in the new program. If you want to allow to skip 
this, you can set the --allowNonRestoredState option on the CLI.


像上面这个错误,我怎么根据 operator cbc357ccb763df2852fee8c4fc7d55f2 去找是我程序中的哪个算子不能恢复呢?
| |
apache22
|
|
apach...@163.com
|
签名由网易邮箱大师定制


Re: 从savepoint不能恢复问题

2020-04-02 文章 Yangze Guo
如果没有显示指定的话,operator id将是一个随机生成的值[1].
当从savepoint恢复时,将依据这些id来匹配,如果发生了变化,可能是你修改了你的jobGraph。Flink推荐显示的指定operator
id字面量。[2]

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#assigning-operator-ids
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#faq

Best,
Yangze Guo

On Thu, Apr 2, 2020 at 6:01 PM 酷酷的浑蛋  wrote:
>
>
>
> Failed to rollback to checkpoint/savepoint 
> hdfs://xxx/savepoint-9d5b7a-66c0340f6672. Cannot map checkpoint/savepoint 
> state for operator cbc357ccb763df2852fee8c4fc7d55f2 to the new program, 
> because the operator is not available in the new program. If you want to 
> allow to skip this, you can set the --allowNonRestoredState option on the CLI.
>
>
> 像上面这个错误,我怎么根据 operator cbc357ccb763df2852fee8c4fc7d55f2 去找是我程序中的哪个算子不能恢复呢?
> | |
> apache22
> |
> |
> apach...@163.com
> |
> 签名由网易邮箱大师定制


从savepoint不能恢复问题

2020-04-02 文章 酷酷的浑蛋


Failed to rollback to checkpoint/savepoint 
hdfs://xxx/savepoint-9d5b7a-66c0340f6672. Cannot map checkpoint/savepoint state 
for operator cbc357ccb763df2852fee8c4fc7d55f2 to the new program, because the 
operator is not available in the new program. If you want to allow to skip 
this, you can set the --allowNonRestoredState option on the CLI.


像上面这个错误,我怎么根据 operator cbc357ccb763df2852fee8c4fc7d55f2 去找是我程序中的哪个算子不能恢复呢?
| |
apache22
|
|
apach...@163.com
|
签名由网易邮箱大师定制

Flink 读取 Kafka 多个 Partition 问题,

2020-04-02 文章 Qijun Feng
Dear All,

我的 Kafka cluster 有三个机器,topic 也分了三个 partition, 我用 Flink 读取 Kafka 消息的时候,感觉只从一个 
partition 读取了东西, 一开始我的 bootstrap.servers 只写了一个地址,  现在改成了所有地址,也换了 group.id


Properties properties = new Properties();
properties.setProperty("bootstrap.servers", 
"10.216.85.201:9092,10.216.77.170:9092,10.216.77.188:9092");
properties.setProperty("group.id", "behavior-logs-aggregator");

FlinkKafkaConsumer010 kafkaConsumer010 =
   new FlinkKafkaConsumer010("behavior-logs_dev", new 
BehaviorLogDeserializationSchema(), properties);
kafkaConsumer010.setStartFromTimestamp(158567040L); //2020/04/01

处理完的数据,写到数据库里,看下了感觉少数据, 从 Log 里看到,也是。。,只有 partition=3, 没有 partiton=1,或者 2 的,

2020-04-02 14:54:58,532 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer 
subtask 0 creating fetcher with offsets 
{KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}.


是哪里有问题吗?



Re:回复: Flink实时写入hive异常

2020-04-02 文章 sunfulin






hi,
请教下,现有功能里,可以将hive表作为维表join么?作为temporal table。
如果可以的话,hive分区表如何join呢?一般维表join是要join最新分区的全量数据。











在 2020-04-02 17:30:39,"111"  写道:
>Hi,
>只要能解决upsert问题带来的存储碎片、读写冲突、版本回溯,实时写入Hive也是可以的,目前spark delta lake就已经做到了。
>前面jingsong也提到过,会去解决文件存储、合并等问题,那到时候flink实时写入hive就没问题了。
>Best,
>Xinghalo


回复: Flink实时写入hive异常

2020-04-02 文章 111
Hi,
只要能解决upsert问题带来的存储碎片、读写冲突、版本回溯,实时写入Hive也是可以的,目前spark delta lake就已经做到了。
前面jingsong也提到过,会去解决文件存储、合并等问题,那到时候flink实时写入hive就没问题了。
Best,
Xinghalo

Re: flink 1.10 createTemporaryTable丢失proctime问题

2020-04-02 文章 deadwind4
真是谢谢你。翻了下源码发现不太对劲,下次我先搜搜Issue再提问。感谢。


 原始邮件 
发件人: Kurt Young
收件人: user-zh
发送时间: 2020年4月2日(周四) 17:18
主题: Re: flink 1.10 createTemporaryTable丢失proctime问题


看起来你是踩到了这个bug:https://issues.apache.org/jira/browse/FLINK-16160 
在这个bug修复前,先继续用老的API吧 Best, Kurt On Thu, Apr 2, 2020 at 10:34 AM deadwind4 
 wrote: > registerTableSource 被标记了@Deprecated 在flink > 
1.10,我这种情况是继续沿用过期的API(registerTableSource)吗? > > > 原始邮件 > 发件人: 
deadwind4 > 收件人: user-zh > 
发送时间: 2020年4月2日(周四) 10:30 > 主题: Re: flink 1.10 createTemporaryTable丢失proctime问题 
> > > 修改前 > tEnv.connect().withFormat().withSchema( > xxx.proctime() > 
).registerTableSource(“foo”); > > > 修改后 > 
tEnv.connect().withFormat().withSchema( > xxx.proctime() > 
).createTemporaryTable(“foo”); > > > 修改后.proctime()就失效了,所以我proctime 
window也用不了了。 > > > 原始邮件 > 发件人: deadwind4 > 收件人: 
user-zh > 发送时间: 2020年4月2日(周四) 10:22 > 主题: Re: flink 
1.10 createTemporaryTable丢失proctime问题 > > > 
tEnv.connect().withFormat().withSchema().registerTableSource(“foo”); > 
tEnv.connect().withFormat().withSchema().createTemporaryTable(“foo”); > > > 
原始邮件 > 发件人: Jark Wu > 收件人: user-zh 
> 发送时间: 2020年4月2日(周四) 10:18 > 主题: Re: flink 1.10 
createTemporaryTable丢失proctime问题 > > > Hi, 你能描述下你的改动前后的代码吗?据我所知 
TableEnvironment 上没有 createTemporaryTable > 方法,只有createTemporaryView方法,而且 
registerTableSource 和 createTemporaryView > 的参数是不一样的。 Best, Jark > 2020年4月1日 
23:13,deadwind4  > 写道: > > 我其实是想用processing time window 
但是我把过期的API > 
registerTableSource换成createTemporaryTable,proctime就不起作用了。这种情况我应该如何使用呢?谢谢您,叨扰了。 
> > > > 原始邮件 > 发件人: Jark Wu > 收件人: user-zh< > 
user-zh@flink.apache.org> > 发送时间: 2020年4月1日(周三) 21:37 > 主题: Re: flink > 1.10 
createTemporaryTable丢失proctime问题 > > > Hi, proctime 的含义是机器时间,不等价于 > now()或者 
current_timestamp() 函数,该字段只有在真正使用的才会物化(即去取 > System.currentTimeMillis)。 
能请描述下你想用 createTemporaryTable > 做什么呢?当前哪里不能满足你的需求呢? Best, Jark On Wed, 1 Apr 
2020 at 18:56, deadwind4 < > deadwi...@outlook.com> wrote: > > > 
我使用1.10版本的createTemporaryTable发现proctime字段全是null但是换成过时的registerTableSource就可以。 
> > 如果我想使用createTemporaryTable该怎么办。 > > 
并且我debug了createTemporaryTable的源码没有发现对proctime的处理。

Re: flink sql 实现双流join 的滑动窗口

2020-04-02 文章 Benchao Li
hi kevin,

双流join有两种形式,一种是Time-windowed Join,一种是regular join,见[1]。

tumble window是一个单独的算子,是对一个输入流做窗口聚合。它本身不会跟join有什么关系。
所以你的SQL其实是先做了两个regular join,在join的结果上又做了一个window聚合。

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins

Kevin  于2020年4月2日周四 下午4:45写道:

> 补一个报错信息:
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: Rowtime attributes must not be
> in the input rows of a regular join. As a workaround you can cast the time
> attributes of input tables to TIMESTAMP before.
>
> --
> 发件人:Kevin 
> 发送时间:2020年4月2日(星期四) 16:37
> 收件人:flink-zh 
> 主 题:flink sql 实现双流join 的滑动窗口
>
> HI ALL:
> 打扰大家了。请教下如何实现双流join的滑动窗口。我的代码运行后就报错了。有经验的同学帮忙解答下。谢谢。
>
> 代码:
> SELECT
> TUMBLE_START(a.ts, INTERVAL '10' MINUTE),
> TUMBLE_END(a.ts, INTERVAL '10' MINUTE),
> sum( CASE WHEN f.business_id <> NULL THEN 1 ELSE 0 END ) AS
> company_pass_first_num,
> sum( CASE WHEN g.business_id <> NULL THEN 1 ELSE 0 END ) AS
> company_pass_num,
> COUNT(a.create_time)
> FROM t1_oa_loan_requests_detail_view a
>   LEFT JOIN t1_oa_flow_inst_task_view_segment_no_5 f ON a.id =
> f.business_id
>   LEFT JOIN t1_oa_flow_inst_task_view_segment_no_61 g ON a.id =
> g.business_id
> GROUP BY TUMBLE(a.ts, INTERVAL '10' MINUTE);
>
> 报错:
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: flink 1.10 createTemporaryTable丢失proctime问题

2020-04-02 文章 Kurt Young
看起来你是踩到了这个bug:https://issues.apache.org/jira/browse/FLINK-16160
在这个bug修复前,先继续用老的API吧

Best,
Kurt


On Thu, Apr 2, 2020 at 10:34 AM deadwind4  wrote:

> registerTableSource 被标记了@Deprecated 在flink
> 1.10,我这种情况是继续沿用过期的API(registerTableSource)吗?
>
>
>  原始邮件
> 发件人: deadwind4
> 收件人: user-zh
> 发送时间: 2020年4月2日(周四) 10:30
> 主题: Re: flink 1.10 createTemporaryTable丢失proctime问题
>
>
> 修改前
> tEnv.connect().withFormat().withSchema(
> xxx.proctime()
> ).registerTableSource(“foo”);
>
>
> 修改后
> tEnv.connect().withFormat().withSchema(
> xxx.proctime()
> ).createTemporaryTable(“foo”);
>
>
> 修改后.proctime()就失效了,所以我proctime window也用不了了。
>
>
>  原始邮件
> 发件人: deadwind4
> 收件人: user-zh
> 发送时间: 2020年4月2日(周四) 10:22
> 主题: Re: flink 1.10 createTemporaryTable丢失proctime问题
>
>
> tEnv.connect().withFormat().withSchema().registerTableSource(“foo”);
> tEnv.connect().withFormat().withSchema().createTemporaryTable(“foo”);
>
>
>  原始邮件
> 发件人: Jark Wu
> 收件人: user-zh
> 发送时间: 2020年4月2日(周四) 10:18
> 主题: Re: flink 1.10 createTemporaryTable丢失proctime问题
>
>
> Hi, 你能描述下你的改动前后的代码吗?据我所知 TableEnvironment 上没有 createTemporaryTable
> 方法,只有createTemporaryView方法,而且 registerTableSource 和 createTemporaryView
> 的参数是不一样的。 Best, Jark > 2020年4月1日 23:13,deadwind4 
> 写道: > > 我其实是想用processing time window 但是我把过期的API
> registerTableSource换成createTemporaryTable,proctime就不起作用了。这种情况我应该如何使用呢?谢谢您,叨扰了。
> > > > 原始邮件 > 发件人: Jark Wu > 收件人: user-zh<
> user-zh@flink.apache.org> > 发送时间: 2020年4月1日(周三) 21:37 > 主题: Re: flink
> 1.10 createTemporaryTable丢失proctime问题 > > > Hi, proctime 的含义是机器时间,不等价于
> now()或者 current_timestamp() 函数,该字段只有在真正使用的才会物化(即去取
> System.currentTimeMillis)。 能请描述下你想用 createTemporaryTable
> 做什么呢?当前哪里不能满足你的需求呢? Best, Jark On Wed, 1 Apr 2020 at 18:56, deadwind4 <
> deadwi...@outlook.com> wrote: > >
> 我使用1.10版本的createTemporaryTable发现proctime字段全是null但是换成过时的registerTableSource就可以。
> > 如果我想使用createTemporaryTable该怎么办。 >
> 并且我debug了createTemporaryTable的源码没有发现对proctime的处理。


Re: Flink实时写入hive异常

2020-04-02 文章 Leonard Xu
看起来stream -> (Flink SQL) -> hive 这个场景大家都很关注,预计FLIP115完成后能解这个场景。

Best,
Leonard

> 在 2020年4月2日,17:10,sunfulin  写道:
> 
> 
> 
> 
> Hi,
> 这里就涉及到一个话题,应该怎么去实践实时和离线数仓的数据融合。我能理解这个技术上的不合理,但是通过Flink实现数仓ETL的功能,我相信很多使用Flink的会将之作为一个重要场景。
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-04-01 16:05:54,"111"  写道:
>> 
>> 
>> Hi,
>> 流写入hive,其实是属于数据湖的概念范畴。
>> 因为流往hive里面写,会造成很多的碎片文件,对hdfs造成性能影响,因此一般不会在流场景下直接写入hive。
>> 详细的可以了解 Delta lake 或 hudi。
>> 
>> 
>> 在2020年04月1日 15:05,sunfulin 写道:
>> Hi,
>> 场景其实很简单,就是通过Flink实时将kafka数据做个同步到hive。hive里创建了分区表。
>> 我感觉这个场景很常见吧。之前以为是支持的,毕竟可以在通过hivecatalog创建kafka table。但是创建了不能写,有点不合理。
>> OK吧。想问下FLIP-115计划是在哪个release版本支持哈?1.11么?
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2020-04-01 15:01:32,"Jingsong Li"  写道:
>> 
>> Hi,
>> 
>> 
>> Batch模式来支持Kafka -> Hive,也是不推荐的哦,FLIP-115后才可以在streaming模式支持这类场景。
>> 
>> 
>> 你可以描述下详细堆栈、应用场景、SQL吗?
>> 
>> 
>> Best,
>> Jingsong Lee
>> 
>> 
>> On Wed, Apr 1, 2020 at 2:56 PM sunfulin  wrote:
>> 
>> 
>> 
>> 
>> 
>> 我使用batch mode时,又抛出了如下异常:感觉一步一个坑。。sigh
>> 
>> 
>> 
>> org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not 
>> enough rules to produce a node with desired properties
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2020-04-01 14:49:41,"Jingsong Li"  写道:
>> Hi,
>> 
>> 异常的意思是现在hive sink还不支持streaming模式,只能用于batch模式中。功能正在开发中[1]
>> 
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
>> 
>> Best,
>> Jingsong Lee
>> 
>> On Wed, Apr 1, 2020 at 2:32 PM sunfulin  wrote:
>> 
>> Hi,
>> 我这边在使用Flink消费Kafka数据写入hive。配置连接都OK,但是在实际执行insert into
>> xxx_table时,报了如下异常。这个看不懂啥原因,求大神指教。
>> cc  @Jingsong Li  @Jark Wu
>> 
>> 
>> 
>> 
>> org.apache.flink.table.api.TableException: Stream Tables can only be
>> emitted by AppendStreamTableSink, RetractStreamTableSink, or
>> UpsertStreamTableSink.
>> 
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:136)
>> 
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>> 
>> at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>> 
>> at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>> 
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>> 
>> at
>> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
>> 
>> at
>> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>> 
>> at scala.collection.Iterator.foreach(Iterator.scala:937)
>> 
>> at scala.collection.Iterator.foreach$(Iterator.scala:937)
>> 
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>> 
>> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>> 
>> at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>> 
>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> 
>> at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>> 
>> at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>> 
>> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> 
>> at
>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>> 
>> at
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>> 
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>> 
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>> 
>> at
>> com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:87)
>> 
>> at
>> com.htsc.crm_realtime.fatjob.Jobs.hive.SensorData2Hive.doJob(SensorData2Hive.j
>> 
>> 
>> 
>> --
>> Best, Jingsong Lee
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> --
>> 
>> Best, Jingsong Lee



Re:回复: Flink实时写入hive异常

2020-04-02 文章 sunfulin



Hi,
这里就涉及到一个话题,应该怎么去实践实时和离线数仓的数据融合。我能理解这个技术上的不合理,但是通过Flink实现数仓ETL的功能,我相信很多使用Flink的会将之作为一个重要场景。














在 2020-04-01 16:05:54,"111"  写道:
>
>
>Hi,
>流写入hive,其实是属于数据湖的概念范畴。
>因为流往hive里面写,会造成很多的碎片文件,对hdfs造成性能影响,因此一般不会在流场景下直接写入hive。
>详细的可以了解 Delta lake 或 hudi。
>
>
>在2020年04月1日 15:05,sunfulin 写道:
>Hi,
>场景其实很简单,就是通过Flink实时将kafka数据做个同步到hive。hive里创建了分区表。
>我感觉这个场景很常见吧。之前以为是支持的,毕竟可以在通过hivecatalog创建kafka table。但是创建了不能写,有点不合理。
>OK吧。想问下FLIP-115计划是在哪个release版本支持哈?1.11么?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-04-01 15:01:32,"Jingsong Li"  写道:
>
>Hi,
>
>
>Batch模式来支持Kafka -> Hive,也是不推荐的哦,FLIP-115后才可以在streaming模式支持这类场景。
>
>
>你可以描述下详细堆栈、应用场景、SQL吗?
>
>
>Best,
>Jingsong Lee
>
>
>On Wed, Apr 1, 2020 at 2:56 PM sunfulin  wrote:
>
>
>
>
>
>我使用batch mode时,又抛出了如下异常:感觉一步一个坑。。sigh
>
>
>
>org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not 
>enough rules to produce a node with desired properties
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-04-01 14:49:41,"Jingsong Li"  写道:
>Hi,
>
>异常的意思是现在hive sink还不支持streaming模式,只能用于batch模式中。功能正在开发中[1]
>
>[1]
>https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
>
>Best,
>Jingsong Lee
>
>On Wed, Apr 1, 2020 at 2:32 PM sunfulin  wrote:
>
>Hi,
>我这边在使用Flink消费Kafka数据写入hive。配置连接都OK,但是在实际执行insert into
>xxx_table时,报了如下异常。这个看不懂啥原因,求大神指教。
>cc  @Jingsong Li  @Jark Wu
>
>
>
>
>org.apache.flink.table.api.TableException: Stream Tables can only be
>emitted by AppendStreamTableSink, RetractStreamTableSink, or
>UpsertStreamTableSink.
>
>at
>org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:136)
>
>at
>org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>
>at
>org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>
>at
>org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>
>at
>org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>
>at
>org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
>
>at
>scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>
>at scala.collection.Iterator.foreach(Iterator.scala:937)
>
>at scala.collection.Iterator.foreach$(Iterator.scala:937)
>
>at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>
>at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>
>at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>
>at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>
>at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>
>at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>
>at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>
>at
>org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>
>at
>org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>
>at
>org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>
>at
>org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>
>at
>com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:87)
>
>at
>com.htsc.crm_realtime.fatjob.Jobs.hive.SensorData2Hive.doJob(SensorData2Hive.j
>
>
>
>--
>Best, Jingsong Lee
>
>
>
>
>
>
>
>
>
>
>
>--
>
>Best, Jingsong Lee


?????? ????????????????sink

2020-04-02 文章 ????
??drools??




----
??:"Jun Zhang"

回复:flink sql 实现双流join 的滑动窗口

2020-04-02 文章 Kevin
补一个报错信息:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Rowtime attributes must not be in 
the input rows of a regular join. As a workaround you can cast the time 
attributes of input tables to TIMESTAMP before.

--
发件人:Kevin 
发送时间:2020年4月2日(星期四) 16:37
收件人:flink-zh 
主 题:flink sql 实现双流join 的滑动窗口

HI ALL:
打扰大家了。请教下如何实现双流join的滑动窗口。我的代码运行后就报错了。有经验的同学帮忙解答下。谢谢。

代码:
SELECT
TUMBLE_START(a.ts, INTERVAL '10' MINUTE),
TUMBLE_END(a.ts, INTERVAL '10' MINUTE),
sum( CASE WHEN f.business_id <> NULL THEN 1 ELSE 0 END ) AS 
company_pass_first_num,
sum( CASE WHEN g.business_id <> NULL THEN 1 ELSE 0 END ) AS company_pass_num,
COUNT(a.create_time)
FROM t1_oa_loan_requests_detail_view a
  LEFT JOIN t1_oa_flow_inst_task_view_segment_no_5 f ON a.id = f.business_id
  LEFT JOIN t1_oa_flow_inst_task_view_segment_no_61 g ON a.id = g.business_id 
GROUP BY TUMBLE(a.ts, INTERVAL '10' MINUTE);

报错:


Re: 动态处理字段动态sink

2020-04-02 文章 Jun Zhang
hi:
不知道广播流能否满足你的需求

出发 <573693...@qq.com> 于2020年4月2日周四 下午4:25写道:

>
> 老铁们,消费kafka一些数据时候,根据规则,将满足条件的一些字段生成新字段,插入到es或者pg等里面,其中映射字段是动态的,插入字段也是动态的,结合drools来做比较好吗。


????????????????sink

2020-04-02 文章 ????
kafka??espg??drools??

Re: Flink 1.10.0 HiveModule 函数问题

2020-04-02 文章 Yaoting Gong
Hi, Jingsong Li

你好,非常感谢你的解答。问题已经得到处理。

类似写法之前有尝试过,但是没有注意到与hive的差异,包括 T() 这些,所以也是失败的。加上官方文档没有仔细理解。

再次感谢答疑

On Thu, Apr 2, 2020 at 11:27 AM Jingsong Li  wrote:

> Hi,
>
> GenericUDTFExplode是一个UDTF。
> Flink中使用UDTF的方式是标准SQL的方式:
> "select x from db1.nested, lateral table(explode(a)) as T(x)"
>
> 你试下。
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html#table-functions
>
> Best,
> Jingsong Lee
>
> On Thu, Apr 2, 2020 at 11:22 AM Yaoting Gong 
> wrote:
>
> > 大家后,
> >
> >我们项目目前 集成了HiveModule,遇到一些问题。请教下大家。
> >
> > 在集成  Hive Module 之前,substr,split 都是无法使用的。集成后,验证都是可以的。
> > 比如:select split('1,2,2,4',',')
> >
> > 但是在使用 explode 函数,select explode(split('1,2,2,4',','));
> > 有如下错误:
> >
> >
> > The main method caused an error: SQL validation failed. From line 1,
> column
> > 8 to line 1, column 36: *No match found for function signature explode(*
> > *)  *
> >
> > *完整堆栈:*
> > The program finished with the following exception:
> >
> > org.apache.flink.client.program.ProgramInvocationException: The main
> method
> > caused an error: SQL validation failed. From line 1, column 8 to line 1,
> > column 36: No match found for function signature explode()
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> > at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> > at
> >
> org.apache.flink.client.cli.CliFrontend$$Lambda$38/1205406622.call(Unknown
> > Source)
> > at
> >
> >
> org.apache.flink.runtime.security.HadoopSecurityContext$$Lambda$39/293907205.run(Unknown
> > Source)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at javax.security.auth.Subject.doAs(Subject.java:422)
> > at
> >
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> > at
> >
> >
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> > Caused by: org.apache.flink.table.api.ValidationException: SQL validation
> > failed. From line 1, column 8 to line 1, column 36: No match found for
> > function signature explode()
> > at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> >
> >
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
> > at
> >
> >
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
> > at
> >
> >
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
> > at
> >
> >
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> > at
> >
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
> > at
> com.sui.bigdata.PlatformEngine$.$anonfun$main$4(PlatformEngine.scala:88)
> > at
> >
> >
> com.sui.bigdata.PlatformEngine$.$anonfun$main$4$adapted(PlatformEngine.scala:87)
> > at com.sui.bigdata.PlatformEngine$$$Lambda$765/623545006.apply(Unknown
> > Source)
> > at scala.collection.immutable.List.foreach(List.scala:388)
> > at com.sui.bigdata.PlatformEngine$.main(PlatformEngine.scala:87)
> > at com.sui.bigdata.PlatformEngine.main(PlatformEngine.scala)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > at
> >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:497)
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
> > ... 13 more
> > Caused by: org.apache.calcite.runtime.CalciteContextException: From line
> 1,
> > column 8 to line 1, column 36: No match found for function signature
> > explode()
> > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> > at
> >
> >
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> > at
> >
> >
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> > at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
> > at
> >
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834)
> > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819)
> > at