Hi Flavio, Thanks for your reply. I will try another way then.
Den fre. 22. maj 2020 kl. 11.31 skrev Flavio Pompermaier < pomperma...@okkam.it>: > I expect you to see some exception somewhere, that sql server dialect is > not supported yet. > > On Fri, May 22, 2020 at 11:29 AM Martin Frank Hansen < > m...@berlingskemedia.dk> wrote: > >> Hi Flavio, >> >> Thank you so much! Thought i had that import but misread it. >> >> The code does not give any errors, but no data is written to the sql >> server. Can you see why that is? >> >> >> >> Den fre. 22. maj 2020 kl. 09.02 skrev Flavio Pompermaier < >> pomperma...@okkam.it>: >> >>> SQL server should not be supported from what I know..for this I opened a >>> PR[1] that I should rebase. >>> If someone is interested in I could do it >>> >>> [1] https://github.com/apache/flink/pull/12038 (FLINK-14101 ) >>> >>> On Fri, May 22, 2020 at 8:35 AM Timo Walther <twal...@apache.org> wrote: >>> >>>> Hi Martin, >>>> >>>> usually, this error occurs when people forget to add >>>> `org.apache.flink.api.scala._` to their imports. It is triggered by the >>>> Scala macro that the DataStream API uses for extracting types. >>>> >>>> Can you try to call `result.toAppendStream[Row]` directly? This should >>>> work if you import `org.apache.flink.table.api.scala._`. >>>> >>>> Maybe this example helps: >>>> >>>> >>>> https://github.com/apache/flink/blob/release-1.10/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala >>>> >>>> Regards, >>>> Timo >>>> >>>> >>>> On 22.05.20 08:02, Martin Frank Hansen wrote: >>>> > Hi, >>>> > >>>> > I am trying to write input from Kafka to a SQL server on AWS, but I >>>> have >>>> > difficulties. >>>> > >>>> > I get the following error could not find implicit value for evidence >>>> > parameter of type >>>> > >>>> org.apache.flink.api.common.typeinfo.TypeInformation[org.apache.flink.types.Row] >>>> > [error] val dsRow = tableEnv.toAppendStream[Row](result) >>>> > [error] ^ >>>> > >>>> > Any help is appreciated >>>> > >>>> > I am not sure whether my approach is correct or not but my code is >>>> > as follows: >>>> > >>>> > import java.util.Properties >>>> > >>>> > import org.apache.flink.table.descriptors.{Json, Kafka, Schema} >>>> > import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, >>>> JDBCOutputFormat} >>>> > import >>>> org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, >>>> FlinkKafkaConsumerBase, FlinkKafkaProducer, KafkaSerializationSchema} >>>> > import org.apache.flink.streaming.api.scala._ >>>> > import org.apache.flink.api.scala._ >>>> > import >>>> org.apache.flink.api.common.serialization.{SimpleStringEncoder, >>>> SimpleStringSchema} >>>> > import org.apache.flink.table.api.scala.StreamTableEnvironment >>>> > import org.apache.flink.table.api.{EnvironmentSettings, Table, >>>> TableEnvironment, Types} >>>> > import org.apache.flink.types.Row >>>> > >>>> > val properties =new Properties() >>>> > properties.setProperty("bootstrap.servers",b_servers) >>>> > properties.setProperty("zookeeper.connect",zk) >>>> > properties.setProperty("group.id <http://group.id>", >>>> "very_small_test") >>>> > properties.setProperty("ssl.endpoint.identification.algorithm ", >>>> "") >>>> > properties.setProperty("security.protocol", "SSL") >>>> > >>>> > >>>> > val kafkaSource: FlinkKafkaConsumerBase[String] =new >>>> FlinkKafkaConsumer[String]("very_small_test", new SimpleStringSchema(), >>>> properties).setStartFromTimestamp(0) >>>> > >>>> > val settings = >>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() >>>> > val env = StreamExecutionEnvironment.getExecutionEnvironment >>>> > val tableEnv = StreamTableEnvironment.create(env, settings) >>>> > >>>> > val schema =new Schema() >>>> > .field("fullVisitorId",Types.STRING) >>>> > .field("eventTime",Types.STRING) >>>> > .field("eventID",Types.STRING) >>>> > .field("eventType",Types.STRING) >>>> > .field("page",Types.MAP( Types.STRING, Types.STRING)) >>>> > .field("CustomDimensions",Types.MAP( Types.STRING, Types.STRING)) >>>> > >>>> > >>>> > tableEnv.connect(new Kafka() >>>> > .version("universal") >>>> > .topic("very_small_test") >>>> > .properties(properties) >>>> > .startFromEarliest() >>>> > ) >>>> > .withFormat( >>>> > new Json() >>>> > .failOnMissingField(false) >>>> > .deriveSchema() >>>> > ) >>>> > .withSchema(schema) >>>> > .inAppendMode() >>>> > .registerTableSource("sql_source") >>>> > >>>> > >>>> > val sqlStatement ="SELECT * from sql_source where >>>> CustomDimensions['pagePath'] like '%BT%'" >>>> > >>>> > val result =tableEnv.sqlQuery(sqlStatement) >>>> > >>>> > val dsRow =tableEnv.toAppendStream[Row](result) >>>> > >>>> > >>>> > val jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat() >>>> > .setDrivername("com.microsoft.sqlserver.jdbc.SQLServerDriver") >>>> > .setDBUrl("AWS url") >>>> > .setUsername(username) >>>> > .setPassword(password) >>>> > .setQuery("INSERT INTO kafka_data_test (fullVisitorId, >>>> EventTime, eventID) VALUES >>>> > (?, ?, ?)") >>>> > .setBatchInterval(100) >>>> > .finish() >>>> > >>>> > dsRow.writeUsingOutputFormat(jdbcOutput) >>>> > >>>> > tableEnv.execute("SQL test") >>>> > >>>> > >>>> > -- >>>> > >>>> > *Best regards >>>> > >>>> > Martin Hansen* >>>> > >>>> >>> >> >> -- >> >> Best regards >> >> Martin Hansen >> >> >> -- Best regards Martin Hansen