I’m sorry, the whole code is:


 /**
  * Kafka deserialization schema that turns a UMS JSON message into a Flink `Row`.
  *
  * The row layout reported by `getProducedType` is derived from the UMS schema JSON that is
  * passed in at construction time.
  *
  * @param schema JSON text of the UMS schema; parsed with `JsonUtils.json2caseClass[UmsSchema]`
  */
 class WormholeDeserializationSchema(schema: String) extends KeyedDeserializationSchema[Row] {

   // One decoder per schema instance instead of one allocation per Kafka record.
   private val stringSchema = new SimpleStringSchema()

   /**
    * Decodes the message bytes to a string, parses it with `UmsSchemaUtils.toUms` and builds
    * a `Row` from the first payload tuple.
    *
    * The message key, topic, partition and offset are not used.
    *
    * @param messageKey raw Kafka key (ignored)
    * @param message    raw Kafka value; may be `null`
    * @param topic      source topic (ignored)
    * @param partition  source partition (ignored)
    * @param offset     record offset (ignored)
    * @return the row built from the first payload tuple, or `null` when `message` is `null`
    *         or the payload has no tuples. Flink's Kafka consumer documents a `null` result
    *         as "skip this record".
    */
   override def deserialize(
       messageKey: Array[Byte],
       message: Array[Byte],
       topic: String,
       partition: Int,
       offset: Long): Row = {
     // NOTE(review): only the first payload tuple is emitted; any further tuples in the same
     // message are dropped, exactly as before. Confirm that one tuple per message is expected.
     // NOTE(review): tuple elements are handed to Row.of unchanged. If they are raw strings
     // while getProducedType declares INT/LONG/..., downstream serialization may fail --
     // confirm the element types of the UMS tuple.
     Option(message)
       .map(bytes => UmsSchemaUtils.toUms(stringSchema.deserialize(bytes)))
       .flatMap(_.payload_get.headOption)
       .map(umsTuple => Row.of(umsTuple.tuple: _*))
       .orNull
   }

   /** The stream is unbounded: no element ever marks its end. */
   override def isEndOfStream(nextElement: Row): Boolean = false

   /**
    * Builds the `RowTypeInfo` (field types plus field names) described by the UMS schema.
    *
    * @return row type information for the rows produced by `deserialize`
    */
   override def getProducedType: TypeInformation[Row] = {
     val (fieldNames, fieldTypes) = getSchemaMap(schema)
     new RowTypeInfo(fieldTypes, fieldNames)
   }

   /**
    * Parses the UMS schema JSON and extracts, in field order, the field names and the
    * matching Flink type information.
    *
    * @param jsonSchema JSON text of the UMS schema
    * @return a pair of (field names, field types), both in schema order
    */
   private def getSchemaMap(jsonSchema: String): (Array[String], Array[TypeInformation[_]]) = {
     val fields = JsonUtils.json2caseClass[UmsSchema](jsonSchema).fields_get
     val fieldNames: Array[String] = fields.map(_.name).toArray
     val fieldTypes: Array[TypeInformation[_]] =
       fields.map(field => fieldTypeMatch(field.`type`)).toArray
     (fieldNames, fieldTypes)
   }

   /**
    * Maps a UMS field type to the corresponding Flink type information.
    *
    * @param umsFieldType UMS type of a schema field
    * @return the Flink `TypeInformation` used for that field
    * @throws IllegalArgumentException if the UMS type has no mapping here
    */
   private def fieldTypeMatch(umsFieldType: UmsFieldType): TypeInformation[_] = {
     umsFieldType match {
       case STRING => Types.STRING
       case INT => Types.INT
       case LONG => Types.LONG
       case FLOAT => Types.FLOAT
       case DOUBLE => Types.DOUBLE
       case BOOLEAN => Types.BOOLEAN
       case DATE => Types.SQL_DATE
       case DATETIME => Types.SQL_TIMESTAMP
       case DECIMAL => Types.DECIMAL
       // Fail with a descriptive message instead of a bare MatchError.
       case other =>
         throw new IllegalArgumentException(s"Unsupported UMS field type: $other")
     }
   }
 }



 // With the Scala DataStream API, addSource takes its TypeInformation[Row] from implicit
 // scope. The default macro-generated instance for Row is a generic type, which the Table
 // API rejects ("An input of GenericTypeInfo cannot be converted to Table"). Putting the
 // consumer's own RowTypeInfo in implicit scope makes the stream carry the real row type.
 implicit val rowTypeInfo: TypeInformation[Row] = myConsumer.getProducedType
 val inputStream: DataStream[Row] = env.addSource(myConsumer)
 val tableEnv = TableEnvironment.getTableEnvironment(env)
Thanks~

sen

> 在 2018年6月6日,下午7:22,孙森 <[email protected]> 写道:
> 
> Hi ,
>     
>      
> I've tried to specify such a schema when I read from Kafka and convert the 
> input stream to a table, but I got the exception:
> 
> Exception in thread "main" org.apache.flink.table.api.TableException: An 
> input of GenericTypeInfo cannot be converted to Table. Please specify the 
> type of the input with a RowTypeInfo
> And the code here:
> 
> 
> private def getSchemaMap(jsonSchema: String) = {
>     val umsSchema = JsonUtils.json2caseClass[UmsSchema](jsonSchema)
>     val fields = umsSchema.fields_get
>     val fieldNameList = ListBuffer.empty[String]
>     val fieldTypeList = ListBuffer.empty[TypeInformation[_]]
>     fields.foreach {
>       field =>
>         fieldNameList.append(field.name)
>         fieldTypeList.append(fieldTypeMatch(field.`type`))
>     }
>     println(fieldNameList)
>     println(fieldTypeList)
>     (fieldNameList.toArray, fieldTypeList.toArray)
>   }
> 
> 
>   private def fieldTypeMatch(umsFieldType: UmsFieldType): TypeInformation[_] 
> = {
>     umsFieldType match {
>       case STRING => Types.STRING
>       case INT => Types.INT
>       case LONG => Types.LONG
>       case FLOAT => Types.FLOAT
>       case DOUBLE => Types.DOUBLE
>       case BOOLEAN => Types.BOOLEAN
>       case DATE => Types.SQL_DATE
>       case DATETIME => Types.SQL_TIMESTAMP
>       case DECIMAL => Types.DECIMAL
>     }
>   }
> }
> 
> 
>  val myConsumer: FlinkKafkaConsumer010[Row] = new 
> FlinkKafkaConsumer010(topics, new WormholeDeserializationSchema(jsonSchema), 
> properties)
>  val inputStream: DataStream[Row] = env.addSource(myConsumer)
>  val tableEnv = TableEnvironment.getTableEnvironment(env)<<—————exception here
> 
> 
> 
> Thanks !
> sen

Reply via email to