Hi Georg,

Which Flink version are you using? The missing property belongs to the avro-confluent format, and if I recall correctly, the way these options are passed changed in recent versions: in 1.14 'avro-confluent.schema-registry.url' was renamed to 'avro-confluent.url', and when avro-confluent is used as the key/value format of the (upsert-)kafka connector, the options have to be prefixed with 'key.' or 'value.'. So it would be good to double-check that you are reading the documentation for the version you are actually running.
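
If you are on 1.14 or later, a rough (untested) sketch of the sink table could look like this, assuming the topic and registry setup from your mail. Note that upsert-kafka also requires a PRIMARY KEY, which your DDL is missing:

CREATE TABLE metrics_per_brand (
    brand STRING,
    cnt BIGINT,
    duration_mean DOUBLE,
    rating_mean DOUBLE,
    -- upsert-kafka needs a primary key; here it matches the GROUP BY key
    PRIMARY KEY (brand) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'metrics_per_brand',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'avro-confluent',
    -- 1.14+ option name; on 1.13 this was 'key.avro-confluent.schema-registry.url'
    'key.avro-confluent.url' = 'http://localhost:8081',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://localhost:8081'
);

The source table would analogously use 'avro-confluent.url' instead of 'avro-confluent.schema-registry.url'.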


Best
Ingo

On 24.03.22 11:57, Georg Heiler wrote:
Hi,

how can I get Flink's SQL client to sink some data to Kafka using either the regular kafka or the upsert-kafka connector?

I have a table/topic with dummy data:
CREATE TABLE metrics_brand_stream (
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
    WATERMARK FOR event_time AS event_time - INTERVAL '10' MINUTE,
    `partition` BIGINT METADATA VIRTUAL,
    `offset` BIGINT METADATA VIRTUAL,
    brand STRING,
    duration INT,
    rating INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'commercials_avro',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'avro-confluent',
    'avro-confluent.schema-registry.url' = 'http://localhost:8081/',
    'properties.group.id' = 'flink-test-001',
    'properties.bootstrap.servers' = 'localhost:9092'
);

And the following aggregation:

SELECT brand,
       COUNT(*) AS cnt,
       AVG(duration) AS duration_mean,
       AVG(rating) AS rating_mean
FROM metrics_brand_stream
GROUP BY brand;

When trying to define an output table:

CREATE TABLE metrics_per_brand (
    brand STRING,
    cnt BIGINT,
    duration_mean DOUBLE,
    rating_mean DOUBLE
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'metrics_per_brand',
    'avro-confluent.schema-registry.url' = 'http://localhost:8081/',
    'properties.group.id' = 'flink-test-001',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'avro-confluent',
    'value.format' = 'avro-confluent'
);

And trying to INSERT some result data:

INSERT INTO metrics_per_brand
SELECT brand,
       COUNT(*) AS cnt,
       AVG(duration) AS duration_mean,
       AVG(rating) AS rating_mean
FROM metrics_brand_stream
GROUP BY brand;

The query fails with:

org.apache.flink.table.api.ValidationException: One or more required options are missing.

Missing required options are:

url

But neither https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/ nor https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/ nor https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/ seems to list the right configuration (or I am misreading the documentation).


How can I sink data to Kafka after some arbitrary computation in the Flink SQL client, using either the kafka or upsert-kafka connector, where the input is Avro with a schema from the Confluent Schema Registry and the output should register its schema there as well (and also serialize using Avro)?


Best,
Georg
