换个第三方工具看看 https://github.com/blynkkk/clickhouse4j
<dependency>
<groupId>cc.blynk.clickhouse</groupId>
<artifactId>clickhouse4j</artifactId>
<version>1.4.4</version>
</dependency>
DanielGu <[email protected]> 于2020年12月28日周一 上午12:22写道:
> 使用了阿里的包,写入clickhouse
> 阿里云flink-connector-clickhouse写入ClickHouse
> <https://help.aliyun.com/document_detail/185696.html?spm=5176.12901015.0.i12901015.2b41525cECNyYW&accounttraceid=1ac9126237284ef9b0a25f666c3030dfxaso>
>
>
> 测试写入clickhouse ,返回如下,无报错,但并未成功写入,不知从何下手排查,请教各位大佬
> +---------------------------------------------+
> | default_catalog.default_database.sink_table |
> +---------------------------------------------+
> | -1 |
> +---------------------------------------------+
>
>
> 代码如下
> package com.daniel
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.table.sources._
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.table.api._
> import org.apache.flink.types.Row
> import org.apache.flink.table.api.{
> TableEnvironment,
> TableSchema,
> Types,
> ValidationException
> }
>
> /** Demo job: reads rows from a local CSV file and inserts the `name` column
>   * into a ClickHouse-backed sink table declared through Flink SQL DDL.
>   */
> object StreamingJob {
>
>   /** Entry point: registers the CSV source, creates the sink table and runs
>     * the INSERT statement.
>     *
>     * @param args command-line arguments (unused)
>     */
>   def main(args: Array[String]): Unit = {
>     val sourceCsvPath = "/Users/flink-sql-demo/flink-sql-source.csv"
>
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.getConfig.disableClosureCleaner()
>
>     val tEnv = StreamTableEnvironment.create(env)
>
>     // CSV layout: header line skipped, comma-separated, three declared columns.
>     val csvTableSource = CsvTableSource
>       .builder()
>       .path(sourceCsvPath)
>       .ignoreFirstLine()
>       .fieldDelimiter(",")
>       .field("name", DataTypes.STRING())
>       .field("age", DataTypes.BIGINT())
>       // .field("sex", DataTypes.STRING())
>       // .field("grade", DataTypes.INT())
>       .field("rate", DataTypes.FLOAT())
>       .build()
>
>     // NOTE(review): registerTableSource is reported as deprecated in recent
>     // Flink releases; kept because the replacement depends on the Flink
>     // version in use — confirm.
>     tEnv.registerTableSource("source", csvTableSource)
>
>     // Plain (non-interpolated) literal: nothing is substituted into the DDL,
>     // so a '$' in a real password or URL is not treated as an interpolation.
>     val createSinkSql =
>       """
>         | CREATE TABLE sink_table (
>         | name VARCHAR
>         |) WITH (
>         | 'connector' = 'clickhouse',
>         | 'url' = 'clickhouse://*****:8080',
>         | 'username' = '****',
>         | 'password' = '****',
>         | 'database-name' = '***',
>         | 'table-name' = 'live.d_sink_table',
>         | 'sink.batch-size' = '1',
>         | 'sink.partition-strategy' = 'hash',
>         | 'sink.partition-key' = 'name'
>         |)
>         |""".stripMargin
>
>     tEnv.executeSql(createSinkSql)
>
>     val result = tEnv.executeSql(
>       "INSERT INTO sink_table SELECT name FROM source"
>     )
>     // NOTE(review): presumably executeSql only submits the INSERT job and
>     // returns immediately, so the "-1" printed below would be a placeholder
>     // row count rather than a failure, and main may exit before the job has
>     // written anything. Consider blocking on the job (e.g. result.await() on
>     // Flink 1.12+, or via result.getJobClient on 1.11) — confirm against the
>     // Flink version in use.
>     result.print()
>   }
>
> }
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>