使用了阿里的包,写入clickhouse
阿里云flink-connector-clickhouse写入ClickHouse
<https://help.aliyun.com/document_detail/185696.html?spm=5176.12901015.0.i12901015.2b41525cECNyYW&accounttraceid=1ac9126237284ef9b0a25f666c3030dfxaso>
  

测试写入clickhouse ,返回如下,无报错,但并未成功写入,不知从何下手排查,请教各位大佬
+---------------------------------------------+
| default_catalog.default_database.sink_table |
+---------------------------------------------+
|                                          -1 |
+---------------------------------------------+


代码如下
package com.daniel
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.sources._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api._
import org.apache.flink.types.Row
import org.apache.flink.table.api.{
  TableEnvironment,
  TableSchema,
  Types,
  ValidationException
}

object StreamingJob {
  def main(args: Array[String]) {
    val SourceCsvPath =
      "/Users/flink-sql-demo/flink-sql-source.csv"

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.getConfig.disableClosureCleaner

    val tEnv = StreamTableEnvironment.create(env)

    val csvTableSource = CsvTableSource
      .builder()
      .path(SourceCsvPath)
      .ignoreFirstLine()
      .fieldDelimiter(",")
      .field("name", DataTypes.STRING())
      .field("age", DataTypes.BIGINT())
//      .field("sex", DataTypes.STRING())
//      .field("grade", DataTypes.INT())
      .field("rate", DataTypes.FLOAT())
      .build()

    tEnv.registerTableSource("source", csvTableSource)

    val create_sql =
      s"""
         | CREATE TABLE sink_table (
         |    name VARCHAR
         |) WITH (
         |    'connector' = 'clickhouse',
         |    'url' = 'clickhouse://*****:8080',
         |    'username' = '****',
         |    'password' = '****',
         |    'database-name' = '***',
         |    'table-name' = 'live.d_sink_table',
         |    'sink.batch-size' = '1',
         |    'sink.partition-strategy' = 'hash',
         |    'sink.partition-key' = 'name'
         |)
         |""".stripMargin



    tEnv.executeSql(create_sql);

    val result = tEnv.executeSql(
      "INSERT INTO sink_table SELECT name FROM source"
    )
    result.print()
  }

}






--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复