Hi Martin,

Usually, this error occurs when `org.apache.flink.api.scala._` is missing from the imports. It is triggered by the Scala macro that the DataStream API uses to extract type information.
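To illustrate what "evidence parameter" means in that message, here is a minimal plain-Scala sketch. It does not use Flink at all: `TypeInfo`, `TypeInfoImplicits`, and `describe` are hypothetical stand-ins that only mimic how a context bound such as `[T: TypeInformation]` asks the compiler for an implicit value.

```scala
// Hypothetical stand-in for Flink's TypeInformation; not the real class.
trait TypeInfo[T] { def name: String }

object TypeInfoImplicits {
  // Plays the role of the implicits that a wildcard import brings into scope.
  implicit val stringInfo: TypeInfo[String] =
    new TypeInfo[String] { def name: String = "String" }
}

object EvidenceSketch {
  // The context bound [T: TypeInfo] desugars to an extra implicit parameter list:
  //   def describe[T](value: T)(implicit evidence$1: TypeInfo[T]): String
  def describe[T: TypeInfo](value: T): String =
    s"$value has type ${implicitly[TypeInfo[T]].name}"

  def main(args: Array[String]): Unit = {
    // Without this import, the call below fails to compile with
    // "could not find implicit value for evidence parameter of type TypeInfo[String]".
    import TypeInfoImplicits._
    println(describe("hello"))
  }
}
```

In this sketch the implicit is a plain value; in Flink it is generated by a macro, but the compiler error looks the same when nothing suitable is in scope.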

Can you try to call `result.toAppendStream[Row]` directly? This should work if you import `org.apache.flink.table.api.scala._`.
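A rough sketch of what I mean, modeled on the linked example and assuming Flink 1.10 with the Blink planner on the classpath. The `Visit` case class, the `visits` view, and the in-memory input are made up for illustration and stand in for your Kafka source:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object ToAppendStreamSketch {
  // Hypothetical event type, used only for this illustration.
  case class Visit(fullVisitorId: String, eventType: String)

  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // In-memory input instead of Kafka, so the sketch needs no external system.
    val visits: DataStream[Visit] =
      env.fromCollection(Seq(Visit("1", "view"), Visit("2", "click")))
    tableEnv.createTemporaryView("visits", visits, 'fullVisitorId, 'eventType)

    val result = tableEnv.sqlQuery("SELECT * FROM visits WHERE eventType = 'click'")

    // The conversion is called on the Table itself; the implicit conversion that
    // makes this possible comes from org.apache.flink.table.api.scala._
    val dsRow: DataStream[Row] = result.toAppendStream[Row]

    dsRow.print()
    env.execute("toAppendStream sketch")
  }
}
```

Note that this sketch, like the linked example, imports only `DataStream` and `StreamExecutionEnvironment` from `org.apache.flink.streaming.api.scala` rather than the whole package.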

Maybe this example helps:

https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala

Regards,
Timo


On 22.05.20 08:02, Martin Frank Hansen wrote:
Hi,

I am trying to write input from Kafka to a SQL server on AWS, but I have difficulties.

I get the following error:

could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
[error]   val dsRow = tableEnv.toAppendStream[Row](result)
[error]                                           ^

Any help is appreciated.

I am not sure whether my approach is correct, but my code is as follows:

import java.util.Properties

import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala._
import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, Table, TableEnvironment, Types}
import org.apache.flink.types.Row

val properties = new Properties()
properties.setProperty("bootstrap.servers", b_servers)
properties.setProperty("zookeeper.connect", zk)
properties.setProperty("group.id", "very_small_test")
properties.setProperty("ssl.endpoint.identification.algorithm", "")
properties.setProperty("security.protocol", "SSL")


val kafkaSource: FlinkKafkaConsumerBase[String] =
  new FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(), properties)
    .setStartFromTimestamp(0)

val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env, settings)

val schema = new Schema()
  .field("fullVisitorId", Types.STRING)
  .field("eventTime", Types.STRING)
  .field("eventID", Types.STRING)
  .field("eventType", Types.STRING)
  .field("page", Types.MAP(Types.STRING, Types.STRING))
  .field("CustomDimensions", Types.MAP(Types.STRING, Types.STRING))


tableEnv.connect(
    new Kafka()
      .version("universal")
      .topic("very_small_test")
      .properties(properties)
      .startFromEarliest()
  )
  .withFormat(
    new Json()
      .failOnMissingField(false)
      .deriveSchema()
  )
  .withSchema(schema)
  .inAppendMode()
  .registerTableSource("sql_source")


val sqlStatement = "SELECT * from sql_source where CustomDimensions['pagePath'] like '%BT%'"

val result = tableEnv.sqlQuery(sqlStatement)

val dsRow = tableEnv.toAppendStream[Row](result)


val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
  .setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver")
  .setDBUrl("AWS url")
  .setUsername(username)
  .setPassword(password)
  .setQuery("INSERT INTO kafka_data_test (fullVisitorId, EventTime, eventID) VALUES (?, ?, ?)")
  .setBatchInterval(100)
  .finish()

dsRow.writeUsingOutputFormat(jdbcOutput)

tableEnv.execute("SQL test")


--

*Best regards

Martin Hansen*

