Hi, abelm ~

Which Flink version did you use? We did some refactoring of the connector
options in Flink 1.11, and the METADATA syntax is only supported since
version 1.12. Note that your DDL still uses the legacy keys
('connector.type' = 'kafka'), which resolve to the old table factories;
those factories don't support metadata columns, which is why the planner
reports "schema.#.metadata" and "schema.#.virtual" as unsupported.
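
For example, on 1.12 with the new option style the record timestamp can be
declared as a metadata column like this (just a sketch: `my_table`,
`user_id`, and the topic/server values are placeholders I made up, while
`ts` is the field name from your DDL; the JSON options are sketched below):

    CREATE TEMPORARY TABLE my_table (
      `user_id` BIGINT,
      `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'your_topic',
      'properties.bootstrap.servers' = 'your-kafka:9092',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    )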

In 1.11, to ignore parse errors, you need to use the option
"json.ignore-parse-errors" [1] (note the "json." prefix; the old factory
does not support the legacy "format.ignore-parse-errors" key).

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/json.html#json-ignore-parse-errors
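
Concretely, in the new-style WITH clause sketched above, the JSON format
options would look like this (again a sketch; as far as I know,
'json.fail-on-missing-field' and 'json.ignore-parse-errors' must not both
be 'true', so keep the former 'false'):

      'format' = 'json',
      'json.fail-on-missing-field' = 'false',
      'json.ignore-parse-errors' = 'true'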

abelm <abele.ma...@outlook.com> wrote on Thu, Dec 10, 2020 at 1:13 AM:

> Hello! I have a Scala 2.12 project which registers some tables (that get
> their data from Kafka in JSON form) with the StreamTableEnvironment via
> the executeSql command before calling execute on the
> StreamExecutionEnvironment.
>
> Everything behaves as expected until I either try to set
> 'format.ignore-parse-errors' = 'true' in the connector options, or I try
> to add the Kafka record timestamp as a table field via `ts` TIMESTAMP(3)
> METADATA FROM 'timestamp'. In both of these cases I get:
>
> Exception in thread "main" org.apache.flink.table.api.TableException:
> findAndCreateTableSource failed.
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> Could not find a suitable table factory for
> 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
> the classpath.
>
> Reason: No factory supports all properties.
>
> Additionally, for ignoring parsing errors:
> The matching candidates:
> org.apache.flink.formats.json.JsonRowFormatFactory
> Unsupported property keys:
> format.ignore-parse-errors
>
> While for the timestamp field:
> The matching candidates:
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> Unsupported property keys:
> schema.#.metadata
> schema.#.virtual
>
> Here is the DDL code used for table creation:
>     "CREATE TEMPORARY TABLE `" + name + "` (" + tableFields + ") " +
>       "WITH (" +
>       "'connector.type' = 'kafka', " +
>       "'connector.version' = 'universal', " +
>       "'connector.topic' = '" + name + "', " +
>       "'connector.properties.bootstrap.servers' = '" + kafkaAddr + "', " +
>       "'connector.startup-mode' = '" +
>       (if (checkLatest) "latest-offset" else "earliest-offset") +
>       "', " +
>       "'connector.properties.default.api.timeout.ms' = '5000', " +
>       "'format.type' = 'json', " +
>       "'format.fail-on-missing-field' = 'false'" +
>       ")"
>
> And here is the Flink-related config from build.sbt:
> lazy val flinkVersion = "1.12.0"
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-scala"                  % flinkVersion,
>   "org.apache.flink" %% "flink-streaming-scala"        % flinkVersion,
>   "org.apache.flink" %% "flink-connector-kafka"        % flinkVersion,
>   "org.apache.flink" %% "flink-clients"                % flinkVersion,
>   "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion,
>   "org.apache.flink" %% "flink-table-planner-blink"    % flinkVersion,
>   "org.apache.flink"  % "flink-json"                   % flinkVersion,
>   "org.apache.flink" %% "flink-test-utils"             % flinkVersion % Test,
>   "org.apache.flink" %% "flink-runtime"                % flinkVersion % Test classifier "tests",
>   "org.apache.flink" %% "flink-streaming-java"         % flinkVersion % Test classifier "tests",
> )
>
> I would appreciate any tips on getting both the timestamp field and the
> parse-error setting to work. Thank you in advance!
>
