使用阿里云提供的 connector 包写入 ClickHouse
阿里云flink-connector-clickhouse写入ClickHouse
<https://help.aliyun.com/document_detail/185696.html?spm=5176.12901015.0.i12901015.2b41525cECNyYW&accounttraceid=1ac9126237284ef9b0a25f666c3030dfxaso>
测试写入 ClickHouse 时返回如下结果,没有报错,但数据并未成功写入,不知从何下手排查,请教各位大佬
+---------------------------------------------+
| default_catalog.default_database.sink_table |
+---------------------------------------------+
| -1 |
+---------------------------------------------+
代码如下
package com.daniel
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.sources._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api._
import org.apache.flink.types.Row
import org.apache.flink.table.api.{
TableEnvironment,
TableSchema,
Types,
ValidationException
}
/** Demo job: registers a local CSV file as a table source, declares a sink table
  * backed by the `clickhouse` SQL connector, and inserts the `name` column of
  * every CSV row into that sink.
  */
object StreamingJob {

  /** Builds the source and sink tables, submits the INSERT job and waits for it
    * to finish.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val sourceCsvPath =
      "/Users/flink-sql-demo/flink-sql-source.csv"

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig.disableClosureCleaner()
    val tEnv = StreamTableEnvironment.create(env)

    // CSV source: the header line is skipped; columns are name, age, rate.
    val csvTableSource = CsvTableSource
      .builder()
      .path(sourceCsvPath)
      .ignoreFirstLine()
      .fieldDelimiter(",")
      .field("name", DataTypes.STRING())
      .field("age", DataTypes.BIGINT())
      // .field("sex", DataTypes.STRING())
      // .field("grade", DataTypes.INT())
      .field("rate", DataTypes.FLOAT())
      .build()
    tEnv.registerTableSource("source", csvTableSource)

    // DDL for the sink table; only the `name` column is written.
    val createSinkSql =
      """
        | CREATE TABLE sink_table (
        | name VARCHAR
        |) WITH (
        | 'connector' = 'clickhouse',
        | 'url' = 'clickhouse://*****:8080',
        | 'username' = '****',
        | 'password' = '****',
        | 'database-name' = '***',
        | 'table-name' = 'live.d_sink_table',
        | 'sink.batch-size' = '1',
        | 'sink.partition-strategy' = 'hash',
        | 'sink.partition-key' = 'name'
        |)
        |""".stripMargin
    tEnv.executeSql(createSinkSql)

    // executeSql on an INSERT statement only submits the job and returns
    // immediately; the single "-1" row it prints is the placeholder
    // affected-row count, not an error code.
    val result = tEnv.executeSql(
      "INSERT INTO sink_table SELECT name FROM source"
    )
    result.print()

    // Block until the submitted job has actually finished. Without this, main
    // can return before any record has been written to the sink (presumably
    // what happens when running locally from an IDE), and a job failure would
    // never surface.
    // NOTE(review): TableResult.await() requires Flink 1.12+. On Flink 1.11 use
    //   result.getJobClient.get()
    //     .getJobExecutionResult(Thread.currentThread().getContextClassLoader)
    //     .get()
    // instead.
    result.await()
  }
}
--
Sent from: http://apache-flink.147419.n8.nabble.com/