Hi Eugenio,

AFAIK, you can define a watermark on data_fine by adding a watermark attribute in phasesDurationsSchema. For example:
final Schema phasesDurationsSchema = Schema.newBuilder()
    .column("id_fascicolo", DataTypes.BIGINT().notNull())
    .column("nrg", DataTypes.STRING())
    .column("giudice", DataTypes.STRING())
    .column("oggetto", DataTypes.STRING())
    .column("codice_oggetto", DataTypes.STRING())
    .column("ufficio", DataTypes.STRING())
    .column("sezione", DataTypes.STRING())
    .column("fase", DataTypes.STRING().notNull())
    .column("fase_completata", DataTypes.BOOLEAN())
    .column("durata", DataTypes.BIGINT())
    .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
    .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
    .watermark("data_fine", "data_fine - INTERVAL '1' SECOND")
    .primaryKey("id_fascicolo", "fase")
    .build();

See more in the Table API and DataStream API integration docs [1].
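Once that watermark attribute is in place, data_fine becomes an event-time attribute of the table produced by fromChangelogStream, so the sliding-window average can be written directly in the Table API. Below is a minimal sketch, not tested against your pipeline: the 10-minute window size, the 5-minute slide, and the grouping on fase are assumptions for illustration.

// Sketch: sliding-window average of durata over the watermarked table.
// Assumed values: 10-minute window size, 5-minute slide, grouping by fase.
// Requires: import org.apache.flink.table.api.Slide;
//           import static org.apache.flink.table.api.Expressions.*;
Table avgDurations = filteredPhasesDurationsTable
    .window(Slide.over(lit(10).minutes())   // window size (assumed)
                 .every(lit(5).minutes())   // slide interval (assumed)
                 .on($("data_fine"))        // event-time attribute from the watermark
                 .as("w"))
    .groupBy($("w"), $("fase"))
    .select(
        $("fase"),
        $("w").start().as("window_start"),
        $("w").end().as("window_end"),
        $("durata").avg().as("durata_media"));

One caveat: windowed aggregations expect an insert-only input, and phasesDurationsTable is created from an upsert changelog, so Flink may reject the update stream; if it does, the changelog would have to be converted to an insert-only stream before windowing.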
Best,
Xiangyu

[1] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/data_stream_api/

Eugenio Marotti <ing.eugenio.maro...@gmail.com> wrote on Sunday, June 25, 2023 at 15:35:

> Hi everyone,
>
> I'm using Flink to process some streaming data. First of all, I have two
> tables receiving events from Kafka. These tables are joined, and the
> resulting table is converted to a DataStream, where it is processed by a
> custom KeyedProcessFunction. The output is then converted back to a table
> and sent to Opensearch. Here's the code I'm using:
>
> final TableDescriptor legalFilesTableDescriptor = TableDescriptor.forConnector("kafka")
>     .schema(Schema.newBuilder()
>         .column("id", DataTypes.BIGINT())
>         .column("nrg", DataTypes.STRING())
>         .column("ufficio", DataTypes.STRING())
>         .column("sezione", DataTypes.STRING())
>         .column("giudice", DataTypes.STRING())
>         .column("oggetto", DataTypes.STRING())
>         .column("codice_oggetto", DataTypes.STRING())
>         .build())
>     .option(KafkaConnectorOptions.TOPIC, List.of("sicid.public.fascicoli"))
>     .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>     .option(KafkaConnectorOptions.SCAN_STARTUP_MODE, KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET)
>     .option(KafkaConnectorOptions.VALUE_FORMAT, "debezium-json")
>     .build();
> tEnv.createTable("LegalFilesTable_Kafka", legalFilesTableDescriptor);
> Table legalFilesTable = tEnv.from("LegalFilesTable_Kafka");
>
> final TableDescriptor eventsTableDescriptor = TableDescriptor.forConnector("kafka")
>     .schema(Schema.newBuilder()
>         .column("id", DataTypes.BIGINT())
>         .column("data", DataTypes.BIGINT())
>         .columnByExpression("data_evento", "TO_TIMESTAMP_LTZ(data, 3)")
>         .column("evento", DataTypes.STRING())
>         .column("id_fascicolo", DataTypes.BIGINT())
>         .build())
>     .option(KafkaConnectorOptions.TOPIC, List.of("sicid.public.eventi"))
>     .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>     .option(KafkaConnectorOptions.SCAN_STARTUP_MODE, KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET)
>     .option(KafkaConnectorOptions.VALUE_FORMAT, "debezium-json")
>     .build();
> tEnv.createTable("EventsTable_Kafka", eventsTableDescriptor);
> Table eventsTable = tEnv.from("EventsTable_Kafka");
>
> Table legalFileEventsTable = legalFilesTable.join(eventsTable)
>     .where($("id").isEqual($("id_fascicolo")))
>     .select(
>         $("id").as("id_fascicolo"),
>         $("id_evento"),
>         $("giudice"),
>         $("nrg"),
>         $("codice_oggetto"),
>         $("oggetto"),
>         $("ufficio"),
>         $("sezione"),
>         $("data_evento"),
>         $("evento")
>     );
>
> DataStream<Row> phasesDurationsDataStream = tEnv.toChangelogStream(legalFileEventsTable)
>     .keyBy(r -> r.<Long>getFieldAs("id_fascicolo"))
>     .process(new PhaseDurationCounterProcessFunction())
>     .returns(new RowTypeInfo(
>         new TypeInformation[] {
>             BasicTypeInfo.LONG_TYPE_INFO,
>             BasicTypeInfo.STRING_TYPE_INFO,
>             BasicTypeInfo.STRING_TYPE_INFO,
>             BasicTypeInfo.STRING_TYPE_INFO,
>             BasicTypeInfo.STRING_TYPE_INFO,
>             BasicTypeInfo.STRING_TYPE_INFO,
>             BasicTypeInfo.STRING_TYPE_INFO,
>             BasicTypeInfo.STRING_TYPE_INFO,
>             BasicTypeInfo.BOOLEAN_TYPE_INFO,
>             BasicTypeInfo.LONG_TYPE_INFO,
>             BasicTypeInfo.INSTANT_TYPE_INFO,
>             BasicTypeInfo.INSTANT_TYPE_INFO
>         },
>         new String[] { "id_fascicolo", "nrg", "giudice", "oggetto", "codice_oggetto",
>             "ufficio", "sezione", "fase", "fase_completata", "durata", "data_inizio", "data_fine" }
>     ));
>
> final Schema phasesDurationsSchema = Schema.newBuilder()
>     .column("id_fascicolo", DataTypes.BIGINT().notNull())
>     .column("nrg", DataTypes.STRING())
>     .column("giudice", DataTypes.STRING())
>     .column("oggetto", DataTypes.STRING())
>     .column("codice_oggetto", DataTypes.STRING())
>     .column("ufficio", DataTypes.STRING())
>     .column("sezione", DataTypes.STRING())
>     .column("fase", DataTypes.STRING().notNull())
>     .column("fase_completata", DataTypes.BOOLEAN())
>     .column("durata", DataTypes.BIGINT())
>     .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
>     .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
>     .primaryKey("id_fascicolo", "fase")
>     .build();
>
> Table phasesDurationsTable = tEnv.fromChangelogStream(phasesDurationsDataStream,
>     phasesDurationsSchema, ChangelogMode.upsert());
>
> final TableDescriptor phasesDurationsOS = TableDescriptor.forConnector("opensearch")
>     .schema(phasesDurationsSchema)
>     .option(OpensearchConnectorOptions.HOSTS_OPTION, List.of(OPENSEARCH_HOST))
>     .option(OpensearchConnectorOptions.INDEX_OPTION, "legal_files_phase_duration")
>     .option(OpensearchConnectorOptions.USERNAME_OPTION, "admin")
>     .option(OpensearchConnectorOptions.PASSWORD_OPTION, "admin")
>     .option(OpensearchConnectorOptions.ALLOW_INSECURE, true)
>     .build();
> tEnv.createTable("PhasesDurationsOS", phasesDurationsOS);
>
> After that, I filter the *phasesDurationsTable* like this:
>
> Table filteredPhasesDurationsTable = phasesDurationsTable
>     .where($("fase_completata").isTrue())
>     .select(
>         $("id_fascicolo"),
>         $("nrg"),
>         $("giudice"),
>         $("oggetto"),
>         $("codice_oggetto"),
>         $("ufficio"),
>         $("sezione"),
>         $("fase"),
>         $("durata"),
>         $("data_inizio"),
>         $("data_fine")
>     );
>
> With the *filteredPhasesDurationsTable* I need to calculate some averages
> with a sliding window, so I need to define a watermark on the *data_fine*
> column. Is there a way to do this?
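P.S. With the watermark above in place, the same sliding-window average can also be expressed in SQL via the HOP windowing TVF. A rough sketch, where the view name "FilteredPhasesDurations" and the 5/10-minute intervals are placeholders, and the same insert-only caveat applies:

// Register the filtered table as a view so SQL can reference it.
// "FilteredPhasesDurations" is an assumed name; intervals are placeholders.
tEnv.createTemporaryView("FilteredPhasesDurations", filteredPhasesDurationsTable);

Table hopAverages = tEnv.sqlQuery(
    "SELECT window_start, window_end, fase, AVG(durata) AS durata_media " +
    "FROM TABLE(HOP(TABLE FilteredPhasesDurations, DESCRIPTOR(data_fine), " +
    "               INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)) " +
    "GROUP BY window_start, window_end, fase");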