小伙伴们,帮忙看下怎么解决呢? 通过写代码方式,基于table
api执行flink的sql。这种情况下用到的flink-json等包通过shade等方式做成一个大jar包之后依赖的问题。

赵一旦 <[email protected]> 于2020年8月17日周一 下午5:00写道:

> @RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。
>
> 所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子?
>
>
> 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。
>
> Rui Li <[email protected]> 于2020年8月17日周一 下午3:46写道:
>
>> 可能是打fat
>> jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现
>>
>> On Fri, Aug 14, 2020 at 7:13 PM 赵一旦 <[email protected]> wrote:
>>
>> > 代码如下:
>> > // tEnv;
>> > tEnv.sqlUpdate("create table dr1(  " +
>> >         "  cid STRING,  " +
>> >         "  server_time BIGINT,  " +
>> >         "  d MAP<STRING, STRING>,  " +
>> >         "  process_time AS PROCTIME(),  " +
>> >         "  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time /
>> 1000)),
>> > " +
>> >         "  WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND
>> > " +
>> >         ") WITH (  " +
>> >         "  'update-mode' = 'append',  " +
>> >         "  'connector.type' = 'kafka',  " +
>> >         "  'connector.version' = 'universal',  " +
>> >         "  'connector.topic' = 'antibot_dr1',  " +
>> >         "  'connector.startup-mode' = 'latest-offset',  " +
>> >         "  'connector.properties.zookeeper.connect' =
>> > 'yq01-sw-xxx03.yq01:8681',  " +
>> >         "  'connector.properties.bootstrap.servers' =
>> > 'yq01-sw-xxx03.yq01:8192',  " +
>> >         "  'format.type' = 'json'  " +
>> >         ")");
>> > Table t1 = tEnv.sqlQuery("select * from dr1");
>> >
>> > 我打包会把flink-json打包进去,最终结果包是test.jar。
>> >
>> > test.jar是个fat jar,相关依赖都有了。
>> >
>> > 然后我执行:flink run -c test.SQLWC1 --detached  test.jar 报错:
>> >
>> > Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
>> > Could not find a suitable table factory for
>> > 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
>> > the classpath.
>> >
>> > 可是我flink-json.jar都打包进去了,居然还是报错。。。
>> >
>> > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached  test.jar 这个命令的机器
>> >
>> > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。
>> >
>> > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗?
>> >
>>
>>
>> --
>> Best regards!
>> Rui Li
>>
>

回复