Hi, Eugenio

AFAIK, you can define a watermark on data_fine by adding a watermark
attribute to phasesDurationsSchema.
For example:

final Schema phasesDurationsSchema = Schema.newBuilder()
        .column("id_fascicolo", DataTypes.BIGINT().notNull())
        .column("nrg", DataTypes.STRING())
        .column("giudice", DataTypes.STRING())
        .column("oggetto", DataTypes.STRING())
        .column("codice_oggetto", DataTypes.STRING())
        .column("ufficio", DataTypes.STRING())
        .column("sezione", DataTypes.STRING())
        .column("fase", DataTypes.STRING().notNull())
        .column("fase_completata", DataTypes.BOOLEAN())
        .column("durata", DataTypes.BIGINT())
        .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
        .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
        .watermark("data_fine", "data_fine - INTERVAL '1' SECOND")
        .primaryKey("id_fascicolo", "fase")
        .build();
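
With the watermark declared, your fromChangelogStream call turns data_fine
into a rowtime attribute, so the sliding-window averages you asked about
become possible. A minimal sketch, reusing your names; the 10-minute/5-minute
window sizes and the grouping by "fase" are only illustrative, and depending
on the planner version a group window may not accept an updating input, in
which case you would need to make the stream append-only first:

// Sketch only: assumes the watermarked schema above, plus
// org.apache.flink.table.api.Slide and the static imports of
// $ and lit from org.apache.flink.table.api.Expressions.
Table phasesDurationsTable = tEnv.fromChangelogStream(
        phasesDurationsDataStream, phasesDurationsSchema, ChangelogMode.upsert());

Table avgDurations = phasesDurationsTable
        .where($("fase_completata").isTrue())
        .window(Slide.over(lit(10).minutes())   // window size (illustrative)
                .every(lit(5).minutes())        // slide interval (illustrative)
                .on($("data_fine"))             // the watermarked rowtime column
                .as("w"))
        .groupBy($("w"), $("fase"))
        .select(
                $("fase"),
                $("w").start().as("window_start"),
                $("durata").avg().as("durata_media"));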

See more in the Table API and DataStream API integration docs [1].

Best,
Xiangyu

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/data_stream_api/

Eugenio Marotti <ing.eugenio.maro...@gmail.com> wrote on Sun, Jun 25, 2023 at 15:35:

> Hi everyone,
>
> I'm using Flink to process some streaming data. First of all, I have
> two tables receiving events from Kafka. These tables are joined, and the
> resulting table is converted to a DataStream, where it is processed by a
> custom KeyedProcessFunction. The output is then converted back to a table
> and sent to Opensearch. Here's the code I'm using:
>
>
> final TableDescriptor legalFilesTableDescriptor = TableDescriptor.forConnector("kafka")
>         .schema(Schema.newBuilder()
>                 .column("id", DataTypes.BIGINT())
>                 .column("nrg", DataTypes.STRING())
>                 .column("ufficio", DataTypes.STRING())
>                 .column("sezione", DataTypes.STRING())
>                 .column("giudice", DataTypes.STRING())
>                 .column("oggetto", DataTypes.STRING())
>                 .column("codice_oggetto", DataTypes.STRING())
>                 .build())
>         .option(KafkaConnectorOptions.TOPIC, List.of("sicid.public.fascicoli"))
>         .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>         .option(KafkaConnectorOptions.SCAN_STARTUP_MODE,
>                 KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET)
>         .option(KafkaConnectorOptions.VALUE_FORMAT, "debezium-json")
>         .build();
> tEnv.createTable("LegalFilesTable_Kafka", legalFilesTableDescriptor);
> Table legalFilesTable = tEnv.from("LegalFilesTable_Kafka");
>
>
> final TableDescriptor eventsTableDescriptor = TableDescriptor.forConnector("kafka")
>         .schema(Schema.newBuilder()
>                 .column("id", DataTypes.BIGINT())
>                 .column("data", DataTypes.BIGINT())
>                 .columnByExpression("data_evento", "TO_TIMESTAMP_LTZ(data, 3)")
>                 .column("evento", DataTypes.STRING())
>                 .column("id_fascicolo", DataTypes.BIGINT())
>                 .build())
>         .option(KafkaConnectorOptions.TOPIC, List.of("sicid.public.eventi"))
>         .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>         .option(KafkaConnectorOptions.SCAN_STARTUP_MODE,
>                 KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET)
>         .option(KafkaConnectorOptions.VALUE_FORMAT, "debezium-json")
>         .build();
> tEnv.createTable("EventsTable_Kafka", eventsTableDescriptor);
> Table eventsTable = tEnv.from("EventsTable_Kafka");
>
>
> Table legalFileEventsTable = legalFilesTable.join(eventsTable)
>         .where($("id").isEqual($("id_fascicolo")))
>         .select(
>                 $("id").as("id_fascicolo"),
>                 $("id_evento"),
>                 $("giudice"),
>                 $("nrg"),
>                 $("codice_oggetto"),
>                 $("oggetto"),
>                 $("ufficio"),
>                 $("sezione"),
>                 $("data_evento"),
>                 $("evento")
>         );
>
>
> DataStream<Row> phasesDurationsDataStream = tEnv.toChangelogStream(legalFileEventsTable)
>         .keyBy(r -> r.<Long>getFieldAs("id_fascicolo"))
>         .process(new PhaseDurationCounterProcessFunction())
>         .returns(new RowTypeInfo(
>                 new TypeInformation[] {
>                         BasicTypeInfo.LONG_TYPE_INFO,
>                         BasicTypeInfo.STRING_TYPE_INFO,
>                         BasicTypeInfo.STRING_TYPE_INFO,
>                         BasicTypeInfo.STRING_TYPE_INFO,
>                         BasicTypeInfo.STRING_TYPE_INFO,
>                         BasicTypeInfo.STRING_TYPE_INFO,
>                         BasicTypeInfo.STRING_TYPE_INFO,
>                         BasicTypeInfo.STRING_TYPE_INFO,
>                         BasicTypeInfo.BOOLEAN_TYPE_INFO,
>                         BasicTypeInfo.LONG_TYPE_INFO,
>                         BasicTypeInfo.INSTANT_TYPE_INFO,
>                         BasicTypeInfo.INSTANT_TYPE_INFO
>                 },
>                 new String[] {
>                         "id_fascicolo", "nrg", "giudice", "oggetto", "codice_oggetto",
>                         "ufficio", "sezione", "fase", "fase_completata", "durata",
>                         "data_inizio", "data_fine"
>                 }));
>
> final Schema phasesDurationsSchema = Schema.newBuilder()
>         .column("id_fascicolo", DataTypes.BIGINT().notNull())
>         .column("nrg", DataTypes.STRING())
>         .column("giudice", DataTypes.STRING())
>         .column("oggetto", DataTypes.STRING())
>         .column("codice_oggetto", DataTypes.STRING())
>         .column("ufficio", DataTypes.STRING())
>         .column("sezione", DataTypes.STRING())
>         .column("fase", DataTypes.STRING().notNull())
>         .column("fase_completata", DataTypes.BOOLEAN())
>         .column("durata", DataTypes.BIGINT())
>         .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
>         .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
>         .primaryKey("id_fascicolo", "fase")
>         .build();
>
> Table phasesDurationsTable = tEnv.fromChangelogStream(
>         phasesDurationsDataStream, phasesDurationsSchema, ChangelogMode.upsert());
>
> final TableDescriptor phasesDurationsOS = TableDescriptor.forConnector("opensearch")
>         .schema(phasesDurationsSchema)
>         .option(OpensearchConnectorOptions.HOSTS_OPTION, List.of(OPENSEARCH_HOST))
>         .option(OpensearchConnectorOptions.INDEX_OPTION, "legal_files_phase_duration")
>         .option(OpensearchConnectorOptions.USERNAME_OPTION, "admin")
>         .option(OpensearchConnectorOptions.PASSWORD_OPTION, "admin")
>         .option(OpensearchConnectorOptions.ALLOW_INSECURE, true)
>         .build();
> tEnv.createTable("PhasesDurationsOS", phasesDurationsOS);
>
>
> After that I filter the *phasesDurationsTable* like this:
>
> Table filteredPhasesDurationsTable = phasesDurationsTable
>         .where($("fase_completata").isTrue())
>         .select(
>                 $("id_fascicolo"),
>                 $("nrg"),
>                 $("giudice"),
>                 $("oggetto"),
>                 $("codice_oggetto"),
>                 $("ufficio"),
>                 $("sezione"),
>                 $("fase"),
>                 $("durata"),
>                 $("data_inizio"),
>                 $("data_fine")
>         );
>
> With the *filteredPhasesDurationsTable* I need to calculate some averages
> using a sliding window, so I need to define a watermark on the *data_fine*
> column.
> Is there a way to do this?
>
