eskabetxe commented on a change in pull request #85: URL: https://github.com/apache/bahir-flink/pull/85#discussion_r597075880
########## File path: flink-connector-clickhouse/src/main/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseAppendSinkFunction.java ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.apache.flink.streaming.connectors.clickhouse; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.types.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import ru.yandex.clickhouse.BalancedClickhouseDataSource; +import ru.yandex.clickhouse.settings.ClickHouseProperties; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.util.Properties; + +public class ClickHouseAppendSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction { + private static final String USERNAME = "user"; +private static final String PASSWORD = "password"; Review comment: identation ########## File path: flink-connector-clickhouse/src/main/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseAppendSinkFunction.java ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.apache.flink.streaming.connectors.clickhouse; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.types.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import ru.yandex.clickhouse.BalancedClickhouseDataSource; +import ru.yandex.clickhouse.settings.ClickHouseProperties; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.util.Properties; + +public class ClickHouseAppendSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction { + private static final String USERNAME = "user"; +private static final String PASSWORD = "password"; + + private static final Logger log = LoggerFactory.getLogger(ClickHouseAppendSinkFunction.class); + private static final long serialVersionUID = 1L; + + private Connection connection; + private BalancedClickhouseDataSource dataSource; + private PreparedStatement pstat; + + private String address; + private String username; + private String password; + + private String prepareStatement; + private Integer batchSize; + private Long commitPadding; + + private Integer retries; + private Long retryInterval; + + private Boolean ignoreInsertError; + + private Integer currentSize; + private Long lastExecuteTime; + + public ClickHouseAppendSinkFunction(String address, String username, String password, String prepareStatement, Integer batchSize, Long commitPadding, Integer retries, Long retryInterval, Boolean ignoreInsertError) { + this.address = address; + this.username = username; + this.password = password; + this.prepareStatement = prepareStatement; + this.batchSize = batchSize; + this.commitPadding = commitPadding; + this.retries = retries; + this.retryInterval = retryInterval; + this.ignoreInsertError = ignoreInsertError; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + Properties properties = new Properties(); + properties.setProperty(USERNAME, username); + properties.setProperty(PASSWORD, password); + ClickHouseProperties clickHouseProperties = new ClickHouseProperties(properties); + dataSource = new BalancedClickhouseDataSource(address, clickHouseProperties); + connection = dataSource.getConnection(); + pstat = connection.prepareStatement(prepareStatement); + lastExecuteTime = System.currentTimeMillis(); + currentSize = 0; + + } + + @Override + public void invoke(Row value, Context context) throws Exception { + for (int i = 0; i < value.getArity(); i++) { + pstat.setObject(i + 1, value.getField(i)); + } + pstat.addBatch(); + currentSize++; + if (currentSize >= batchSize || (System.currentTimeMillis() - lastExecuteTime) > commitPadding) { + try { + doExecuteRetries(retries, retryInterval); + } catch (Exception e) { + log.error("clickhouse-insert-error ( maxRetries:" + retries + " , retryInterval : " + retryInterval + " millisecond )" + e.getMessage()); + } finally { + pstat.clearBatch(); + currentSize = 0; + lastExecuteTime = System.currentTimeMillis(); + } + } + } + + public void doExecuteRetries(int count, long retryInterval) throws Exception { + + int retrySize = 0; + Exception resultException = null; + for (int i = 0; i < count; i++) { + try { + pstat.executeBatch(); + break; + } catch (Exception e) { + retrySize++; + resultException = e; + } + try { + Thread.sleep(retryInterval); + } catch (InterruptedException e) { + e.printStackTrace(); Review comment: if needed log the exception ########## File path: flink-connector-clickhouse/pom.xml ########## @@ -0,0 +1,111 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.bahir</groupId> + <artifactId>bahir-flink-parent_2.11</artifactId> + <version>1.1-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>flink-connector-clickhouse_${scala.binary.version}</artifactId> + <name>flink-connector-clickhouse</name> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>ru.yandex.clickhouse</groupId> + <artifactId>clickhouse-jdbc</artifactId> + <version>0.1.50</version> Review comment: this should be in <properties> so user can change this version ########## File path: flink-connector-clickhouse/src/test/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseAppendSinkFunctionTest.java ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apache.flink.streaming.connectors.clickhouse; + +import org.apache.flink.types.Row; +import org.junit.Test; +import ru.yandex.clickhouse.BalancedClickhouseDataSource; +import ru.yandex.clickhouse.settings.ClickHouseProperties; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.util.Properties; + +public class ClickHouseAppendSinkFunctionTest { + + + private static final String USERNAME = "user"; + private static final String PASSWORD = "password"; + private Connection connection; + private BalancedClickhouseDataSource dataSource; + private PreparedStatement pstat; + + @Test + public void open() throws Exception { + } + + @Test + public void invoke() throws Exception { + Properties properties = new Properties(); + properties.setProperty(USERNAME, "admin"); + properties.setProperty(PASSWORD, "admin"); + ClickHouseProperties clickHouseProperties = new ClickHouseProperties(properties); + dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://localhost:8123/default", clickHouseProperties); + connection = dataSource.getConnection(); + pstat = connection.prepareStatement(""); + Row value = new Row(2); + for (int i = 0; i < value.getArity(); i++) { + pstat.setObject(i + 1, value.getField(i)); + } + Review comment: what is the test? ########## File path: flink-connector-clickhouse/src/test/java/com/apache/flink/table/descriptors/ClickHouseTest.java ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apache.flink.table.descriptors; + +import org.junit.Test; + +import java.util.Map; + +public class ClickHouseTest { + + @Test + public void toConnectorProperties() throws Exception { + ClickHouse clickhouse = new ClickHouse() + .version("1") + .address("jdbc:clickhouse://localhost:8123/default") + .batchSize(1) + .database("qtt") + .ignoreError(false) + .padding(2000L) + .table("insert_test") + .username("admin") + .password("admin") + .retryAttempts(3) + .retryInterval(3000L); + Map<String, String> connectorProperties = clickhouse.toConnectorProperties(); + for (Map.Entry<String, String> entry : connectorProperties.entrySet()) { + System.out.println(entry.getKey() + ":" + entry.getValue()); Review comment: we should test that the map is what you expect and not print it to console ########## File path: flink-connector-clickhouse/src/main/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseTableSink.java ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.apache.flink.streaming.connectors.clickhouse; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.sinks.AppendStreamTableSink; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.utils.TableConnectorUtils; +import org.apache.flink.types.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * An at-least-once Table sink for ClickHouse. + * + * <p>The mechanisms of Flink guarantees delivering messages at-least-once to this sink (if + * checkpointing is enabled). However, one common use case is to run idempotent queries + * (e.g., <code>REPLACE</code> or <code>INSERT OVERWRITE</code>) to upsert into the database and + * achieve exactly-once semantic.</p> + */ +public class ClickHouseTableSink implements AppendStreamTableSink<Row> { + + private static final Logger log = LoggerFactory.getLogger(ClickHouseTableSink.class); + private static final Integer BATCH_SIZE_DEFAULT = 5000; + private static final Long COMMIT_PADDING_DEFAULT = 5000L; + private static final Integer RETRIES_DEFAULT = 3; + private static final Long RETRY_INTERVAL_DEFAULT = 3000L; + private static final Boolean IGNORE_INSERT_ERROR = false; + private String address; + private String username; + private String password; + private String database; + private String table; + private TableSchema schema; + private Integer batchSize; + private Long commitPadding; + private Integer retries; + private Long retryInterval; + private Boolean ignoreInsertError; + + + public ClickHouseTableSink(String address, String username, String password, String database, String table, TableSchema schema, Integer batchSize, Long commitPadding, Integer retries, Long retryInterval, Boolean ignoreInsertError) { + this.address = address; + this.username = username; + this.password = password; + this.database = database; + this.table = table; + this.schema = schema; + this.batchSize = batchSize; + this.commitPadding = commitPadding; + this.retries = retries; + this.retryInterval = retryInterval; + this.ignoreInsertError = ignoreInsertError; + + } + + /** + * @param dataStream + * @deprecated + */ + @Override + public void emitDataStream(DataStream<Row> dataStream) { + consumeDataStream(dataStream); + } + + /** + * + * @return ClickHouseAppendSinkFunction + */ + private ClickHouseAppendSinkFunction initSink() { + String prepareStatement = createPrepareStatement(schema, database, table); + return new ClickHouseAppendSinkFunction(address, username, password, prepareStatement, batchSize, commitPadding, retries, retryInterval, ignoreInsertError); + } + + @Override + public TableSink<Row> configure(String[] strings, TypeInformation<?>[] typeInformations) { + + ClickHouseTableSink copy; + try { + copy = new ClickHouseTableSink(address, username, password, database, table, schema, batchSize, commitPadding, retries, retryInterval, ignoreInsertError); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return copy; + } + + @Override + public String[] getFieldNames() { + return schema.getFieldNames(); + } + + @Override + public TypeInformation<?>[] getFieldTypes() { + return schema.getFieldTypes(); + } + + @Override + public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) { + return dataStream.addSink(initSink()) + .setParallelism(dataStream.getParallelism()) + .name(TableConnectorUtils.generateRuntimeName(this.getClass(), schema.getFieldNames())); + } + + @Override + public DataType getConsumedDataType() { + return schema.toRowDataType(); + } + + + @Override + public TypeInformation<Row> getOutputType() { + return new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames()); + } + + + @Override + public TableSchema getTableSchema() { + return schema; + } + + + public static Builder builder() { + return new Builder(); + } + + + public String createPrepareStatement(TableSchema tableSchema, String database, String table) { + String[] fieldNames = tableSchema.getFieldNames(); + String columns = String.join(",", fieldNames); + String[] questionMark = new String[fieldNames.length]; + for (int i = 0; i < questionMark.length; i++) { Review comment: this could be change to a stream map no? String questionMark = Arrays.stream(fieldNames) .map(field -> "?") .reduce((left,right) -> left+","+right) .get(); ########## File path: flink-connector-clickhouse/src/main/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseTableSink.java ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.apache.flink.streaming.connectors.clickhouse; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.sinks.AppendStreamTableSink; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.utils.TableConnectorUtils; +import org.apache.flink.types.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * An at-least-once Table sink for ClickHouse. + * + * <p>The mechanisms of Flink guarantees delivering messages at-least-once to this sink (if + * checkpointing is enabled). However, one common use case is to run idempotent queries + * (e.g., <code>REPLACE</code> or <code>INSERT OVERWRITE</code>) to upsert into the database and + * achieve exactly-once semantic.</p> + */ +public class ClickHouseTableSink implements AppendStreamTableSink<Row> { + + private static final Logger log = LoggerFactory.getLogger(ClickHouseTableSink.class); + private static final Integer BATCH_SIZE_DEFAULT = 5000; + private static final Long COMMIT_PADDING_DEFAULT = 5000L; + private static final Integer RETRIES_DEFAULT = 3; + private static final Long RETRY_INTERVAL_DEFAULT = 3000L; + private static final Boolean IGNORE_INSERT_ERROR = false; + private String address; + private String username; + private String password; + private String database; + private String table; + private TableSchema schema; + private Integer batchSize; + private Long commitPadding; + private Integer retries; + private Long retryInterval; + private Boolean ignoreInsertError; + + + public ClickHouseTableSink(String address, String username, String password, String database, String table, TableSchema schema, Integer batchSize, Long commitPadding, Integer retries, Long retryInterval, Boolean ignoreInsertError) { + this.address = address; + this.username = username; + this.password = password; + this.database = database; + this.table = table; + this.schema = schema; + this.batchSize = batchSize; + this.commitPadding = commitPadding; + this.retries = retries; + this.retryInterval = retryInterval; + this.ignoreInsertError = ignoreInsertError; + + } + + /** + * @param dataStream + * @deprecated + */ + @Override + public void emitDataStream(DataStream<Row> dataStream) { + consumeDataStream(dataStream); + } + + /** + * + * @return ClickHouseAppendSinkFunction + */ + private ClickHouseAppendSinkFunction initSink() { + String prepareStatement = createPrepareStatement(schema, database, table); + return new ClickHouseAppendSinkFunction(address, username, password, prepareStatement, batchSize, commitPadding, retries, retryInterval, ignoreInsertError); + } + + @Override + public TableSink<Row> configure(String[] strings, TypeInformation<?>[] typeInformations) { + + ClickHouseTableSink copy; + try { + copy = new ClickHouseTableSink(address, username, password, database, table, schema, batchSize, commitPadding, retries, retryInterval, ignoreInsertError); + } catch (Exception e) { + throw new RuntimeException(e); Review comment: add a error message ########## File path: flink-connector-clickhouse/src/test/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseAppendSinkFunctionTest.java ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apache.flink.streaming.connectors.clickhouse; + +import org.apache.flink.types.Row; +import org.junit.Test; +import ru.yandex.clickhouse.BalancedClickhouseDataSource; +import ru.yandex.clickhouse.settings.ClickHouseProperties; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.util.Properties; + +public class ClickHouseAppendSinkFunctionTest { + + + private static final String USERNAME = "user"; + private static final String PASSWORD = "password"; + private Connection connection; + private BalancedClickhouseDataSource dataSource; + private PreparedStatement pstat; + + @Test + public void open() throws Exception { Review comment: empty test ########## File path: flink-connector-clickhouse/src/test/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseTableSourceSinkFactoryTest.java ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apache.flink.streaming.connectors.clickhouse; + +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.Schema; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.types.Row; +import org.junit.Test; + +import java.util.Map; + +import static com.apache.flink.table.descriptors.ClickHouseValidator.*; + +public class ClickHouseTableSourceSinkFactoryTest { + + + @Test + public void createStreamTableSink() throws Exception { + Review comment: you are testing with local clickhouse? we should have a testcontainer that create a clickhouse instance to test -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
