Hi,
I've tried to specify such a schema when I read from Kafka and convert the
input stream to a table, but I got this exception:
Exception in thread "main" org.apache.flink.table.api.TableException: An input
of GenericTypeInfo cannot be converted to Table. Please specify the type of the
input with a RowTypeInfo
And here is the code:
/**
 * Parses a JSON schema string into a `UmsSchema` and collects, in field order,
 * the name and the type information of every field.
 *
 * Both collected lists are printed to standard output before returning.
 *
 * @param jsonSchema JSON text handed to `JsonUtils.json2caseClass[UmsSchema]`
 * @return a pair `(field names, field types)` as two arrays of equal length
 */
private def getSchemaMap(jsonSchema: String) = {
  val schema = JsonUtils.json2caseClass[UmsSchema](jsonSchema)
  val names = ListBuffer.empty[String]
  val types = ListBuffer.empty[TypeInformation[_]]
  // Walk the fields once so names and types stay index-aligned.
  for (f <- schema.fields_get) {
    names += f.name
    types += fieldTypeMatch(f.`type`)
  }
  println(names)
  println(types)
  (names.toArray, types.toArray)
}
/**
 * Translates a UMS field type into the corresponding `Types` constant.
 *
 * There is no default case: a value not listed below raises `scala.MatchError`.
 *
 * @param umsFieldType the UMS field type to translate
 * @return the `TypeInformation` associated with that field type
 */
private def fieldTypeMatch(umsFieldType: UmsFieldType): TypeInformation[_] =
  umsFieldType match {
    // textual and numeric types
    case STRING   => Types.STRING
    case INT      => Types.INT
    case LONG     => Types.LONG
    case FLOAT    => Types.FLOAT
    case DOUBLE   => Types.DOUBLE
    case BOOLEAN  => Types.BOOLEAN
    // date/time types map to their SQL counterparts
    case DATE     => Types.SQL_DATE
    case DATETIME => Types.SQL_TIMESTAMP
    case DECIMAL  => Types.DECIMAL
  }
}
// Derive the row layout from the same JSON schema that is given to the deserializer.
// NOTE(review): assumes getSchemaMap is reachable from this scope — confirm.
val (fieldNames, fieldTypes) = getSchemaMap(jsonSchema)
// Without an explicit RowTypeInfo the Scala API derives the type information for Row
// implicitly and ends up with a GenericTypeInfo, which cannot be converted to a Table
// ("Please specify the type of the input with a RowTypeInfo").
val rowTypeInfo: TypeInformation[Row] =
  new org.apache.flink.api.java.typeutils.RowTypeInfo(fieldTypes, fieldNames)
val myConsumer: FlinkKafkaConsumer010[Row] = new FlinkKafkaConsumer010(topics,
  new WormholeDeserializationSchema(jsonSchema), properties)
// Pass the type information explicitly so the stream carries the named, typed row schema.
val inputStream: DataStream[Row] = env.addSource(myConsumer)(rowTypeInfo)
val tableEnv = TableEnvironment.getTableEnvironment(env) // the TableException was reported around here
Thanks !
sen