Hi,
I’m trying to execute a window aggregation on two joined table from two Kafka
topics (upsert fashion), but I get no output. Here’s the code I’m using:
This is the first table from Kafka with an event time watermark on ‘data_fine’
attribute:
final TableDescriptor phasesDurationsTableDescriptor =
TableDescriptor.forConnector("upsert-kafka")
.schema(Schema.newBuilder()
.column("id_fascicolo", DataTypes.BIGINT().notNull())
.column("nrg", DataTypes.STRING())
.column("giudice", DataTypes.STRING())
.column("oggetto", DataTypes.STRING())
.column("codice_oggetto", DataTypes.STRING())
.column("ufficio", DataTypes.STRING())
.column("sezione", DataTypes.STRING())
.column("fase_completata", DataTypes.BOOLEAN())
.column("fase", DataTypes.STRING().notNull())
.column("durata", DataTypes.BIGINT())
.column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
.column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
.watermark("data_inizio", "data_inizio - INTERVAL '1' SECOND")
.primaryKey("id_fascicolo", "fase")
.build())
.option(KafkaConnectorOptions.TOPIC,
List.of("sicid.processor.phases-durations"))
.option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
.option(KafkaConnectorOptions.KEY_FORMAT, "json")
.option(KafkaConnectorOptions.VALUE_FORMAT, "json")
.build();
tEnv.createTable("PhasesDurations_Kafka", phasesDurationsTableDescriptor);
Table phasesDurationsTable = tEnv.from("PhasesDurations_Kafka”);
Here’s the second table:
final TableDescriptor averageJudgeByPhaseReportTableDescriptor =
TableDescriptor.forConnector("upsert-kafka")
.schema(Schema.newBuilder()
.column("giudice", DataTypes.STRING().notNull())
.column("fase", DataTypes.STRING().notNull())
.column("media_mobile", DataTypes.BIGINT())
.primaryKey("giudice", "fase")
.build())
.option(KafkaConnectorOptions.TOPIC,
List.of("sicid.processor.average-judge-by-phase-report"))
.option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
.option(KafkaConnectorOptions.KEY_FORMAT, "json")
.option(KafkaConnectorOptions.VALUE_FORMAT, "json")
.option(KafkaConnectorOptions.PROPS_GROUP_ID,
"average-judge-by-phase-report")
.build();
tEnv.createTable("AverageJudgeByPhaseReport_Kafka",
averageJudgeByPhaseReportTableDescriptor);
Table averageJudgeByPhaseReportTable =
tEnv.from("AverageJudgeByPhaseReport_Kafka");
Table renamedAverageJudgeByPhaseReportTable = averageJudgeByPhaseReportTable
.select(
$("giudice").as("giudice_media"),
$("fase").as("fase_media"),
$("media_mobile")
);
And here’s the code I’m experimenting with:
phasesDurationsTable
.join(renamedAverageJudgeByPhaseReportTable)
.where($("giudice").isEqual($("giudice_media")))
.window(Tumble.over(lit(30).days()).on($("data_inizio")).as("w"))
.groupBy(
$("giudice"),
$("w")
)
.select(
$("giudice")
)
.execute().print();
Am I doing something wrong?