[GitHub] [hudi] danny0405 commented on a change in pull request #3334: [HUDI-2209] Bulk insert for flink writer

2021-07-26 Thread GitBox


danny0405 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676614549



##
File path: hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
##
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.util.RowDataProjection;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Key generator for {@link RowData}.
+ */
+public class RowDataKeyGen {
+
+  // reference: NonpartitionedAvroKeyGenerator
+  private static final String EMPTY_PARTITION = "";
+
+  // reference: org.apache.hudi.keygen.KeyGenUtils
+  private static final String NULL_RECORDKEY_PLACEHOLDER = "__null__";
+  private static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__";
+
+  private static final String DEFAULT_PARTITION_PATH = "default";
+  private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
+
+  private final String[] recordKeyFields;
+  private final String[] partitionPathFields;
+
+  private final RowDataProjection recordKeyProjection;
+  private final RowDataProjection partitionPathProjection;
+
+  private final boolean hiveStylePartitioning;
+  private final boolean encodePartitionPath;
+
+  // efficient code path
+  private boolean simpleRecordKey = false;
+  private RowData.FieldGetter recordKeyFieldGetter;
+
+  private boolean simplePartitionPath = false;
+  private RowData.FieldGetter partitionPathFieldGetter;
+
+  private boolean nonPartitioned;
+
+  private RowDataKeyGen(

Review comment:
   +1 for implementing that in following PRs; let's keep this key generator 
simple, since it is suitable for most cases. The raw key generator classes are 
too hard for users to use; maybe we can simplify them with simpler config 
options.
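
   As an illustration of the "efficient code path" noted in the diff above, here is a 
minimal sketch (hypothetical class, not part of this PR) of extracting a single-field 
record key with Flink's `RowData.FieldGetter`:

   import org.apache.flink.table.data.RowData;
   import org.apache.flink.table.types.logical.RowType;

   public final class SimpleKeyExtractor {

     private final RowData.FieldGetter keyFieldGetter;

     public SimpleKeyExtractor(RowType rowType, String keyField) {
       int pos = rowType.getFieldNames().indexOf(keyField);
       if (pos < 0) {
         throw new IllegalArgumentException("Field " + keyField + " not found in schema");
       }
       // A single FieldGetter avoids a full row projection when only one key field is configured.
       this.keyFieldGetter = RowData.createFieldGetter(rowType.getTypeAt(pos), pos);
     }

     public String recordKey(RowData row) {
       Object val = keyFieldGetter.getFieldOrNull(row);
       if (val == null) {
         // Mirrors the null-key handling a key generator would need; message is illustrative.
         throw new IllegalStateException("Record key field is null for row: " + row);
       }
       return val.toString();
     }
   }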




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] danny0405 commented on a change in pull request #3334: [HUDI-2209] Bulk insert for flink writer

2021-07-26 Thread GitBox


danny0405 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676612350



##
File path: 
hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
##
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.utils.TimeWait;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Sink function to write the data to the underneath filesystem.
+ *
+ * The function should only be used in operation type {@link 
WriteOperationType#BULK_INSERT}.
+ *
+ * Note: The function task requires the input stream be shuffled by 
partition path.
+ *
+ * @param <I> Type of the input record
+ * @see StreamWriteOperatorCoordinator
+ */
+public class BulkInsertWriteFunction<I>
+extends ProcessFunction<I, Object> {
+
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BulkInsertWriteFunction.class);
+
+  /**
+   * Helper class for bulk insert mode.
+   */
+  private transient BulkInsertWriterHelper writerHelper;
+
+  /**
+   * Config options.
+   */
+  private final Configuration config;
+
+  /**
+   * Table row type.
+   */
+  private final RowType rowType;
+
+  /**
+   * Id of current subtask.
+   */
+  private int taskID;
+
+  /**
+   * Write Client.
+   */
+  private transient HoodieFlinkWriteClient writeClient;
+
+  /**
+   * The initial inflight instant when start up.
+   */
+  private volatile String initInstant;
+
+  /**
+   * Gateway to send operator events to the operator coordinator.
+   */
+  private transient OperatorEventGateway eventGateway;
+
+  /**
+   * Commit action type.
+   */
+  private transient String actionType;
+
+  /**
+   * Constructs a StreamingSinkFunction.
+   *
+   * @param config The config options
+   */
+  public BulkInsertWriteFunction(Configuration config, RowType rowType) {
+this.config = config;
+this.rowType = rowType;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws IOException {
+this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+this.writeClient = StreamerUtil.createWriteClient(this.config, 
getRuntimeContext());
+this.actionType = CommitUtils.getCommitActionType(
+WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
+HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));

Review comment:
   The `HoodieTableSink` can ensure that the operation is definitely 
bulk_insert.
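
   For context, a minimal sketch (hypothetical helper, not the actual `HoodieTableSink` 
code) of how the sink side can pick the bulk-insert pipeline from the configured 
operation, so the write function itself needs no extra validation:

   import org.apache.flink.configuration.Configuration;
   import org.apache.hudi.common.model.WriteOperationType;
   import org.apache.hudi.configuration.FlinkOptions;

   final class BulkInsertPipelineCheck {

     // Returns true when the bulk-insert write pipeline should be wired in.
     // The real HoodieTableSink wiring may differ; this only shows that the
     // decision is made once, on the sink side, from the configured operation.
     static boolean useBulkInsertPipeline(Configuration conf) {
       WriteOperationType operation =
           WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
       return operation == WriteOperationType.BULK_INSERT;
     }
   }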




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] danny0405 commented on a change in pull request #3334: [HUDI-2209] Bulk insert for flink writer

2021-07-26 Thread GitBox


danny0405 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676560948



##
File path: hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
##
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.util.RowDataProjection;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Key generator for {@link RowData}.
+ */
+public class RowDataKeyGen {
+
+  // reference: NonpartitionedAvroKeyGenerator
+  private static final String EMPTY_PARTITION = "";
+
+  // reference: org.apache.hudi.keygen.KeyGenUtils
+  private static final String NULL_RECORDKEY_PLACEHOLDER = "__null__";
+  private static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__";
+
+  private static final String DEFAULT_PARTITION_PATH = "default";
+  private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
+
+  private final String[] recordKeyFields;
+  private final String[] partitionPathFields;
+
+  private final RowDataProjection recordKeyProjection;
+  private final RowDataProjection partitionPathProjection;
+
+  private final boolean hiveStylePartitioning;
+  private final boolean encodePartitionPath;
+
+  // efficient code path
+  private boolean simpleRecordKey = false;
+  private RowData.FieldGetter recordKeyFieldGetter;
+
+  private boolean simplePartitionPath = false;
+  private RowData.FieldGetter partitionPathFieldGetter;
+
+  private boolean nonPartitioned;
+
+  private RowDataKeyGen(

Review comment:
   The SQL user does not need to care about which key generator they use; they 
should always use the primary key syntax instead. The `KeyGenerator` is a 
developer interface, IMO.
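
   For illustration, a hedged sketch of the primary key syntax referred to above, 
submitted through the Flink Table API (the table name, path, and connector options 
are illustrative and not taken from this PR):

   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;

   public class HudiPrimaryKeyExample {
     public static void main(String[] args) {
       TableEnvironment tableEnv = TableEnvironment.create(
           EnvironmentSettings.newInstance().inStreamingMode().build());
       // The PRIMARY KEY clause is what drives record key generation for SQL users;
       // no KeyGenerator class has to be configured explicitly.
       tableEnv.executeSql(
           "CREATE TABLE hudi_sink (\n"
               + "  id STRING,\n"
               + "  name STRING,\n"
               + "  `partition` STRING,\n"
               + "  PRIMARY KEY (id) NOT ENFORCED\n"
               + ") PARTITIONED BY (`partition`) WITH (\n"
               + "  'connector' = 'hudi',\n"
               + "  'path' = 'file:///tmp/hudi_sink',\n"
               + "  'write.operation' = 'bulk_insert'\n"
               + ")");
     }
   }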




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] danny0405 commented on a change in pull request #3334: [HUDI-2209] Bulk insert for flink writer

2021-07-26 Thread GitBox


danny0405 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676559586



##
File path: 
hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
##
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.utils.TimeWait;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Sink function to write the data to the underneath filesystem.
+ *
+ * The function should only be used in operation type {@link 
WriteOperationType#BULK_INSERT}.
+ *
+ * Note: The function task requires the input stream be shuffled by 
partition path.
+ *
+ * @param <I> Type of the input record
+ * @see StreamWriteOperatorCoordinator
+ */
+public class BulkInsertWriteFunction<I>
+extends ProcessFunction<I, Object> {
+
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BulkInsertWriteFunction.class);
+
+  /**
+   * Helper class for bulk insert mode.
+   */
+  private transient BulkInsertWriterHelper writerHelper;
+
+  /**
+   * Config options.
+   */
+  private final Configuration config;
+
+  /**
+   * Table row type.
+   */
+  private final RowType rowType;
+
+  /**
+   * Id of current subtask.
+   */
+  private int taskID;
+
+  /**
+   * Write Client.
+   */
+  private transient HoodieFlinkWriteClient writeClient;
+
+  /**
+   * The initial inflight instant when start up.
+   */
+  private volatile String initInstant;
+
+  /**
+   * Gateway to send operator events to the operator coordinator.
+   */
+  private transient OperatorEventGateway eventGateway;
+
+  /**
+   * Commit action type.
+   */
+  private transient String actionType;
+
+  /**
+   * Constructs a StreamingSinkFunction.
+   *
+   * @param config The config options
+   */
+  public BulkInsertWriteFunction(Configuration config, RowType rowType) {
+this.config = config;
+this.rowType = rowType;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws IOException {
+this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+this.writeClient = StreamerUtil.createWriteClient(this.config, 
getRuntimeContext());
+this.actionType = CommitUtils.getCommitActionType(
+WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
+HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));

Review comment:
   Users can choose whatever they like; we do not need to put restrictions in 
the code, I think.
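
   A small sketch of why no restriction is strictly needed here: the commit action 
type already adapts to the configured table type (the expected outputs below are an 
assumption, not verified against this PR):

   import org.apache.hudi.common.model.HoodieTableType;
   import org.apache.hudi.common.model.WriteOperationType;
   import org.apache.hudi.common.util.CommitUtils;

   public class CommitActionTypeCheck {
     public static void main(String[] args) {
       // The action type is derived from the operation and the table type,
       // so both COPY_ON_WRITE and MERGE_ON_READ can be configured here.
       String cow = CommitUtils.getCommitActionType(
           WriteOperationType.BULK_INSERT, HoodieTableType.COPY_ON_WRITE);
       String mor = CommitUtils.getCommitActionType(
           WriteOperationType.BULK_INSERT, HoodieTableType.MERGE_ON_READ);
       // Expected (assumption): "commit" for COW and "deltacommit" for MOR.
       System.out.println(cow + " / " + mor);
     }
   }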




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] danny0405 commented on a change in pull request #3334: [HUDI-2209] Bulk insert for flink writer

2021-07-26 Thread GitBox


danny0405 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676385780



##
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.hadoop.api.WriteSupport;
+
+import java.util.HashMap;
+
+import static 
org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
+import static 
org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
+import static 
org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
+import static 
org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
+
+/**
+ * Hoodie Write Support for directly writing {@link RowData} to Parquet.
+ */
+public class HoodieRowDataParquetWriteSupport extends 
RowDataParquetWriteSupport {
+
+  private final Configuration hadoopConf;
+  private final BloomFilter bloomFilter;
+  private String minRecordKey;
+  private String maxRecordKey;
+
+  public HoodieRowDataParquetWriteSupport(Configuration conf, RowType rowType, 
BloomFilter bloomFilter) {
+super(rowType);
+this.hadoopConf = new Configuration(conf);
+this.bloomFilter = bloomFilter;
+  }
+
+  public Configuration getHadoopConf() {
+return hadoopConf;
+  }
+
+  @Override
+  public WriteSupport.FinalizedWriteContext finalizeWrite() {
+HashMap<String, String> extraMetaData = new HashMap<>();

Review comment:
   `extraMetaData` looks fine to me; I have no strong preference for this 
local variable name.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] danny0405 commented on a change in pull request #3334: [HUDI-2209] Bulk insert for flink writer

2021-07-26 Thread GitBox


danny0405 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676388084



##
File path: hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java
##
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.util.RowDataProjection;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Key generator for {@link RowData}.
+ */
+public class RowDataKeyGen {
+
+  // reference: NonpartitionedAvroKeyGenerator
+  private static final String EMPTY_PARTITION = "";
+
+  // reference: org.apache.hudi.keygen.KeyGenUtils
+  private static final String NULL_RECORDKEY_PLACEHOLDER = "__null__";
+  private static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__";
+
+  private static final String DEFAULT_PARTITION_PATH = "default";
+  private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
+
+  private final String[] recordKeyFields;
+  private final String[] partitionPathFields;
+
+  private final RowDataProjection recordKeyProjection;
+  private final RowDataProjection partitionPathProjection;
+
+  private final boolean hiveStylePartitioning;
+  private final boolean encodePartitionPath;
+
+  // efficient code path
+  private boolean simpleRecordKey = false;
+  private RowData.FieldGetter recordKeyFieldGetter;
+
+  private boolean simplePartitionPath = false;
+  private RowData.FieldGetter partitionPathFieldGetter;
+
+  private boolean nonPartitioned;
+
+  private RowDataKeyGen(

Review comment:
   I don't think `BuiltinKeyGenerator` is a good example.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] danny0405 commented on a change in pull request #3334: [HUDI-2209] Bulk insert for flink writer

2021-07-26 Thread GitBox


danny0405 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676386465



##
File path: 
hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
##
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.utils.TimeWait;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Sink function to write the data to the underneath filesystem.
+ *
+ * The function should only be used in operation type {@link 
WriteOperationType#BULK_INSERT}.
+ *
+ * Note: The function task requires the input stream be shuffled by 
partition path.
+ *
+ * @param <I> Type of the input record
+ * @see StreamWriteOperatorCoordinator
+ */
+public class BulkInsertWriteFunction<I>
+extends ProcessFunction<I, Object> {
+
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BulkInsertWriteFunction.class);
+
+  /**
+   * Helper class for bulk insert mode.
+   */
+  private transient BulkInsertWriterHelper writerHelper;
+
+  /**
+   * Config options.
+   */
+  private final Configuration config;
+
+  /**
+   * Table row type.
+   */
+  private final RowType rowType;
+
+  /**
+   * Id of current subtask.
+   */
+  private int taskID;
+
+  /**
+   * Write Client.
+   */
+  private transient HoodieFlinkWriteClient writeClient;
+
+  /**
+   * The initial inflight instant when start up.
+   */
+  private volatile String initInstant;
+
+  /**
+   * Gateway to send operator events to the operator coordinator.
+   */
+  private transient OperatorEventGateway eventGateway;
+
+  /**
+   * Commit action type.
+   */
+  private transient String actionType;
+
+  /**
+   * Constructs a StreamingSinkFunction.
+   *
+   * @param config The config options
+   */
+  public BulkInsertWriteFunction(Configuration config, RowType rowType) {
+this.config = config;
+this.rowType = rowType;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws IOException {
+this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+this.writeClient = StreamerUtil.createWriteClient(this.config, 
getRuntimeContext());
+this.actionType = CommitUtils.getCommitActionType(
+WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
+HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));

Review comment:
   MOR also works actually.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] danny0405 commented on a change in pull request #3334: [HUDI-2209] Bulk insert for flink writer

2021-07-26 Thread GitBox


danny0405 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676385494



##
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.hadoop.api.WriteSupport;
+
+import java.util.HashMap;
+
+import static 
org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
+import static 
org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
+import static 
org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
+import static 
org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
+
+/**
+ * Hoodie Write Support for directly writing {@link RowData} to Parquet.
+ */
+public class HoodieRowDataParquetWriteSupport extends 
RowDataParquetWriteSupport {
+
+  private final Configuration hadoopConf;
+  private final BloomFilter bloomFilter;
+  private String minRecordKey;
+  private String maxRecordKey;
+
+  public HoodieRowDataParquetWriteSupport(Configuration conf, RowType rowType, 
BloomFilter bloomFilter) {
+super(rowType);
+this.hadoopConf = new Configuration(conf);
+this.bloomFilter = bloomFilter;
+  }
+
+  public Configuration getHadoopConf() {
+return hadoopConf;
+  }
+
+  @Override
+  public WriteSupport.FinalizedWriteContext finalizeWrite() {
+HashMap<String, String> extraMetaData = new HashMap<>();

Review comment:
   `extraMetaData` looks fine to me; I have no strong preference for this 
local variable name.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] danny0405 commented on a change in pull request #3334: [HUDI-2209] Bulk insert for flink writer

2021-07-26 Thread GitBox


danny0405 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676383783



##
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
##
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.model.HoodieRowData;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.MarkerFiles;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Create handle with RowData for datasource implementation of bulk insert.
+ */
+public class HoodieRowDataCreateHandle implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = 
LogManager.getLogger(HoodieRowDataCreateHandle.class);
+  private static final AtomicLong SEQGEN = new AtomicLong(1);
+
+  private final String instantTime;
+  private final int taskPartitionId;
+  private final long taskId;
+  private final long taskEpochId;
+  private final HoodieTable table;
+  private final HoodieWriteConfig writeConfig;
+  protected final HoodieRowDataFileWriter fileWriter;
+  private final String partitionPath;
+  private final Path path;
+  private final String fileId;
+  private final FileSystem fs;
+  protected final HoodieInternalWriteStatus writeStatus;
+  private final HoodieTimer currTimer;
+
+  public HoodieRowDataCreateHandle(HoodieTable table, HoodieWriteConfig 
writeConfig, String partitionPath, String fileId,
+   String instantTime, int taskPartitionId, 
long taskId, long taskEpochId,
+   RowType rowType) {
+this.partitionPath = partitionPath;
+this.table = table;
+this.writeConfig = writeConfig;
+this.instantTime = instantTime;
+this.taskPartitionId = taskPartitionId;
+this.taskId = taskId;
+this.taskEpochId = taskEpochId;
+this.fileId = fileId;
+this.currTimer = new HoodieTimer();
+this.currTimer.startTimer();
+this.fs = table.getMetaClient().getFs();
+this.path = makeNewPath(partitionPath);
+this.writeStatus = new 
HoodieInternalWriteStatus(!table.getIndex().isImplicitWithStorage(),
+writeConfig.getWriteStatusFailureFraction());
+writeStatus.setPartitionPath(partitionPath);
+writeStatus.setFileId(fileId);
+try {
+  HoodiePartitionMetadata partitionMetadata =
+  new HoodiePartitionMetadata(
+  fs,
+  instantTime,
+  new Path(writeConfig.getBasePath()),
+  FSUtils.getPartitionPath(writeConfig.getBasePath(), 
partitionPath));
+  partitionMetadata.trySave(taskPartitionId);
+  createMarkerFile(partitionPath, 
FSUtils.makeDataFileName(this.instantTime, getWriteToken(), this.fileId, 
table.getBaseFileExtension()));
+  this.fileWriter = createNewFileWriter(path, table, writeConfig, rowType);
+} catch (IOException e) {
+  throw new HoodieInsertException("Failed to initialize file writer for 
path " + path, e);
+}
+LOG.info("New handle created for partition :" + partitionPath + " with 
fileId " + fileId);
+  }
+
+  /**
+   * Writes an {@link RowData} to the underlying {@link 
HoodieRowDataFileWriter}.
+   * Before writing, value for 

[GitHub] [hudi] danny0405 commented on a change in pull request #3334: [HUDI-2209] Bulk insert for flink writer

2021-07-26 Thread GitBox


danny0405 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676382204



##
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.model;
+
+import org.apache.hudi.common.model.HoodieRecord;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+/**
+ * Internal Row implementation for Hoodie Row. It wraps an {@link RowData} and 
keeps meta columns locally. But the {@link RowData}
+ * does include the meta columns as well just that {@link HoodieRowData} will 
intercept queries for meta columns and serve from its
+ * copy rather than fetching from {@link RowData}.
+ */
+public class HoodieRowData implements RowData {
+
+  private final String commitTime;
+  private final String commitSeqNumber;
+  private final String recordKey;
+  private final String partitionPath;
+  private final String fileName;
+  private final RowData row;
+  private final int metaColumnsNum;
+
+  public HoodieRowData(String commitTime,
+   String commitSeqNumber,
+   String recordKey,
+   String partitionPath,
+   String fileName,
+   RowData row) {
+this.commitTime = commitTime;
+this.commitSeqNumber = commitSeqNumber;
+this.recordKey = recordKey;
+this.partitionPath = partitionPath;
+this.fileName = fileName;
+this.row = row;
+this.metaColumnsNum = HoodieRecord.HOODIE_META_COLUMNS.size();
+  }
+
+  @Override
+  public int getArity() {
+return 5 + row.getArity();
+  }
+
+  @Override
+  public RowKind getRowKind() {
+return row.getRowKind();
+  }
+
+  @Override
+  public void setRowKind(RowKind kind) {
+this.row.setRowKind(kind);
+  }
+
+  private String getMetaColumnVal(int ordinal) {
+switch (ordinal) {
+  case 0: {
+return commitTime;
+  }
+  case 1: {
+return commitSeqNumber;
+  }
+  case 2: {
+return recordKey;
+  }
+  case 3: {
+return partitionPath;
+  }
+  case 4: {
+return fileName;
+  }
+  default:
+throw new IllegalArgumentException("Not expected");
+}
+  }
+
+  @Override
+  public boolean isNullAt(int ordinal) {
+if (ordinal < metaColumnsNum) {
+  return null == getMetaColumnVal(ordinal);
+}
+return row.isNullAt(ordinal - metaColumnsNum);
+  }
+
+  @Override
+  public boolean getBoolean(int ordinal) {
+return row.getBoolean(ordinal - metaColumnsNum);

Review comment:
   No, we could use a `JoinedRowData` instead, but that code is actually not 
as efficient as this one.
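
   For comparison, a hedged sketch of the `JoinedRowData` alternative mentioned above 
(the meta-column ordering is assumed to match `getMetaColumnVal`; the Flink package 
name of `JoinedRowData` may vary by version):

   import org.apache.flink.table.data.GenericRowData;
   import org.apache.flink.table.data.RowData;
   import org.apache.flink.table.data.StringData;
   import org.apache.flink.table.data.utils.JoinedRowData;

   final class MetaColumnJoinSketch {

     // The five meta columns are materialized into a GenericRowData and joined in
     // front of the data row; every field access then goes through JoinedRowData's
     // index arithmetic, which is the indirection the hand-written HoodieRowData avoids.
     static RowData withMetaColumns(String commitTime, String commitSeqNo, String recordKey,
                                    String partitionPath, String fileName, RowData row) {
       GenericRowData meta = new GenericRowData(5);
       meta.setField(0, StringData.fromString(commitTime));
       meta.setField(1, StringData.fromString(commitSeqNo));
       meta.setField(2, StringData.fromString(recordKey));
       meta.setField(3, StringData.fromString(partitionPath));
       meta.setField(4, StringData.fromString(fileName));
       return new JoinedRowData(meta, row);
     }
   }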




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] danny0405 commented on a change in pull request #3334: [HUDI-2209] Bulk insert for flink writer

2021-07-26 Thread GitBox


danny0405 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r676381427



##
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.model;
+
+import org.apache.hudi.common.model.HoodieRecord;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+/**
+ * Internal Row implementation for Hoodie Row. It wraps an {@link RowData} and 
keeps meta columns locally. But the {@link RowData}
+ * does include the meta columns as well just that {@link HoodieRowData} will 
intercept queries for meta columns and serve from its
+ * copy rather than fetching from {@link RowData}.
+ */
+public class HoodieRowData implements RowData {
+
+  private final String commitTime;
+  private final String commitSeqNumber;
+  private final String recordKey;
+  private final String partitionPath;
+  private final String fileName;
+  private final RowData row;
+  private final int metaColumnsNum;
+
+  public HoodieRowData(String commitTime,
+   String commitSeqNumber,
+   String recordKey,
+   String partitionPath,
+   String fileName,
+   RowData row) {
+this.commitTime = commitTime;
+this.commitSeqNumber = commitSeqNumber;
+this.recordKey = recordKey;
+this.partitionPath = partitionPath;
+this.fileName = fileName;
+this.row = row;
+this.metaColumnsNum = HoodieRecord.HOODIE_META_COLUMNS.size();
+  }
+
+  @Override
+  public int getArity() {
+return 5 + row.getArity();

Review comment:
   Nice catch ~
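
   The catch refers to the hard-coded `5` in `getArity()`; a minimal sketch of the 
implied fix, deriving the arity from the meta-column count (illustrative helper, not 
the PR's code):

   import org.apache.flink.table.data.GenericRowData;
   import org.apache.flink.table.data.RowData;

   final class ArityCheck {

     // Derive the arity from the meta-column count rather than a literal 5,
     // so the two values cannot drift apart.
     static int arity(int metaColumnsNum, RowData dataRow) {
       return metaColumnsNum + dataRow.getArity();
     }

     public static void main(String[] args) {
       RowData dataRow = new GenericRowData(3);
       System.out.println(arity(5, dataRow)); // prints 8
     }
   }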




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] danny0405 commented on a change in pull request #3334: [HUDI-2209] Bulk insert for flink writer

2021-07-23 Thread GitBox


danny0405 commented on a change in pull request #3334:
URL: https://github.com/apache/hudi/pull/3334#discussion_r675907954



##
File path: 
hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
##
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Helper class for bulk insert used by Flink.
+ */
+public class BulkInsertWriterHelper {
+
+  private static final Logger LOG = 
LogManager.getLogger(BulkInsertWriterHelper.class);
+
+  private final String instantTime;
+  private final int taskPartitionId;
+  private final long taskId;
+  private final long taskEpochId;
+  private final HoodieTable hoodieTable;
+  private final HoodieWriteConfig writeConfig;
+  private final RowType rowType;
+  private final Boolean arePartitionRecordsSorted;
+  private final List<HoodieInternalWriteStatus> writeStatusList = new ArrayList<>();
+  private HoodieRowDataCreateHandle handle;
+  private String lastKnownPartitionPath = null;
+  private final String fileIdPrefix;
+  private int numFilesWritten = 0;
+  private final Map<String, HoodieRowDataCreateHandle> handles = new HashMap<>();
+  private final KeyGenerator keyGenerator;
+  private final Schema schema;
+  private final RowDataKeyGen keyGen;
+
+  public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable, 
HoodieWriteConfig writeConfig,
+String instantTime, int taskPartitionId, long 
taskId, long taskEpochId, RowType rowType,
+boolean arePartitionRecordsSorted) {
+this.hoodieTable = hoodieTable;
+this.writeConfig = writeConfig;
+this.instantTime = instantTime;
+this.taskPartitionId = taskPartitionId;
+this.taskId = taskId;
+this.taskEpochId = taskEpochId;
+this.rowType = addMetadataFields(rowType); // patch up with metadata fields
+this.arePartitionRecordsSorted = arePartitionRecordsSorted;
+this.fileIdPrefix = UUID.randomUUID().toString();
+try {
+  this.keyGenerator = HoodieAvroKeyGeneratorFactory.createKeyGenerator(new 
TypedProperties(writeConfig.getProps()));
+} catch (IOException e) {
+  throw new HoodieException("Create key generator error", e);
+}
+this.schema = getWriteSchema(writeConfig);
+this.keyGen = RowDataKeyGen.instance(conf, rowType);
+  }
+
+  /**
+   * Returns the write instant time.
+   */
+  public String getInstantTime() {
+return this.instantTime;
+  }
+
+  /**
+   * Get the schema of the actual write.
+   */
+  private static Schema getWriteSchema(HoodieWriteConfig config) {
+return new Schema.Parser().parse(config.getWriteSchema());
+  }
+
+  public void write(RowData record) throws IOException {
+try {
+  String recordKey = keyGen.getRecordKey(record);
+  String partitionPath = keyGen.getPartitionPath(record);
+
+  if ((lastKnownPartitionPath == null) || 
!lastKnownPartitionPath.equals(partitionPath) || !handle.canWrite()) {
+LOG.info("Creating new file for partition path " + partitionPath);

Review comment:
   No, the input data should be sorted by partition path.
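
   A minimal sketch (hypothetical helper; assumes the `RowDataKeyGen` instance is 
serializable) of shuffling the input by partition path before the bulk-insert write 
function, which is the requirement referred to above; an additional sort within each 
subtask can then keep records of one partition contiguous:

   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.table.data.RowData;
   import org.apache.hudi.sink.bulk.RowDataKeyGen;

   final class PartitionShuffleSketch {

     // keyBy routes records with the same partition path to the same subtask, so
     // each writer task sees all records of a given partition.
     static DataStream<RowData> shuffleByPartitionPath(DataStream<RowData> input,
                                                       RowDataKeyGen keyGen) {
       return input.keyBy(keyGen::getPartitionPath);
     }
   }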




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to