Hi Georg,
which Flink version are you using? The missing property belongs to the
avro-confluent format, and if I recall correctly, the way its options are
passed has changed in recent versions, so it would be good to double-check
that you are using the documentation for the version you are running.
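As a rough, untested sketch: assuming you are on 1.14 or later, where (as
far as I remember) the registry option is named 'avro-confluent.url' rather
than 'avro-confluent.schema-registry.url', the sink definition might look
roughly like the one below. With separate 'key.format' and 'value.format'
settings, the format options are prefixed with 'key.' and 'value.'. The
upsert-kafka connector also needs a primary key; using brand for it is my
assumption, based on your GROUP BY.

```sql
-- Sketch only; option names assume Flink 1.14+.
-- On 1.13 the equivalents should be
--   'key.avro-confluent.schema-registry.url' and
--   'value.avro-confluent.schema-registry.url'.
-- upsert-kafka requires a PRIMARY KEY; brand is an assumed choice.
CREATE TABLE metrics_per_brand (
  brand STRING,
  cnt BIGINT,
  duration_mean DOUBLE,
  rating_mean DOUBLE,
  PRIMARY KEY (brand) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'metrics_per_brand',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'avro-confluent',
  'key.avro-confluent.url' = 'http://localhost:8081',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://localhost:8081'
);
```

Your INSERT statement should not need to change for this; please check the
option names against the docs for your exact version before relying on it.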
Best
Ingo
On 24.03.22 11:57, Georg Heiler wrote:
Hi,
how can I get Flink's SQL client to sink some data to either the
regular kafka connector or the upsert-kafka connector?
I have a table/topic with dummy data:
CREATE TABLE metrics_brand_stream (
`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
WATERMARK FOR event_time AS event_time - INTERVAL '10' MINUTE,
`partition` BIGINT METADATA VIRTUAL,
`offset` BIGINT METADATA VIRTUAL,
brand string,
duration int,
rating int
) WITH (
'connector' = 'kafka',
'topic' = 'commercials_avro',
'scan.startup.mode' = 'earliest-offset',
'format' = 'avro-confluent',
'avro-confluent.schema-registry.url' = 'http://localhost:8081/',
'properties.group.id' = 'flink-test-001',
'properties.bootstrap.servers' = 'localhost:9092'
);
And the following aggregation:
SELECT brand,
COUNT(*) AS cnt,
AVG(duration) AS duration_mean,
AVG(rating) AS rating_mean
FROM metrics_brand_stream
GROUP BY brand;
When trying to define an output table:
CREATE TABLE metrics_per_brand (
brand string,
cnt BIGINT,
duration_mean DOUBLE,
rating_mean DOUBLE
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'metrics_per_brand',
'avro-confluent.schema-registry.url' = 'http://localhost:8081/',
'properties.group.id' = 'flink-test-001',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'avro-confluent',
'value.format' = 'avro-confluent'
);
And trying to INSERT some result data:
INSERT INTO metrics_per_brand
SELECT brand,
COUNT(*) AS cnt,
AVG(duration) AS duration_mean,
AVG(rating) AS rating_mean
FROM metrics_brand_stream
GROUP BY brand;
The query fails with:
org.apache.flink.table.api.ValidationException: One or more required
options are missing.
Missing required options are:
url
But neither:
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/
nor
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/
nor
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/
seems to list the right configuration (or I am misreading the
documentation).
How can I sink data to Kafka after some arbitrary computation using the
Flink SQL client, with either the kafka or the upsert-kafka connector, where
the input is Avro with a schema from the Confluent Schema Registry and
the output should store its schema there as well (and serialize using Avro)?
Best,
Georg