C0urante commented on code in PR #15893:
URL: https://github.com/apache/kafka/pull/15893#discussion_r1629805844


##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -202,6 +204,225 @@ public Object valueFrom(Map<String, Object> map) {
         return current.get(lastStep());
     }
 
+    /**
+     * Access {@code Map} fields and apply functions to update field values.
+     *
+     * @param originalValue schema-based data value
+     * @param whenFound     function to apply when path is found
+     * @param whenNotFound  function to apply when path is not found
+     * @param whenOther     function to apply on fields not matched by path
+     * @return updated data value
+     */
+    public Map<String, Object> updateValueFrom(
+        Map<String, Object> originalValue,
+        MapValueUpdater whenFound,
+        MapValueUpdater whenNotFound,
+        MapValueUpdater whenOther
+    ) {
+        return updateValue(originalValue, 0, whenFound, whenNotFound, 
whenOther);
+    }
+
+    @SuppressWarnings("unchecked")
+    private Map<String, Object> updateValue(
+        Map<String, Object> originalValue,
+        int step,
+        MapValueUpdater whenFound,
+        MapValueUpdater whenNotFound,
+        MapValueUpdater whenOther
+    ) {
+        if (originalValue == null) return null;
+        Map<String, Object> updatedParent = new 
HashMap<>(originalValue.size());
+        boolean found = false;
+        for (Map.Entry<String, Object> entry : originalValue.entrySet()) {
+            String fieldName = entry.getKey();
+            Object fieldValue = entry.getValue();
+            if (steps.get(step).equals(fieldName)) {
+                found = true;
+                if (step < lastStepIndex()) {
+                    if (fieldValue instanceof Map) {
+                        Map<String, Object> updatedField = updateValue(
+                            (Map<String, Object>) fieldValue,
+                            step + 1,
+                            whenFound,
+                            whenNotFound,
+                            whenOther);
+                        updatedParent.put(fieldName, updatedField);
+                    } else {
+                        // add back to not found and apply others, as only 
leaf values are updated
+                        found = false;
+                        whenOther.apply(originalValue, updatedParent, null, 
fieldName);
+                    }
+                } else {
+                    whenFound.apply(originalValue, updatedParent, this, 
fieldName);
+                }
+            } else {
+                whenOther.apply(originalValue, updatedParent, null, fieldName);
+            }
+        }
+
+        if (!found) {
+            whenNotFound.apply(originalValue, updatedParent, this, 
steps.get(step));
+        }
+
+        return updatedParent;
+    }
+
+    /**
+     * Access {@code Struct} fields and apply functions to update field values.
+     *
+     * @param originalSchema original struct schema
+     * @param originalValue  schema-based data value
+     * @param updatedSchema  updated struct schema
+     * @param whenFound      function to apply when path is found
+     * @param whenNotFound   function to apply when path is not found
+     * @param whenOther      function to apply on fields not matched by path
+     * @return updated data value
+     */
+    public Struct updateValueFrom(
+        Schema originalSchema,
+        Struct originalValue,
+        Schema updatedSchema,
+        StructValueUpdater whenFound,
+        StructValueUpdater whenNotFound,
+        StructValueUpdater whenOther
+    ) {
+        return updateValue(originalSchema, originalValue, updatedSchema, 0, 
whenFound, whenNotFound, whenOther);
+    }
+
+    private Struct updateValue(
+        Schema originalSchema,
+        Struct originalValue,
+        Schema updateSchema,
+        int step,
+        StructValueUpdater whenFound,
+        StructValueUpdater whenNotFound,
+        StructValueUpdater whenOther
+    ) {
+        Struct updated = new Struct(updateSchema);
+        boolean found = false;
+        for (Field field : originalSchema.fields()) {
+            if (step < steps.size()) {
+                if (steps.get(step).equals(field.name())) {
+                    found = true;
+                    if (step == lastStepIndex()) {
+                        whenFound.apply(
+                            originalValue,
+                            field,
+                            updated,
+                            updateSchema.field(field.name()),
+                            this
+                        );
+                    } else {
+                        if (field.schema().type() == Schema.Type.STRUCT) {
+                            Struct fieldValue = updateValue(
+                                field.schema(),
+                                originalValue.getStruct(field.name()),
+                                updateSchema.field(field.name()).schema(),
+                                step + 1,
+                                whenFound,
+                                whenNotFound,
+                                whenOther
+                            );
+                            updated.put(field.name(), fieldValue);
+                        } else {
+                            // add back to not found and apply others, as only 
leaf values are updated
+                            found = false;
+                            whenOther.apply(originalValue, field, updated, 
null, this);
+                        }
+                    }
+                } else {
+                    whenOther.apply(originalValue, field, updated, null, this);
+                }
+            }
+        }
+        if (!found) {
+            whenNotFound.apply(
+                originalValue,
+                null,
+                updated,
+                updateSchema.field(steps.get(step)),
+                this);
+        }
+        return updated;
+    }
+
+    /**
+     * Prepares a new schema based on an original one, and applies an update 
function
+     * when the current path(s) is found.
+     *
+     * <p>If path is not found, no function is applied, and the path is 
ignored.
+     *
+     * <p>Other fields are copied from original schema.
+     *
+     * @param originalSchema        baseline schema
+     * @param baselineSchemaBuilder baseline schema build, if changes to the 
baseline
+     *                              are required before copying original
+     * @param whenFound             function to apply when current path(s) 
is/are found.
+     * @return an updated schema. Resulting schemas are usually cached for 
further access.
+     */
+    public Schema updateSchemaFrom(
+        Schema originalSchema,
+        SchemaBuilder baselineSchemaBuilder,
+        StructSchemaUpdater whenFound,
+        StructSchemaUpdater whenNotFound,
+        StructSchemaUpdater whenOther
+    ) {
+        return updateSchema(originalSchema, baselineSchemaBuilder, 0, 
whenFound, whenNotFound, whenOther);
+    }
+
+    // Recursive implementation to update schema at different steps.
+    // Consider that resulting schemas are usually cached.
+    private Schema updateSchema(
+        Schema operatingSchema,
+        SchemaBuilder builder,
+        int step,
+        StructSchemaUpdater matching,
+        StructSchemaUpdater notFound,
+        StructSchemaUpdater others
+    ) {
+        if (operatingSchema.isOptional()) {
+            builder.optional();
+        }
+        if (operatingSchema.defaultValue() != null) {
+            builder.defaultValue(operatingSchema.defaultValue());
+        }

Review Comment:
   Hmmm... I think this is definitely required, otherwise we drop default 
values for nested structs whose children we modify.



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java:
##########
@@ -383,22 +402,18 @@ private R applyWithSchema(R record) {
             final Struct value = requireStructOrNull(operatingValue(record), 
PURPOSE);
             Schema updatedSchema = schemaUpdateCache.get(schema);
             if (updatedSchema == null) {
+                // cover raw schemas with default value
                 SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, 
SchemaBuilder.struct());
-                for (Field field : schema.fields()) {
-                    if (field.name().equals(config.field)) {
-                        builder.field(field.name(), 
TRANSLATORS.get(config.type).typeSchema(field.schema().isOptional()));
-                    } else {
-                        builder.field(field.name(), field.schema());
-                    }
-                }
-                if (schema.isOptional())
-                    builder.optional();
                 if (schema.defaultValue() != null) {
                     Struct updatedDefaultValue = applyValueWithSchema((Struct) 
schema.defaultValue(), builder);
                     builder.defaultValue(updatedDefaultValue);

Review Comment:
   Won't this line fail now? We haven't defined any fields for the new schema, 
which should render the updated default value struct invalid for it.



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java:
##########
@@ -383,22 +402,18 @@ private R applyWithSchema(R record) {
             final Struct value = requireStructOrNull(operatingValue(record), 
PURPOSE);
             Schema updatedSchema = schemaUpdateCache.get(schema);
             if (updatedSchema == null) {
+                // cover raw schemas with default value
                 SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, 
SchemaBuilder.struct());
-                for (Field field : schema.fields()) {
-                    if (field.name().equals(config.field)) {
-                        builder.field(field.name(), 
TRANSLATORS.get(config.type).typeSchema(field.schema().isOptional()));
-                    } else {
-                        builder.field(field.name(), field.schema());
-                    }
-                }
-                if (schema.isOptional())
-                    builder.optional();
                 if (schema.defaultValue() != null) {
                     Struct updatedDefaultValue = applyValueWithSchema((Struct) 
schema.defaultValue(), builder);
                     builder.defaultValue(updatedDefaultValue);

Review Comment:
   With a debugger breakpoint set at this line, I ran the entire 
`TimestampConverterTest` suite, and it looks like we don't cover this line yet. 
We should probably add that coverage even if the code works as-is.



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -39,6 +44,7 @@
  * @see <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures">KIP-821</a>
  * @see FieldSyntaxVersion
  */
+@InterfaceStability.Evolving

Review Comment:
   This class isn't part of the public API, so we shouldn't add this annotation.



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -202,6 +208,161 @@ public Object valueFrom(Map<String, Object> map) {
         return current.get(lastStep());
     }
 
+    /**
+     * Updates the matching field value if found
+     * @param map original root value
+     * @param update function to apply to existing value
+     * @return the original value with the matching field updated if found
+     */
+    public Map<String, Object> updateMap(
+        Map<String, Object> map,
+        Function<Object, Object> update
+    ) {
+        if (map == null) return null;
+        Map<String, Object> result = new HashMap<>(map);
+
+        Map<String, Object> parent = result;
+        Map<String, Object> child;
+        for (String step : stepsWithoutLast()) {
+            child = requireMapOrNull(parent.get(step), "nested field access");
+            if (child == null) return map;
+            child = new HashMap<>(child);
+            parent.put(step, child);
+            parent = child;
+        }
+
+        Object original = parent.get(lastStep());
+        Object updated = update.apply(original);
+        if (updated != null) {
+            parent.put(lastStep(), updated);
+        }

Review Comment:
   It's a little strange to invoke the update function even if `original` is 
null. Maybe something like this would be more intuitive?
   
   ```suggestion
           if (original == null)
               return map;
   
           Object updated = update.apply(original);
           if (updated != null) {
               parent.put(lastStep(), updated);
           }
   ```
   
   Or, if we _really_ want to mirror the exact current behavior of 
`TimestampConverter` (where `null` is inserted when the field isn't found), we 
could do something like this:
   ```suggestion
           Object updated = original != null ? update.apply(original) : null;
           parent.put(lastStep(), updated);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to