Hi Martin,
Usually, this error occurs when people forget to add
`org.apache.flink.api.scala._` to their imports. That package provides the
Scala macro that the DataStream API uses to derive the implicit
`TypeInformation` for a type.
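To illustrate the mechanism, here is a small plain-Scala sketch with no Flink involved. `TypeInfo`, `typeInfoImplicits`, and `toStream` are made-up stand-ins for `TypeInformation`, the Flink package object, and `toAppendStream`; they only show how a context bound turns into an implicit evidence parameter that must be in scope at the call site.

```scala
object EvidenceDemo {
  // Hypothetical stand-in for Flink's TypeInformation[T].
  trait TypeInfo[T] { def describe: String }

  // Hypothetical stand-in for a package object that supplies the implicits.
  object typeInfoImplicits {
    implicit val stringInfo: TypeInfo[String] =
      new TypeInfo[String] { def describe: String = "String" }
  }

  // The context bound [T: TypeInfo] desugars to an extra parameter list
  // (implicit evidence: TypeInfo[T]) that the compiler has to fill in.
  def toStream[T: TypeInfo](values: Seq[T]): String =
    s"Stream[${implicitly[TypeInfo[T]].describe}] of ${values.size} element(s)"

  def main(args: Array[String]): Unit = {
    // Removing this import makes the call below fail to compile with a
    // "could not find implicit value for evidence parameter" error.
    import typeInfoImplicits._
    println(toStream(Seq("a", "b")))
  }
}
```

In Flink the implicit is not a plain value but a macro, so the real lookup does more work than this sketch suggests; the scoping rule is the same.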
Can you try to call `result.toAppendStream[Row]` directly? This should
work if you import `org.apache.flink.table.api.scala._`.
Maybe this example helps:
https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
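A minimal sketch of the suggestion above, assuming Flink 1.10 with the Blink planner on the classpath. The in-memory input, the view name `events`, and the object name are made up for illustration and stand in for your Kafka source; I have not run this against your setup.

```scala
import org.apache.flink.streaming.api.scala._   // DataStream API and the TypeInformation macro
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._       // StreamTableEnvironment and Table-to-DataStream conversions
import org.apache.flink.types.Row

object AppendStreamSketch {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // Hypothetical in-memory input instead of the Kafka table source.
    val input: DataStream[(String, String)] =
      env.fromElements(("1", "click"), ("2", "view"))
    tableEnv.createTemporaryView("events", input, 'id, 'eventType)

    val result = tableEnv.sqlQuery("SELECT * FROM events WHERE eventType = 'click'")

    // Call the conversion on the Table itself; it comes from the
    // org.apache.flink.table.api.scala._ import.
    val dsRow: DataStream[Row] = result.toAppendStream[Row]

    dsRow.print()
    env.execute("append stream sketch")
  }
}
```

The relevant difference from your code is the wildcard import of `org.apache.flink.table.api.scala._` in place of importing only `StreamTableEnvironment`.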
Regards,
Timo
On 22.05.20 08:02, Martin Frank Hansen wrote:
Hi,
I am trying to write data from Kafka to a SQL Server database on AWS, but I am
running into difficulties.
I get the following error:
could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row]
[error] val dsRow = tableEnv.toAppendStream[Row](result)
[error] ^
Any help is appreciated.
I am not sure whether my approach is correct, but my code is as follows:
import java.util.Properties
import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala._
import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, Table, TableEnvironment, Types}
import org.apache.flink.types.Row
val properties = new Properties()
properties.setProperty("bootstrap.servers", b_servers)
properties.setProperty("zookeeper.connect", zk)
properties.setProperty("group.id", "very_small_test")
properties.setProperty("ssl.endpoint.identification.algorithm", "")
properties.setProperty("security.protocol", "SSL")
val kafkaSource: FlinkKafkaConsumerBase[String] =
  new FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(), properties)
    .setStartFromTimestamp(0)
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env, settings)
val schema = new Schema()
  .field("fullVisitorId", Types.STRING)
  .field("eventTime", Types.STRING)
  .field("eventID", Types.STRING)
  .field("eventType", Types.STRING)
  .field("page", Types.MAP(Types.STRING, Types.STRING))
  .field("CustomDimensions", Types.MAP(Types.STRING, Types.STRING))
tableEnv.connect(new Kafka()
    .version("universal")
    .topic("very_small_test")
    .properties(properties)
    .startFromEarliest()
  )
  .withFormat(
    new Json()
      .failOnMissingField(false)
      .deriveSchema()
  )
  .withSchema(schema)
  .inAppendMode()
  .registerTableSource("sql_source")
val sqlStatement = "SELECT * from sql_source where CustomDimensions['pagePath'] like '%BT%'"
val result = tableEnv.sqlQuery(sqlStatement)
val dsRow = tableEnv.toAppendStream[Row](result)
val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
  .setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver")
  .setDBUrl("AWS url")
  .setUsername(username)
  .setPassword(password)
  .setQuery("INSERT INTO kafka_data_test (fullVisitorId, EventTime, eventID) VALUES (?, ?, ?)")
  .setBatchInterval(100)
  .finish()
dsRow.writeUsingOutputFormat(jdbcOutput)
tableEnv.execute("SQL test")
--
Best regards,
Martin Hansen