turcsanyip commented on code in PR #7489:
URL: https://github.com/apache/nifi/pull/7489#discussion_r1280992893
##
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/proto/ProtoUtils.java:
##
@@ -17,50 +17,99 @@
package org.apache.nifi.processors.gcp.bigquery.proto;
+import com.google.cloud.bigquery.storage.v1.BigDecimalByteStringEncoder;
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.protobuf.Descriptors;
+import com.google.protobuf.DoubleValue;
import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.FloatValue;
+import com.google.protobuf.Int64Value;
+
+import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
/**
- * Util class for protocol buffer messaging
- */
+* Util class for protocol buffer messaging
+*/
public class ProtoUtils {
-public static DynamicMessage createMessage(Descriptors.Descriptor
descriptor, Map valueMap) {
-DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
+ public static DynamicMessage createMessage(Descriptors.Descriptor
descriptor, Map valueMap, TableSchema tableSchema) {
+ final DynamicMessage.Builder builder =
DynamicMessage.newBuilder(descriptor);
+
+ for (final Descriptors.FieldDescriptor field : descriptor.getFields()) {
+ final String name = field.getName();
+ Object value = valueMap.get(name);
+ if (value == null) {
+ continue;
+ }
+
+ switch (field.getType()) {
+ case MESSAGE:
+ if (field.isRepeated()) {
+ Collection collection = value.getClass().isArray() ?
Arrays.asList((Object[]) value) : (Collection) value;
+ collection.forEach(act -> builder.addRepeatedField(field,
createMessage(field.getMessageType(), (Map) act, tableSchema)));
+ } else {
+ builder.setField(field,
createMessage(field.getMessageType(), (Map) value,
tableSchema));
+ }
+ break;
+
+ // INT64 with alias INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT
+ case INT64:
+ // Integer in the bigquery table schema maps back to INT64
which is considered to be Long on Java side:
+ // https://developers.google.com/protocol-buffers/docs/proto3
+ if (value instanceof Integer) {
+ value = Long.valueOf((Integer) value);
+ }
+
+ setField(value, field, builder);
+ break;
+
+ // FLOAT64
+ case DOUBLE:
+ if (value instanceof Float) {
+ value = ((Float) value).doubleValue();
+ }
+ setField(value, field, builder);
+ break;
+
+ // matches NUMERIC and BIGNUMERIC types in BigQuery
+ // BQTableSchemaToProtoDescriptor.class
+ case BYTES:
+ if(value instanceof BigDecimal) {
+
if(tableSchema.getFields(field.getIndex()).getType().equals(Type.BIGNUMERIC)) {
Review Comment:
Minor formatting:
```suggestion
if (value instanceof BigDecimal) {
if (tableSchema.getFields(field.getIndex()).getType().equals(Type.BIGNUMERIC)) {
```
##
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/proto/ProtoUtils.java:
##
@@ -17,50 +17,99 @@
package org.apache.nifi.processors.gcp.bigquery.proto;
+import com.google.cloud.bigquery.storage.v1.BigDecimalByteStringEncoder;
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.protobuf.Descriptors;
+import com.google.protobuf.DoubleValue;
import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.FloatValue;
+import com.google.protobuf.Int64Value;
+
+import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
/**
- * Util class for protocol buffer messaging
- */
+* Util class for protocol buffer messaging
+*/
public class ProtoUtils {
-public static DynamicMessage createMessage(Descriptors.Descriptor
descriptor, Map valueMap) {
-DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
+ public static DynamicMessage createMessage(Descriptors.Descriptor
descriptor, Map valueMap, TableSchema tableSchema) {
+ final DynamicMessage.Builder builder =
DynamicMessage.newBuilder(descriptor);
+
+ for (final Descriptors.FieldDescriptor field : descriptor.getFields()) {
+ final String name = field.getName();
+ Object value = valueMap.get(name);
+ if (value == null) {
+ continue;
+ }
+
+