Hi Eugenio,

According to the docs[1], there are two ways to define a watermark on a table:

1. in the CREATE TABLE DDL
2. during DataStream-to-Table conversion

In your case, I think you could use a CREATE TABLE DDL statement to create a new table from filteredPhasesDurationsTable with the watermark defined. See more in the CREATE statement docs[2].
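For example, something along these lines (just an untested sketch: the table name "FilteredPhasesDurations", the upsert-kafka connector, the topic and the formats are placeholders, so adjust them to whatever fits your setup):

// Hypothetical intermediate table. The important parts are the WATERMARK
// clause and the PRIMARY KEY (required by upsert-kafka); the connector
// options are placeholders.
tEnv.executeSql(
    "CREATE TABLE FilteredPhasesDurations (" +
    "  id_fascicolo BIGINT," +
    "  nrg STRING," +
    "  giudice STRING," +
    "  oggetto STRING," +
    "  codice_oggetto STRING," +
    "  ufficio STRING," +
    "  sezione STRING," +
    "  fase STRING," +
    "  durata BIGINT," +
    "  data_inizio TIMESTAMP_LTZ(3)," +
    "  data_fine TIMESTAMP_LTZ(3)," +
    "  WATERMARK FOR data_fine AS data_fine - INTERVAL '1' SECOND," +
    "  PRIMARY KEY (id_fascicolo, fase) NOT ENFORCED" +
    ") WITH (" +
    "  'connector' = 'upsert-kafka'," +
    "  'topic' = 'phases_durations_filtered'," +
    "  'properties.bootstrap.servers' = '" + KAFKA_HOST + "'," +
    "  'key.format' = 'json'," +
    "  'value.format' = 'json'" +
    ")");

// Write the filtered table into it (in a single application you would
// probably collect the inserts in a StatementSet instead) and read it
// back with the watermark attached.
filteredPhasesDurationsTable.executeInsert("FilteredPhasesDurations");
Table filteredWithWatermark = tEnv.from("FilteredPhasesDurations");

Alternatively, since you said data_fine is never null after the filter, way 2 should also work after the last select: convert the filtered table to a changelog stream and straight back, declaring the watermark in the schema (again untested; the schema mirrors the phasesDurationsSchema from my previous mail, minus fase_completata, which your select drops):

Schema filteredSchema = Schema.newBuilder()
    .column("id_fascicolo", DataTypes.BIGINT().notNull())
    .column("nrg", DataTypes.STRING())
    .column("giudice", DataTypes.STRING())
    .column("oggetto", DataTypes.STRING())
    .column("codice_oggetto", DataTypes.STRING())
    .column("ufficio", DataTypes.STRING())
    .column("sezione", DataTypes.STRING())
    .column("fase", DataTypes.STRING().notNull())
    .column("durata", DataTypes.BIGINT())
    .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
    .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
    // safe here because the filter guarantees data_fine is not null
    .watermark("data_fine", "data_fine - INTERVAL '1' SECOND")
    .build();

Table filteredWithWatermark = tEnv.fromChangelogStream(
    tEnv.toChangelogStream(filteredPhasesDurationsTable),
    filteredSchema,
    ChangelogMode.all());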
[1] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/concepts/time_attributes/
[2] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/create/

Best,
Xiangyu

On Mon, Jun 26, 2023 at 13:03 Eugenio Marotti <ing.eugenio.maro...@gmail.com> wrote:

> Hi,
>
> thank you for the suggestion. The problem is that data_fine in my case
> can be null, so I wanted to first filter by $("fase_completata").isTrue()
> (when fase_completata is true, data_fine is not null) and then define the
> watermark. Is it possible to define it after the last select?
>
> Best,
> Eugenio
>
> On 26 Jun 2023, at 05:31, feng xiangyu <xiangyu...@gmail.com> wrote:
>
> Hi, Eugenio
>
> AFAIK, you could define the watermark on data_fine by adding a watermark
> attribute in phasesDurationsSchema. For example:
>
> final Schema phasesDurationsSchema = Schema.newBuilder()
>     .column("id_fascicolo", DataTypes.BIGINT().notNull())
>     .column("nrg", DataTypes.STRING())
>     .column("giudice", DataTypes.STRING())
>     .column("oggetto", DataTypes.STRING())
>     .column("codice_oggetto", DataTypes.STRING())
>     .column("ufficio", DataTypes.STRING())
>     .column("sezione", DataTypes.STRING())
>     .column("fase", DataTypes.STRING().notNull())
>     .column("fase_completata", DataTypes.BOOLEAN())
>     .column("durata", DataTypes.BIGINT())
>     .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
>     .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
>     .watermark("data_fine", "data_fine - INTERVAL '1' SECOND")
>     .primaryKey("id_fascicolo", "fase")
>     .build();
>
> See more in the Table API and DataStream API integration docs[1].
>
> Best,
> Xiangyu
>
> [1] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/data_stream_api/
>
> On Sun, Jun 25, 2023 at 15:35 Eugenio Marotti <ing.eugenio.maro...@gmail.com> wrote:
>
>> Hi everyone,
>>
>> I'm using Flink to process some streaming data. First of all, I have
>> two tables receiving events from Kafka. These tables are joined, and the
>> resulting table is converted to a DataStream, where it is processed by a
>> custom KeyedProcessFunction. The output is then converted back to a table
>> and sent to Opensearch. Here's the code I'm using:
>>
>> final TableDescriptor legalFilesTableDescriptor = TableDescriptor.forConnector("kafka")
>>     .schema(Schema.newBuilder()
>>         .column("id", DataTypes.BIGINT())
>>         .column("nrg", DataTypes.STRING())
>>         .column("ufficio", DataTypes.STRING())
>>         .column("sezione", DataTypes.STRING())
>>         .column("giudice", DataTypes.STRING())
>>         .column("oggetto", DataTypes.STRING())
>>         .column("codice_oggetto", DataTypes.STRING())
>>         .build())
>>     .option(KafkaConnectorOptions.TOPIC, List.of("sicid.public.fascicoli"))
>>     .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>>     .option(KafkaConnectorOptions.SCAN_STARTUP_MODE, KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET)
>>     .option(KafkaConnectorOptions.VALUE_FORMAT, "debezium-json")
>>     .build();
>> tEnv.createTable("LegalFilesTable_Kafka", legalFilesTableDescriptor);
>> Table legalFilesTable = tEnv.from("LegalFilesTable_Kafka");
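>> // The events table below follows the same pattern (a debezium-json
>> // changelog source read from the latest offset); in addition it derives
>> // an event-time timestamp column, data_evento, from the epoch-millis
>> // field `data` via the computed column TO_TIMESTAMP_LTZ(data, 3).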
>> final TableDescriptor eventsTableDescriptor = TableDescriptor.forConnector("kafka")
>>     .schema(Schema.newBuilder()
>>         .column("id", DataTypes.BIGINT())
>>         .column("data", DataTypes.BIGINT())
>>         .columnByExpression("data_evento", "TO_TIMESTAMP_LTZ(data, 3)")
>>         .column("evento", DataTypes.STRING())
>>         .column("id_fascicolo", DataTypes.BIGINT())
>>         .build())
>>     .option(KafkaConnectorOptions.TOPIC, List.of("sicid.public.eventi"))
>>     .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>>     .option(KafkaConnectorOptions.SCAN_STARTUP_MODE, KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET)
>>     .option(KafkaConnectorOptions.VALUE_FORMAT, "debezium-json")
>>     .build();
>> tEnv.createTable("EventsTable_Kafka", eventsTableDescriptor);
>> Table eventsTable = tEnv.from("EventsTable_Kafka");
>>
>> Table legalFileEventsTable = legalFilesTable.join(eventsTable)
>>     .where($("id").isEqual($("id_fascicolo")))
>>     .select(
>>         $("id").as("id_fascicolo"),
>>         $("id_evento"),
>>         $("giudice"),
>>         $("nrg"),
>>         $("codice_oggetto"),
>>         $("oggetto"),
>>         $("ufficio"),
>>         $("sezione"),
>>         $("data_evento"),
>>         $("evento")
>>     );
>>
>> DataStream<Row> phasesDurationsDataStream = tEnv.toChangelogStream(legalFileEventsTable)
>>     .keyBy(r -> r.<Long>getFieldAs("id_fascicolo"))
>>     .process(new PhaseDurationCounterProcessFunction())
>>     .returns(new RowTypeInfo(
>>         new TypeInformation[] {
>>             BasicTypeInfo.LONG_TYPE_INFO,
>>             BasicTypeInfo.STRING_TYPE_INFO,
>>             BasicTypeInfo.STRING_TYPE_INFO,
>>             BasicTypeInfo.STRING_TYPE_INFO,
>>             BasicTypeInfo.STRING_TYPE_INFO,
>>             BasicTypeInfo.STRING_TYPE_INFO,
>>             BasicTypeInfo.STRING_TYPE_INFO,
>>             BasicTypeInfo.STRING_TYPE_INFO,
>>             BasicTypeInfo.BOOLEAN_TYPE_INFO,
>>             BasicTypeInfo.LONG_TYPE_INFO,
>>             BasicTypeInfo.INSTANT_TYPE_INFO,
>>             BasicTypeInfo.INSTANT_TYPE_INFO
>>         },
>>         new String[] { "id_fascicolo", "nrg", "giudice", "oggetto", "codice_oggetto",
>>             "ufficio", "sezione", "fase", "fase_completata", "durata", "data_inizio",
>>             "data_fine" }
>>     ));
>>
>> final Schema phasesDurationsSchema = Schema.newBuilder()
>>     .column("id_fascicolo", DataTypes.BIGINT().notNull())
>>     .column("nrg", DataTypes.STRING())
>>     .column("giudice", DataTypes.STRING())
>>     .column("oggetto", DataTypes.STRING())
>>     .column("codice_oggetto", DataTypes.STRING())
>>     .column("ufficio", DataTypes.STRING())
>>     .column("sezione", DataTypes.STRING())
>>     .column("fase", DataTypes.STRING().notNull())
>>     .column("fase_completata", DataTypes.BOOLEAN())
>>     .column("durata", DataTypes.BIGINT())
>>     .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
>>     .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
>>     .primaryKey("id_fascicolo", "fase")
>>     .build();
>>
>> Table phasesDurationsTable = tEnv.fromChangelogStream(
>>     phasesDurationsDataStream, phasesDurationsSchema, ChangelogMode.upsert());
>>
>> final TableDescriptor phasesDurationsOS = TableDescriptor.forConnector("opensearch")
>>     .schema(phasesDurationsSchema)
>>     .option(OpensearchConnectorOptions.HOSTS_OPTION, List.of(OPENSEARCH_HOST))
>>     .option(OpensearchConnectorOptions.INDEX_OPTION, "legal_files_phase_duration")
>>     .option(OpensearchConnectorOptions.USERNAME_OPTION, "admin")
>>     .option(OpensearchConnectorOptions.PASSWORD_OPTION, "admin")
>>     .option(OpensearchConnectorOptions.ALLOW_INSECURE, true)
>>     .build();
>> tEnv.createTable("PhasesDurationsOS", phasesDurationsOS);
>>
>> After that I filter the *phasesDurationsTable* like this:
>>
>> Table filteredPhasesDurationsTable = phasesDurationsTable
>>     .where($("fase_completata").isTrue())
>>     .select(
>>         $("id_fascicolo"),
>>         $("nrg"),
>>         $("giudice"),
>>         $("oggetto"),
>>         $("codice_oggetto"),
$("ufficio"), >> $("sezione"), >> $("fase"), >> $("durata"), >> $("data_inizio"), >> $("data_fine") >> ); >> >> With the *filteredPhasesDurationsTable* I need to calculate some >> averages with a sliding window. So I need to define a watermark on the >> *data_fine* parameter. Is there a way to do this? >> > >