I've upgraded from 1.11.1 to 1.12 in hopes of using the key.fields
feature of the Kafa SQL Connector. My current connector is configured
as ,
connector.type = 'kafka'
connector.version = 'universal'
connector.topic = 'my-topic'
connector.properties.group.id = 'my-consumer-group'
connector.properties.bootstrap.servers = '...'
format.type = 'avro'
format.avro-schema = '....'
I tried adding
key.fields = 'my_key_field'
as well as
key.format = 'avro'
key.fields = 'my_key_field'
but I get the exception
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.
Reason: No factory supports all properties.
The matching candidates:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
Unsupported property keys:
key.fields
key.format
The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
at
org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
at
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
at
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
at
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46)
... 21 more
I have validated that the uber jar clearly contains the 1.12
dependencies. What is that magic combination of properties to get
key.fields to work? Or is it not supported with avro?
--
Thank You,
Aeden