谢谢,我理解了。
[email protected] Sender: Harold.Miao Send Time: 2020-07-16 19:33 Receiver: user-zh Subject: Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢? 我的理解 : 大概就是通过spi机制加载类,然后通过属性去过滤出来需要的class 类似下面的代码 private static <T extends TableFactory> T findSingleInternal( Class<T> factoryClass, Map<String, String> properties, Optional<ClassLoader> classLoader) { List<TableFactory> tableFactories = discoverFactories(classLoader); List<T> filtered = filter(tableFactories, factoryClass, properties); if (filtered.size() > 1) { throw new AmbiguousTableFactoryException( filtered, factoryClass, tableFactories, properties); } else { return filtered.get(0); } } private static List<TableFactory> discoverFactories(Optional<ClassLoader> classLoader) { try { List<TableFactory> result = new LinkedList<>(); ClassLoader cl = classLoader.orElse(Thread.currentThread().getContextClassLoader()); ServiceLoader .load(TableFactory.class, cl) .iterator() .forEachRemaining(result::add); return result; } catch (ServiceConfigurationError e) { LOG.error("Could not load service provider for table factories.", e); throw new TableException("Could not load service provider for table factories.", e); } } [email protected] <[email protected]> 于2020年7月16日周四 下午7:04写道: > > 我在 > flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory > 找到了 SPI 的配置: > > org.apache.flink.formats.json.JsonFileSystemFormatFactory > org.apache.flink.formats.json.JsonFormatFactory > org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory > org.apache.flink.formats.json.canal.CanalJsonFormatFactory > > 还是没有搞清楚 指定 'format'='debezium-json' 怎么就能对应到 DebeziumJsonFormatFactory > 我的理解肯定要有一个地方指明 debezium-json 要对应到 DebeziumJsonFormatFactory, 但是我 grep > 代码没找到类似的关系映射配置。 > > > 谢谢, > 王磊 > > > > [email protected] > > > Sender: godfrey he > Send Time: 2020-07-16 16:38 > Receiver: user-zh > Subject: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢? > 通过Java 的 SPI 机制来找到对应的 format,可以参考 [1] > > [1] > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors > > Best, > Godfrey > > [email protected] <[email protected]> 于2020年7月16日周四 > 下午4:02写道: > > > 比如: > > > > CREATE TABLE my_table ( > > id BIGINT, > > first_name STRING, > > last_name STRING, > > email STRING > > ) WITH ( > > 'connector'='kafka', > > 'topic'='user_topic', > > 'properties.bootstrap.servers'='localhost:9092', > > 'scan.startup.mode'='earliest-offset', > > 'format'='debezium-json' > > ); > > > > 最终解析 debezium-json 应该是 > > > flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium > > 下面的代码 > > 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢? > > > > 谢谢, > > 王磊 > > > > > > [email protected] > > > > > -- Best Regards, Harold Miao
