Hi Eugenio,

According to docs[1], there are two ways to define the watermark in a table:
1. Defining in DDL
2. During DataStream-to-Table Conversion

In your case, I think you could use a CREATE TABLE DDL statement to create a
new table from filteredPhasesDurationsTable with a watermark.
See the CREATE statement docs[2] for more.
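To sketch the idea (the table name, column list, and connector option below
are placeholders I made up, not taken from your job; you would adapt them to
your actual schema):

```sql
-- Hypothetical table name and trimmed column list.
-- The WATERMARK clause declares data_fine as an event-time attribute,
-- which windowed aggregations can then use.
CREATE TABLE phases_durations_with_wm (
    id_fascicolo BIGINT,
    fase STRING,
    durata BIGINT,
    data_fine TIMESTAMP_LTZ(3),
    WATERMARK FOR data_fine AS data_fine - INTERVAL '1' SECOND
) WITH (
    'connector' = '...'
);
```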


[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/concepts/time_attributes/
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/create/

Best,
Xiangyu

Eugenio Marotti <ing.eugenio.maro...@gmail.com> wrote on Mon, Jun 26, 2023 at 13:03:

> Hi,
>
> thank you for the suggestion. The problem is that data_fine in my case
> can be null, so I wanted to first filter by $("fase_completata").isTrue()
> (when fase_completata is true data_fine is not null) and then define the
> watermark. Is it possible to define it after the last select?
>
> Best,
> Eugenio
>
> On 26 Jun 2023, at 05:31, feng xiangyu <xiangyu...@gmail.com> wrote:
>
> Hi, Eugenio
>
> AFAIK, you could define the watermark on data_fine by adding a watermark
> attribute in phasesDurationsSchema.
> For example:
>
> final Schema phasesDurationsSchema = Schema.newBuilder()
>     .column("id_fascicolo", DataTypes.BIGINT().notNull())
>     .column("nrg", DataTypes.STRING())
>     .column("giudice", DataTypes.STRING())
>     .column("oggetto", DataTypes.STRING())
>     .column("codice_oggetto", DataTypes.STRING())
>     .column("ufficio", DataTypes.STRING())
>     .column("sezione", DataTypes.STRING())
>     .column("fase", DataTypes.STRING().notNull())
>     .column("fase_completata", DataTypes.BOOLEAN())
>     .column("durata", DataTypes.BIGINT())
>     .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
>     .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
>     .watermark("data_fine", "data_fine - INTERVAL '1' SECOND")
>     .primaryKey("id_fascicolo", "fase")
>     .build();
>
> See more in Table API and DataStream API integration docs[1].
>
> Best,
> Xiangyu
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/data_stream_api/
>
> Eugenio Marotti <ing.eugenio.maro...@gmail.com> wrote on Sun, Jun 25, 2023 at 15:35:
>
>> Hi everyone,
>>
>> I'm using Flink to process some streaming data. First, I have two tables
>> receiving events from Kafka. These tables are joined, and the resulting
>> table is converted to a DataStream that is processed by a custom
>> KeyedProcessFunction. The output is then converted back to a table and
>> sent to Opensearch. Here's the code I'm using:
>>
>>
>> final TableDescriptor legalFilesTableDescriptor = TableDescriptor.forConnector("kafka")
>>     .schema(Schema.newBuilder()
>>         .column("id", DataTypes.BIGINT())
>>         .column("nrg", DataTypes.STRING())
>>         .column("ufficio", DataTypes.STRING())
>>         .column("sezione", DataTypes.STRING())
>>         .column("giudice", DataTypes.STRING())
>>         .column("oggetto", DataTypes.STRING())
>>         .column("codice_oggetto", DataTypes.STRING())
>>         .build())
>>     .option(KafkaConnectorOptions.TOPIC, List.of("sicid.public.fascicoli"))
>>     .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>>     .option(KafkaConnectorOptions.SCAN_STARTUP_MODE, KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET)
>>     .option(KafkaConnectorOptions.VALUE_FORMAT, "debezium-json")
>>     .build();
>> tEnv.createTable("LegalFilesTable_Kafka", legalFilesTableDescriptor);
>> Table legalFilesTable = tEnv.from("LegalFilesTable_Kafka");
>>
>>
>> final TableDescriptor eventsTableDescriptor = TableDescriptor.forConnector("kafka")
>>     .schema(Schema.newBuilder()
>>         .column("id", DataTypes.BIGINT())
>>         .column("data", DataTypes.BIGINT())
>>         .columnByExpression("data_evento", "TO_TIMESTAMP_LTZ(data, 3)")
>>         .column("evento", DataTypes.STRING())
>>         .column("id_fascicolo", DataTypes.BIGINT())
>>         .build())
>>     .option(KafkaConnectorOptions.TOPIC, List.of("sicid.public.eventi"))
>>     .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>>     .option(KafkaConnectorOptions.SCAN_STARTUP_MODE, KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET)
>>     .option(KafkaConnectorOptions.VALUE_FORMAT, "debezium-json")
>>     .build();
>> tEnv.createTable("EventsTable_Kafka", eventsTableDescriptor);
>> Table eventsTable = tEnv.from("EventsTable_Kafka");
>>
>>
>> Table legalFileEventsTable = legalFilesTable.join(eventsTable)
>>     .where($("id").isEqual($("id_fascicolo")))
>>     .select(
>>         $("id").as("id_fascicolo"),
>>         $("id_evento"),
>>         $("giudice"),
>>         $("nrg"),
>>         $("codice_oggetto"),
>>         $("oggetto"),
>>         $("ufficio"),
>>         $("sezione"),
>>         $("data_evento"),
>>         $("evento")
>>     );
>>
>>
>> DataStream<Row> phasesDurationsDataStream = tEnv.toChangelogStream(legalFileEventsTable)
>>     .keyBy(r -> r.<Long>getFieldAs("id_fascicolo"))
>>     .process(new PhaseDurationCounterProcessFunction())
>>     .returns(new RowTypeInfo(
>>         new TypeInformation[] {
>>             BasicTypeInfo.LONG_TYPE_INFO,
>>             BasicTypeInfo.STRING_TYPE_INFO,
>>             BasicTypeInfo.STRING_TYPE_INFO,
>>             BasicTypeInfo.STRING_TYPE_INFO,
>>             BasicTypeInfo.STRING_TYPE_INFO,
>>             BasicTypeInfo.STRING_TYPE_INFO,
>>             BasicTypeInfo.STRING_TYPE_INFO,
>>             BasicTypeInfo.STRING_TYPE_INFO,
>>             BasicTypeInfo.BOOLEAN_TYPE_INFO,
>>             BasicTypeInfo.LONG_TYPE_INFO,
>>             BasicTypeInfo.INSTANT_TYPE_INFO,
>>             BasicTypeInfo.INSTANT_TYPE_INFO
>>         },
>>         new String[] { "id_fascicolo", "nrg", "giudice", "oggetto",
>>             "codice_oggetto", "ufficio", "sezione", "fase", "fase_completata",
>>             "durata", "data_inizio", "data_fine" }
>>     ));
>>
>> final Schema phasesDurationsSchema = Schema.newBuilder()
>>     .column("id_fascicolo", DataTypes.BIGINT().notNull())
>>     .column("nrg", DataTypes.STRING())
>>     .column("giudice", DataTypes.STRING())
>>     .column("oggetto", DataTypes.STRING())
>>     .column("codice_oggetto", DataTypes.STRING())
>>     .column("ufficio", DataTypes.STRING())
>>     .column("sezione", DataTypes.STRING())
>>     .column("fase", DataTypes.STRING().notNull())
>>     .column("fase_completata", DataTypes.BOOLEAN())
>>     .column("durata", DataTypes.BIGINT())
>>     .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
>>     .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
>>     .primaryKey("id_fascicolo", "fase")
>>     .build();
>>
>> Table phasesDurationsTable = tEnv.fromChangelogStream(phasesDurationsDataStream,
>>     phasesDurationsSchema, ChangelogMode.upsert());
>>
>> final TableDescriptor phasesDurationsOS = TableDescriptor.forConnector("opensearch")
>>     .schema(phasesDurationsSchema)
>>     .option(OpensearchConnectorOptions.HOSTS_OPTION, List.of(OPENSEARCH_HOST))
>>     .option(OpensearchConnectorOptions.INDEX_OPTION, "legal_files_phase_duration")
>>     .option(OpensearchConnectorOptions.USERNAME_OPTION, "admin")
>>     .option(OpensearchConnectorOptions.PASSWORD_OPTION, "admin")
>>     .option(OpensearchConnectorOptions.ALLOW_INSECURE, true)
>>     .build();
>> tEnv.createTable("PhasesDurationsOS", phasesDurationsOS);
>>
>>
>> After that I filter the *phasesDurationsTable* like this:
>>
>> Table filteredPhasesDurationsTable = phasesDurationsTable
>>     .where($("fase_completata").isTrue())
>>     .select(
>>         $("id_fascicolo"),
>>         $("nrg"),
>>         $("giudice"),
>>         $("oggetto"),
>>         $("codice_oggetto"),
>>         $("ufficio"),
>>         $("sezione"),
>>         $("fase"),
>>         $("durata"),
>>         $("data_inizio"),
>>         $("data_fine")
>>     );
>>
>> With the *filteredPhasesDurationsTable* I need to calculate some
>> averages over a sliding window, so I need to define a watermark on the
>> *data_fine* column. Is there a way to do this?
>>
>
>
