换个第三方工具看看 https://github.com/blynkkk/clickhouse4j

<dependency>
    <groupId>cc.blynk.clickhouse</groupId>
    <artifactId>clickhouse4j</artifactId>
    <version>1.4.4</version>
</dependency>


DanielGu <610493...@qq.com> 于2020年12月28日周一 上午12:22写道:

> 使用了阿里的包,写入clickhouse
> 阿里云flink-connector-clickhouse写入ClickHouse
> <
> https://help.aliyun.com/document_detail/185696.html?spm=5176.12901015.0.i12901015.2b41525cECNyYW&accounttraceid=1ac9126237284ef9b0a25f666c3030dfxaso>
>
>
> 测试写入clickhouse ,返回如下,无报错,但并未成功写入,不知从何下手排查,请教各位大佬
> +---------------------------------------------+
> | default_catalog.default_database.sink_table |
> +---------------------------------------------+
> |                                          -1 |
> +---------------------------------------------+
>
>
> 代码如下
> package com.daniel
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.table.sources._
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.table.api._
> import org.apache.flink.types.Row
> import org.apache.flink.table.api.{
>   TableEnvironment,
>   TableSchema,
>   Types,
>   ValidationException
> }
>
> object StreamingJob {
>   def main(args: Array[String]) {
>     val SourceCsvPath =
>       "/Users/flink-sql-demo/flink-sql-source.csv"
>
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>
>     env.getConfig.disableClosureCleaner
>
>     val tEnv = StreamTableEnvironment.create(env)
>
>     val csvTableSource = CsvTableSource
>       .builder()
>       .path(SourceCsvPath)
>       .ignoreFirstLine()
>       .fieldDelimiter(",")
>       .field("name", DataTypes.STRING())
>       .field("age", DataTypes.BIGINT())
> //      .field("sex", DataTypes.STRING())
> //      .field("grade", DataTypes.INT())
>       .field("rate", DataTypes.FLOAT())
>       .build()
>
>     tEnv.registerTableSource("source", csvTableSource)
>
>     val create_sql =
>       s"""
>          | CREATE TABLE sink_table (
>          |    name VARCHAR
>          |) WITH (
>          |    'connector' = 'clickhouse',
>          |    'url' = 'clickhouse://*****:8080',
>          |    'username' = '****',
>          |    'password' = '****',
>          |    'database-name' = '***',
>          |    'table-name' = 'live.d_sink_table',
>          |    'sink.batch-size' = '1',
>          |    'sink.partition-strategy' = 'hash',
>          |    'sink.partition-key' = 'name'
>          |)
>          |""".stripMargin
>
>
>
>     tEnv.executeSql(create_sql);
>
>     val result = tEnv.executeSql(
>       "INSERT INTO sink_table SELECT name FROM source"
>     )
>     result.print()
>   }
>
> }
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>

回复