I’m sorry, the whole code is:
/**
 * Kafka deserialization schema that turns a UMS message into a Flink [[Row]].
 *
 * @param schema JSON text of the UMS schema; it is parsed in `getProducedType`
 *               to derive the field names and field types of the produced rows.
 */
class WormholeDeserializationSchema(schema: String) extends
  KeyedDeserializationSchema[Row] {

  /**
   * Decodes the record value as a string, parses it with `UmsSchemaUtils.toUms`
   * and builds a [[Row]] from the first tuple of the payload.
   *
   * Only `message` is used; the key, topic, partition and offset are ignored.
   *
   * @return the row built from the first payload tuple, or `null` when the
   *         record has no value or the parsed payload is empty.
   */
  override def deserialize(messageKey: Array[Byte], message: Array[Byte],
                           topic: String, partition: Int, offset: Long): Row = {
    if (message == null) {
      // A record without a value cannot be parsed as UMS. Return null rather
      // than handing null to the parser.
      // NOTE(review): the Flink Kafka consumer is expected to skip null
      // records - confirm for the Flink version in use.
      null
    } else {
      val value = new SimpleStringSchema().deserialize(message)
      val ums = UmsSchemaUtils.toUms(value)
      // headOption instead of head: an empty payload yields null, not a
      // NoSuchElementException.
      // NOTE(review): only the first tuple of the payload is emitted; any
      // further tuples in the same message are dropped - confirm this is intended.
      // NOTE(review): the row fields are the tuple values as-is; confirm they
      // match the field types declared by getProducedType.
      ums.payload_get.headOption
        .map(first => Row.of(first.tuple: _*))
        .orNull
    }
  }

  /** The stream never ends on a particular element. */
  override def isEndOfStream(nextElement: Row): Boolean = false

  /** Row type information (field types and names) derived from the JSON schema. */
  override def getProducedType: TypeInformation[Row] = {
    val (fieldNameArray, fieldTypeArray) = getSchemaMap(schema)
    new RowTypeInfo(fieldTypeArray, fieldNameArray)
  }

  /**
   * Parses the JSON schema and returns the field names and the matching Flink
   * field types, in the order the fields appear in the schema.
   */
  private def getSchemaMap(jsonSchema: String): (Array[String], Array[TypeInformation[_]]) = {
    val fields = JsonUtils.json2caseClass[UmsSchema](jsonSchema).fields_get
    val fieldNames: Array[String] = fields.map(_.name).toArray
    val fieldTypes: Array[TypeInformation[_]] =
      fields.map(field => fieldTypeMatch(field.`type`)).toArray
    (fieldNames, fieldTypes)
  }

  /**
   * Maps a UMS field type to the corresponding Flink type information.
   * A field type not listed here fails with a `scala.MatchError`.
   */
  private def fieldTypeMatch(umsFieldType: UmsFieldType): TypeInformation[_] = {
    umsFieldType match {
      case STRING => Types.STRING
      case INT => Types.INT
      case LONG => Types.LONG
      case FLOAT => Types.FLOAT
      case DOUBLE => Types.DOUBLE
      case BOOLEAN => Types.BOOLEAN
      case DATE => Types.SQL_DATE
      case DATETIME => Types.SQL_TIMESTAMP
      case DECIMAL => Types.DECIMAL
    }
  }
}
// The reported "GenericTypeInfo cannot be converted to Table" error indicates
// the stream is not typed with the RowTypeInfo from the schema's getProducedType.
// NOTE(review): the Scala `addSource` is expected to take its TypeInformation from
// an implicit parameter, so the consumer's produced type is passed explicitly
// here - confirm against the Flink version in use.
val inputStream: DataStream[Row] = env.addSource(myConsumer)(myConsumer.getProducedType)
val tableEnv = TableEnvironment.getTableEnvironment(env)
Thanks~
sen
> 在 2018年6月6日,下午7:22,孙森 <[email protected]> 写道:
>
> Hi ,
>
>
> I've tried to specify such a schema when I read from Kafka and convert the
> input stream to a table, but I got the exception:
>
> Exception in thread "main" org.apache.flink.table.api.TableException: An
> input of GenericTypeInfo cannot be converted to Table. Please specify the
> type of the input with a RowTypeInfo
> And the code here:
>
>
> private def getSchemaMap(jsonSchema: String) = {
> val umsSchema = JsonUtils.json2caseClass[UmsSchema](jsonSchema)
> val fields = umsSchema.fields_get
> val fieldNameList = ListBuffer.empty[String]
> val fieldTypeList = ListBuffer.empty[TypeInformation[_]]
> fields.foreach {
> field =>
> fieldNameList.append(field.name)
> fieldTypeList.append(fieldTypeMatch(field.`type`))
> }
> println(fieldNameList)
> println(fieldTypeList)
> (fieldNameList.toArray, fieldTypeList.toArray)
> }
>
>
> private def fieldTypeMatch(umsFieldType: UmsFieldType): TypeInformation[_]
> = {
> umsFieldType match {
> case STRING => Types.STRING
> case INT => Types.INT
> case LONG => Types.LONG
> case FLOAT => Types.FLOAT
> case DOUBLE => Types.DOUBLE
> case BOOLEAN => Types.BOOLEAN
> case DATE => Types.SQL_DATE
> case DATETIME => Types.SQL_TIMESTAMP
> case DECIMAL => Types.DECIMAL
> }
> }
> }
>
>
> val myConsumer: FlinkKafkaConsumer010[Row] = new
> FlinkKafkaConsumer010(topics, new WormholeDeserializationSchema(jsonSchema),
> properties)
> val inputStream: DataStream[Row] = env.addSource(myConsumer)
> val tableEnv = TableEnvironment.getTableEnvironment(env)<<—————exception here
>
>
>
> Thanks !
> sen