I'm implementing a data analysis pipeline in Flink and I'm having a problem
converting a DataStream back to a Table. I have this table, defined as a join
between two Kafka sources:
Table legalFileEventsTable = legalFilesTable.join(eventsTable)
.where($("id").isEqual($("id_fascicolo")))
.select(
$("id").as("id_fascicolo"),
$("id_evento"),
$("giudice"),
$("nrg"),
$("codice_oggetto"),
$("ufficio"),
$("sezione"),
$("data_evento"),
$("evento"),
$("data_registrazione_evento")
);
Then I convert the joined table to a DataStream to apply some computations to
the data. Here's the code I'm using:
DataStream<Row> phasesDurationsDataStream =
tEnv.toChangelogStream(legalFileEventsTable)
.keyBy(r -> r.<Long>getFieldAs("id_fascicolo"))
.process(new PhaseDurationCounterProcessFunction());
phasesDurationsDataStream.print();
The PhaseDurationCounterProcessFunction emits Rows like this:
Row outputRow = Row.withNames(RowKind.INSERT);
outputRow.setField("id_fascicolo", currentState.getId_fascicolo());
outputRow.setField("nrg", currentState.getNrg());
outputRow.setField("giudice", currentState.getGiudice());
outputRow.setField("codice_oggetto", currentState.getCodice_oggetto());
outputRow.setField("ufficio", currentState.getUfficio());
outputRow.setField("sezione", currentState.getSezione());
outputRow.setField("fase", currentState.getPhase());
outputRow.setField("fase_completata", false);
outputRow.setField("durata", currentState.getDurationCounter());
out.collect(outputRow);
After collecting the results from the process function, I convert the
DataStream back to a Table and execute the pipeline:
Table phasesDurationsTable = tEnv.fromChangelogStream(
phasesDurationsDataStream,
Schema.newBuilder()
.column("id_fascicolo", DataTypes.BIGINT())
.column("nrg", DataTypes.STRING())
.column("giudice", DataTypes.STRING())
.column("codice_oggetto", DataTypes.STRING())
.column("ufficio", DataTypes.STRING())
.column("sezione", DataTypes.STRING())
.column("fase", DataTypes.STRING())
.column("fase_completata", DataTypes.BOOLEAN())
.column("durata", DataTypes.BIGINT())
.primaryKey("id_fascicolo", "fase")
.build(),
ChangelogMode.upsert()
);
env.execute();
But during startup I receive this exception:
Unable to find a field named 'id_fascicolo' in the physical data type derived
from the given type information for schema declaration.
Make sure that the type information is not a generic raw type. Currently
available fields are: [f0]
It seems that the row information (field names and types) isn't available at
that point, and so the exception is thrown. I tried invoking env.execute()
before the DataStream-to-Table conversion; in that case the job starts, but I
get no output if I print phasesDurationsTable. Any suggestions on how to make
this work?
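From the "Currently available fields are: [f0]" part of the message, I suspect the stream coming out of the process function only carries a generic raw Row type, since the field names set via Row.withNames() exist only at runtime. Here is an untested sketch of what I'm considering trying, assuming the right hook is to declare the output type explicitly with returns(Types.ROW_NAMED(...)) right after process():

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// Untested sketch: declare the ProcessFunction's output type explicitly so
// the resulting stream carries named, typed fields instead of a raw Row.
// Field names and types mirror the Schema passed to fromChangelogStream.
DataStream<Row> phasesDurationsDataStream =
    tEnv.toChangelogStream(legalFileEventsTable)
        .keyBy(r -> r.<Long>getFieldAs("id_fascicolo"))
        .process(new PhaseDurationCounterProcessFunction())
        .returns(Types.ROW_NAMED(
            new String[] {
                "id_fascicolo", "nrg", "giudice", "codice_oggetto",
                "ufficio", "sezione", "fase", "fase_completata", "durata"
            },
            Types.LONG, Types.STRING, Types.STRING, Types.STRING,
            Types.STRING, Types.STRING, Types.STRING, Types.BOOLEAN,
            Types.LONG));
```

If that's the right approach, fromChangelogStream should then be able to match the schema columns against the physical fields instead of seeing a single opaque f0 field, but I haven't verified this.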
Eugenio