Can you explain the use case a bit more? In order to write a SQL statement
(at least one that doesn't use wildcard selection) you also need to know
the schema ahead of time. What are you trying to accomplish with these
dynamic schemas?
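
For example (a minimal sketch with placeholder field names; "rows" stands
in for your Row PCollection), Beam SQL resolves a non-wildcard query
against the input schema while the pipeline graph is being built:

    // Hypothetical static schema, fixed at pipeline-construction time.
    Schema schema = Schema.builder()
        .addStringField("X")
        .addInt64Field("Y")
        .addDoubleField("Z")
        .build();

    // The planner resolves "X, Y, Z" against this schema before the job
    // runs, so the schema cannot be discovered per element.
    rows.setRowSchema(schema)
        .apply(SqlTransform.query("select X, Y, Z from PCOLLECTION"));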

Reuven

On Sun, Jan 28, 2024 at 2:30 AM Sigalit Eliazov <e.siga...@gmail.com> wrote:

> Hello, in the pipeline described below we read Avro messages from Kafka
> using the Confluent Schema Registry.
>
> We want to run SQL queries on this streaming data.
>
>
> As far as I understand, since I am using the Flink runner, I must
> specify a row schema or a coder when creating the features PCollection.
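>
> For example, something like the following (untested sketch; avroSchema
> would have to be available up front, e.g. from the registry):
>
>     avroMessages.setCoder(KvCoder.of(
>         StringUtf8Coder.of(),
>         AvroCoder.of(avroSchema)));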
>
>
> I would like to use the schema obtained from the message that was just
> read (see ConvertToRow below).
>
> Is it possible to accomplish this when running on the Flink runner?
>
> I noticed that the Flink runner expects the row schema to be fixed when
> the pipeline is deployed.
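>
> The closest workaround I can think of is resolving the subject's latest
> schema from the registry at pipeline-construction time (untested sketch,
> assuming the Confluent CachedSchemaRegistryClient; schemaRegistryUrl and
> subject are placeholders), but that still fixes the schema at deployment:
>
>     // Placeholder values; 100 is the client's local schema cache size.
>     // Exception handling (IOException, RestClientException) omitted.
>     CachedSchemaRegistryClient client =
>         new CachedSchemaRegistryClient(schemaRegistryUrl, 100);
>     String avroSchemaJson =
>         client.getLatestSchemaMetadata(subject).getSchema();
>     org.apache.avro.Schema avroSchema =
>         new org.apache.avro.Schema.Parser().parse(avroSchemaJson);
>     Schema beamSchema = AvroUtils.toBeamSchema(avroSchema);
>     features.setRowSchema(beamSchema);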
>
>
> Are there any potential solutions or workarounds for this situation?
>
>
> public class BeamSqlTest {
>
>     public static void main(String[] args) {
>         // Create the pipeline from the (elsewhere-defined) options.
>         Pipeline pipeline = Pipeline.create(options);
>
>         // Read Avro messages from Kafka via the Confluent Schema Registry.
>         PCollection<KafkaRecord<String, GenericRecord>> readAvroMessageFromKafka =
>                 pipeline.apply("readAvroMessageFromKafka",
>                         KafkaTransform.readAvroMessageFromKafkaWithSchemaRegistry(
>                                 pipelineUtil.getBootstrapServers(),
>                                 options.getSourceKafkaTopic(), PIPELINE_NAME));
>
>         // Unwrap each KafkaRecord into a plain key/value pair.
>         PCollection<KV<String, GenericRecord>> avroMessages =
>                 readAvroMessageFromKafka.apply("convertFromKafkaRecord",
>                         ParDo.of(new ConvertFromKafkaRecord<>()));
>
>         // This is where a fixed row schema is required up front.
>         PCollection<Row> features = avroMessages
>                 .apply(ParDo.of(new ConvertToRow()))
>                 .setRowSchema(XXX);
>
>         PCollection<Row> selectFields = features.apply("Select Fields",
>                 Select.fieldNames("X", "Y", "Z"));
>
>         PCollection<Row> windowRes = selectFields.apply("Windowing",
>                 Window.into(FixedWindows.of(Duration.standardMinutes(1))));
>
>         PCollection<Row> outputStream =
>                 windowRes.apply(SqlTransform.query("select X, Y, Z from PCOLLECTION"));
>
>         pipeline.run().waitUntilFinish();
>     }
>
>     @AllArgsConstructor
>     public static class ConvertToRow extends DoFn<KV<String, GenericRecord>, Row> {
>         @ProcessElement
>         @SuppressWarnings({"ConstantConditions", "unused"})
>         public void processElement(ProcessContext c) {
>             GenericRecord record = c.element().getValue();
>             // The schema is only known here, per element, at runtime.
>             final org.apache.avro.Schema avroSchema = record.getSchema();
>             Schema schema = AvroUtils.toBeamSchema(avroSchema);
>
>             Object x = record.get("X");
>             Object y = record.get("Y");
>             Object z = record.get("Z");
>             Row row = Row.withSchema(schema).addValues(x, y, z).build();
>             c.output(row);
>         }
>     }
> }
>
>
> Thanks
>
> Sigalit
>
>
