C0urante commented on code in PR #15379:
URL: https://github.com/apache/kafka/pull/15379#discussion_r1583252760


##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static 
org.apache.kafka.connect.transforms.util.Requirements.requireMapOrNull;
+
+/**
+ * A SingleFieldPath is composed of one or more field names, known as path 
steps,
+ * to access values within a data object (either {@code Struct} or {@code 
Map<String, Object>}).
+ *
+ * <p>The field path semantics are defined by the {@link FieldSyntaxVersion 
syntax version}.
+ *
+ * @see <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures">KIP-821</a>
+ * @see FieldSyntaxVersion
+ */
+public class SingleFieldPath {
+    // Invariants:
+    // - A field path can contain one or more steps
+    private static final char BACKTICK = '`';
+    private static final char DOT = '.';
+    private static final char BACKSLASH = '\\';
+
+    private final FieldSyntaxVersion version;
+    private final List<String> steps;
+
+    public SingleFieldPath(String pathText, FieldSyntaxVersion version) {
+        this.version = version;
+        switch (version) {
+            case V1: // backward compatibility
+                this.steps = Collections.singletonList(pathText);
+                break;
+            case V2:
+                this.steps = buildFieldPathV2(pathText);
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown syntax version: " 
+ version);
+        }
+    }
+
+    private static List<String> buildFieldPathV2(String path) {
+        final List<String> steps = new ArrayList<>();
+        // path character index to track backticks and dots and break path 
into steps
+        int idx = 0;
+        while (idx < path.length() && idx >= 0) {
+            if (path.charAt(idx) != BACKTICK) {
+                final int start = idx;
+                idx = path.indexOf(String.valueOf(DOT), idx);
+                if (idx >= 0) { // get path step and move forward
+                    String field = path.substring(start, idx);
+                    steps.add(field);
+                    idx++;
+                } else { // add all
+                    String field = path.substring(start);
+                    steps.add(field);
+                }
+            } else { // has backtick
+                int backtickAt = idx;
+                idx++;
+                StringBuilder field = new StringBuilder();
+                int start = idx;
+                while (true) {
+                    // find closing backtick
+                    idx = path.indexOf(String.valueOf(BACKTICK), idx);
+                    if (idx == -1) { // if not found, then fail
+                        failWhenIncompleteBacktickPair(path, backtickAt);
+                    }
+
+                    // backtick escaped if right after backslash
+                    boolean escaped = path.charAt(idx - 1) == BACKSLASH;
+
+                    if (idx >= path.length() - 1) { // at the end of path
+                        if (escaped) { // but escaped, then fail
+                            failWhenIncompleteBacktickPair(path, backtickAt);
+                        }
+                        field.append(path, start, idx);
+                        // we've reached the end of the path, and the last 
character is the backtick
+                        steps.add(field.toString());
+                        idx++;
+                        break;
+                    }
+
+                    if (path.charAt(idx + 1) != DOT) { // not followed by a dot
+                        // this backtick isn't followed by a dot; include it 
in the field name, but continue
+                        // looking for a matching backtick that is followed by 
a dot
+                        idx++;
+                        continue;
+                    }
+
+                    if (escaped) {
+                        // this backtick was escaped; include it in the field 
name, but continue
+                        // looking for an unescaped matching backtick
+                        field.append(path, start, idx - 1)
+                            .append(BACKTICK);
+
+                        idx++;
+                        start = idx;
+                        continue;
+                    }
+
+                    // we've found our matching backtick
+                    field.append(path, start, idx);
+                    steps.add(field.toString());
+                    idx += 2; // increment by two to include the backtick and 
the dot after it
+                    break;
+                }
+            }
+        }
+        // add last step if last char is a dot
+        if (!path.isEmpty() && path.charAt(path.length() - 1) == DOT)
+            steps.add("");
+        return Collections.unmodifiableList(steps);
+    }
+
+    private static void failWhenIncompleteBacktickPair(String path, int 
backtickAt) {
+        throw new ConfigException("Incomplete backtick pair in path: [" + path 
+ "],"
+                + " consider adding a backslash before backtick at position " 
+ backtickAt
+                + " to escape it");
+    }
+
+    /**
+     * Access a {@code Field} at the current path within a schema {@code 
Schema}
+     * If field is not found, then {@code null} is returned.
+     */
+    public Field fieldFrom(Schema schema) {
+        if (schema == null) return null;
+
+        Schema current = schema;
+        for (String pathSegment : stepsWithoutLast()) {
+            final Field field = current.field(pathSegment);
+            if (field != null) {
+                current = field.schema();
+            } else {
+                return null;
+            }
+        }
+        return current.field(lastStep());
+    }
+
+    /**
+     * Access a value at the current path within a schema-based {@code Struct}
+     * If object is not found, then {@code null} is returned.
+     */
+    public Object valueFrom(Struct struct) {
+        if (struct == null) return null;
+
+        Struct current = struct;
+        for (String pathSegment : stepsWithoutLast()) {
+            // Check to see if the field actually exists
+            if (current.schema().field(pathSegment) == null) {
+                return null;
+            }
+            current = current.getStruct(pathSegment);

Review Comment:
   Nit: we could have slightly friendlier error messages here:
   ```suggestion
               Object subValue = current.get(pathSegment);
               current = requireStructOrNull(subValue, "nested field access");
   ```



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static 
org.apache.kafka.connect.transforms.util.Requirements.requireMapOrNull;
+
+/**
+ * A SingleFieldPath is composed of one or more field names, known as path 
steps,
+ * to access values within a data object (either {@code Struct} or {@code 
Map<String, Object>}).
+ *
+ * <p>The field path semantics are defined by the {@link FieldSyntaxVersion 
syntax version}.
+ *
+ * @see <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures">KIP-821</a>
+ * @see FieldSyntaxVersion
+ */
+public class SingleFieldPath {
+    // Invariants:
+    // - A field path can contain one or more steps
+    private static final char BACKTICK = '`';
+    private static final char DOT = '.';
+    private static final char BACKSLASH = '\\';
+
+    private final FieldSyntaxVersion version;
+    private final List<String> steps;
+
+    public SingleFieldPath(String pathText, FieldSyntaxVersion version) {
+        this.version = version;
+        switch (version) {
+            case V1: // backward compatibility
+                this.steps = Collections.singletonList(pathText);
+                break;
+            case V2:
+                this.steps = buildFieldPathV2(pathText);
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown syntax version: " 
+ version);
+        }
+    }
+
+    private static List<String> buildFieldPathV2(String path) {
+        final List<String> steps = new ArrayList<>();
+        // path character index to track backticks and dots and break path 
into steps
+        int idx = 0;
+        while (idx < path.length() && idx >= 0) {
+            if (path.charAt(idx) != BACKTICK) {
+                final int start = idx;
+                idx = path.indexOf(String.valueOf(DOT), idx);
+                if (idx >= 0) { // get path step and move forward
+                    String field = path.substring(start, idx);
+                    steps.add(field);
+                    idx++;
+                } else { // add all
+                    String field = path.substring(start);
+                    steps.add(field);
+                }
+            } else { // has backtick
+                int backtickAt = idx;
+                idx++;
+                StringBuilder field = new StringBuilder();
+                int start = idx;
+                while (true) {
+                    // find closing backtick
+                    idx = path.indexOf(String.valueOf(BACKTICK), idx);
+                    if (idx == -1) { // if not found, then fail
+                        failWhenIncompleteBacktickPair(path, backtickAt);
+                    }
+
+                    // backtick escaped if right after backslash
+                    boolean escaped = path.charAt(idx - 1) == BACKSLASH;
+
+                    if (idx >= path.length() - 1) { // at the end of path
+                        if (escaped) { // but escaped, then fail
+                            failWhenIncompleteBacktickPair(path, backtickAt);
+                        }
+                        field.append(path, start, idx);
+                        // we've reached the end of the path, and the last 
character is the backtick
+                        steps.add(field.toString());
+                        idx++;
+                        break;
+                    }
+
+                    if (path.charAt(idx + 1) != DOT) { // not followed by a dot
+                        // this backtick isn't followed by a dot; include it 
in the field name, but continue
+                        // looking for a matching backtick that is followed by 
a dot
+                        idx++;
+                        continue;
+                    }
+
+                    if (escaped) {
+                        // this backtick was escaped; include it in the field 
name, but continue
+                        // looking for an unescaped matching backtick
+                        field.append(path, start, idx - 1)
+                            .append(BACKTICK);
+
+                        idx++;
+                        start = idx;
+                        continue;
+                    }
+
+                    // we've found our matching backtick
+                    field.append(path, start, idx);
+                    steps.add(field.toString());
+                    idx += 2; // increment by two to include the backtick and 
the dot after it
+                    break;
+                }
+            }
+        }
+        // add last step if last char is a dot
+        if (!path.isEmpty() && path.charAt(path.length() - 1) == DOT)
+            steps.add("");
+        return Collections.unmodifiableList(steps);
+    }
+
+    private static void failWhenIncompleteBacktickPair(String path, int 
backtickAt) {
+        throw new ConfigException("Incomplete backtick pair in path: [" + path 
+ "],"
+                + " consider adding a backslash before backtick at position " 
+ backtickAt
+                + " to escape it");
+    }
+
+    /**
+     * Access a {@code Field} at the current path within a schema {@code 
Schema}
+     * If field is not found, then {@code null} is returned.
+     */
+    public Field fieldFrom(Schema schema) {
+        if (schema == null) return null;
+
+        Schema current = schema;
+        for (String pathSegment : stepsWithoutLast()) {
+            final Field field = current.field(pathSegment);
+            if (field != null) {
+                current = field.schema();
+            } else {
+                return null;
+            }
+        }
+        return current.field(lastStep());
+    }
+
+    /**
+     * Access a value at the current path within a schema-based {@code Struct}
+     * If object is not found, then {@code null} is returned.
+     */
+    public Object valueFrom(Struct struct) {
+        if (struct == null) return null;
+
+        Struct current = struct;
+        for (String pathSegment : stepsWithoutLast()) {
+            // Check to see if the field actually exists
+            if (current.schema().field(pathSegment) == null) {
+                return null;
+            }
+            current = current.getStruct(pathSegment);
+            if (current == null) return null;
+        }
+
+        if (current.schema().field(lastStep()) != null) {
+            return current.get(lastStep());
+        } else {
+            return null;
+        }
+    }
+
+    List<String> stepsWithoutLast() {

Review Comment:
   Nit: can be private
   ```suggestion
       private List<String> stepsWithoutLast() {
   ```



##########
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/SingleFieldPathTest.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+class SingleFieldPathTest {
+
+    @Test void shouldFindField() {
+        SchemaBuilder barSchema = SchemaBuilder.struct().field("bar", 
Schema.INT32_SCHEMA);

Review Comment:
   (The suggestion to operate on fully-built `Schema` instances instead of 
`SchemaBuilder` objects applies in several other places in this suite; I've 
only left one comment to avoid clutter.)



##########
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/SingleFieldPathTest.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+class SingleFieldPathTest {
+
+    @Test void shouldFindField() {
+        SchemaBuilder barSchema = SchemaBuilder.struct().field("bar", 
Schema.INT32_SCHEMA);
+        Schema schema = SchemaBuilder.struct().field("foo", barSchema).build();
+
+        assertEquals(barSchema.field("bar"), 
pathV2("foo.bar").fieldFrom(schema));
+        assertEquals(schema.field("foo"), pathV2("foo").fieldFrom(schema));
+    }
+
+    @Test void shouldReturnNullFieldWhenFieldNotFound() {
+        SchemaBuilder barSchema = SchemaBuilder.struct().field("bar", 
Schema.INT32_SCHEMA);
+        Schema schema = SchemaBuilder.struct().field("foo", barSchema).build();
+
+        assertNull(pathV2("un.known").fieldFrom(schema));
+        assertNull(pathV2("foo.unknown").fieldFrom(schema));
+        assertNull(pathV2("unknown").fieldFrom(schema));
+        assertNull(pathV2("test").fieldFrom(null));
+    }
+
+    @Test void shouldFindValueInMap() {
+        Map<String, Object> foo = new HashMap<>();
+        foo.put("bar", 42);
+        foo.put("baz", null);
+        Map<String, Object> map = new HashMap<>();
+        map.put("foo", foo);
+
+        assertEquals(42, pathV2("foo.bar").valueFrom(map));
+        assertNull(pathV2("foo.baz").valueFrom(map));
+    }
+
+    @Test void shouldReturnNullValueWhenFieldNotFoundInMap() {
+        Map<String, Object> foo = new HashMap<>();
+        foo.put("bar", 42);
+        foo.put("baz", null);
+        Map<String, Object> map = new HashMap<>();
+        map.put("foo", foo);
+
+        assertNull(new SingleFieldPath("un.known", 
FieldSyntaxVersion.V2).valueFrom(map));

Review Comment:
   Don't we want to use `pathV2` (or possibly `pathV2Value`, proposed below) 
instead of manually instantiating the `SingleFieldPath`?
   
   (This applies to other places in the test suite as well.)



##########
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java:
##########
@@ -111,7 +157,24 @@ public void nonExistentFieldWithSchemaShouldFail() {
             xform.apply(record);
             fail("Expected exception wasn't raised");
         } catch (IllegalArgumentException iae) {
-            assertEquals("Unknown field: nonexistent", iae.getMessage());
+            assertEquals("Unknown field: SingleFieldPath{version=V1, 
path=nonexistent}", iae.getMessage());
+        }
+    }
+
+    @Test
+    public void nonExistentNestedFieldWithSchemaShouldFail() {

Review Comment:
   Don't we need to configure the transform to use field syntax V2?



##########
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java:
##########
@@ -111,7 +157,24 @@ public void nonExistentFieldWithSchemaShouldFail() {
             xform.apply(record);
             fail("Expected exception wasn't raised");
         } catch (IllegalArgumentException iae) {
-            assertEquals("Unknown field: nonexistent", iae.getMessage());
+            assertEquals("Unknown field: SingleFieldPath{version=V1, 
path=nonexistent}", iae.getMessage());

Review Comment:
   This error message is less human-readable. Could we preserve the existing 
one? I don't think the syntax version is necessary; we can just use the path as 
specified by the user in the transform config.



##########
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/SingleFieldPathTest.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+class SingleFieldPathTest {
+
+    @Test void shouldFindField() {
+        SchemaBuilder barSchema = SchemaBuilder.struct().field("bar", 
Schema.INT32_SCHEMA);

Review Comment:
   It would be better practice to operate on fully-built `Schema` instances 
instead of `SchemaBuilder` objects:
   ```suggestion
           Schema barSchema = SchemaBuilder.struct().field("bar", 
Schema.INT32_SCHEMA).build();
   ```
   



##########
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/SingleFieldPathTest.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+class SingleFieldPathTest {
+
+    @Test void shouldFindField() {
+        SchemaBuilder barSchema = SchemaBuilder.struct().field("bar", 
Schema.INT32_SCHEMA);
+        Schema schema = SchemaBuilder.struct().field("foo", barSchema).build();
+
+        assertEquals(barSchema.field("bar"), 
pathV2("foo.bar").fieldFrom(schema));
+        assertEquals(schema.field("foo"), pathV2("foo").fieldFrom(schema));
+    }
+
+    @Test void shouldReturnNullFieldWhenFieldNotFound() {
+        SchemaBuilder barSchema = SchemaBuilder.struct().field("bar", 
Schema.INT32_SCHEMA);
+        Schema schema = SchemaBuilder.struct().field("foo", barSchema).build();
+
+        assertNull(pathV2("un.known").fieldFrom(schema));
+        assertNull(pathV2("foo.unknown").fieldFrom(schema));
+        assertNull(pathV2("unknown").fieldFrom(schema));
+        assertNull(pathV2("test").fieldFrom(null));
+    }
+
+    @Test void shouldFindValueInMap() {
+        Map<String, Object> foo = new HashMap<>();
+        foo.put("bar", 42);
+        foo.put("baz", null);
+        Map<String, Object> map = new HashMap<>();
+        map.put("foo", foo);
+
+        assertEquals(42, pathV2("foo.bar").valueFrom(map));
+        assertNull(pathV2("foo.baz").valueFrom(map));
+    }
+
+    @Test void shouldReturnNullValueWhenFieldNotFoundInMap() {
+        Map<String, Object> foo = new HashMap<>();
+        foo.put("bar", 42);
+        foo.put("baz", null);
+        Map<String, Object> map = new HashMap<>();
+        map.put("foo", foo);
+
+        assertNull(new SingleFieldPath("un.known", 
FieldSyntaxVersion.V2).valueFrom(map));
+        assertNull(new SingleFieldPath("foo.unknown", 
FieldSyntaxVersion.V2).valueFrom(map));
+        assertNull(new SingleFieldPath("unknown", 
FieldSyntaxVersion.V2).valueFrom(map));
+        assertNull(new SingleFieldPath("foo.baz", 
FieldSyntaxVersion.V2).valueFrom(map));
+        assertNull(new SingleFieldPath("foo.baz.inner", 
FieldSyntaxVersion.V2).valueFrom(map));
+    }
+
+    @Test void shouldFindValueInStruct() {
+        SchemaBuilder bazSchema = SchemaBuilder.struct()
+            .field("inner", Schema.STRING_SCHEMA);
+        SchemaBuilder barSchema = SchemaBuilder.struct()
+            .field("bar", Schema.INT32_SCHEMA)
+            .field("baz", bazSchema.optional());
+        Schema schema = SchemaBuilder.struct().field("foo", barSchema).build();
+        Struct foo = new Struct(barSchema)
+            .put("bar", 42)
+            .put("baz", null);
+        Struct struct = new Struct(schema).put("foo", foo);
+
+        assertEquals(42, pathV2("foo.bar").valueFrom(struct));
+        assertNull(pathV2("foo.baz").valueFrom(struct));
+    }
+
+    @Test void shouldReturnNullValueWhenFieldNotFoundInStruct() {
+        SchemaBuilder bazSchema = SchemaBuilder.struct()
+            .field("inner", Schema.STRING_SCHEMA);
+        SchemaBuilder barSchema = SchemaBuilder.struct()
+            .field("bar", Schema.INT32_SCHEMA)
+            .field("baz", bazSchema.optional());
+        Schema schema = SchemaBuilder.struct().field("foo", barSchema).build();
+        Struct foo = new Struct(barSchema)
+            .put("bar", 42)
+            .put("baz", null);
+        Struct struct = new Struct(schema).put("foo", foo);
+
+        assertNull(new SingleFieldPath("un.known", 
FieldSyntaxVersion.V2).valueFrom(struct));
+        assertNull(new SingleFieldPath("foo.unknown", 
FieldSyntaxVersion.V2).valueFrom(struct));
+        assertNull(new SingleFieldPath("unknown", 
FieldSyntaxVersion.V2).valueFrom(struct));
+        assertNull(new SingleFieldPath("foo.baz", 
FieldSyntaxVersion.V2).valueFrom(struct));
+        assertNull(new SingleFieldPath("foo.baz.inner", 
FieldSyntaxVersion.V2).valueFrom(struct));
+    }
+
+    private static SingleFieldPath pathV2(String path) {
+        return new SingleFieldPath(path, FieldSyntaxVersion.V2);
+    }

Review Comment:
   We can simplify things even further by piggybacking off of this with a few 
extra utility methods:
   
   ```java
       private static Field pathV2Field(String path, Schema schema) {
           return pathV2(path).fieldFrom(schema);
       }
   
       private static Object pathV2Value(String path, Struct struct) {
           return pathV2(path).valueFrom(struct);
       }
   
       private static Object pathV2Value(String path, Map<String, Object> map) {
           return pathV2(path).valueFrom(map);
       }
   ```
   
   I think we can replace all direct invocations of `pathV2` with these wrapper 
methods.



##########
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java:
##########
@@ -111,7 +157,24 @@ public void nonExistentFieldWithSchemaShouldFail() {
             xform.apply(record);
             fail("Expected exception wasn't raised");
         } catch (IllegalArgumentException iae) {
-            assertEquals("Unknown field: nonexistent", iae.getMessage());
+            assertEquals("Unknown field: SingleFieldPath{version=V1, 
path=nonexistent}", iae.getMessage());
+        }
+    }
+
+    @Test
+    public void nonExistentNestedFieldWithSchemaShouldFail() {
+        xform.configure(Collections.singletonMap("field", 
"magic.nonexistent"));
+
+        final Schema fooSchema = SchemaBuilder.struct().field("foo", 
Schema.INT32_SCHEMA).build();
+        final Schema keySchema = SchemaBuilder.struct().field("magic", 
fooSchema).build();
+        final Struct key = new Struct(keySchema).put("magic", new 
Struct(fooSchema).put("foo", 42));
+        final SinkRecord record = new SinkRecord("test", 0, keySchema, key, 
null, null, 0);
+
+        try {
+            xform.apply(record);
+            fail("Expected exception wasn't raised");
+        } catch (IllegalArgumentException iae) {
+            assertEquals("Unknown field: SingleFieldPath{version=V1, 
path=magic.nonexistent}", iae.getMessage());

Review Comment:
   Like with `nonExistentFieldWithSchemaShouldFail`, I don't think the field 
syntax version needs to be included in the error message. But if it is 
included, shouldn't it be V2 here instead of V1?



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static 
org.apache.kafka.connect.transforms.util.Requirements.requireMapOrNull;
+
+/**
+ * A SingleFieldPath is composed of one or more field names, known as path 
steps,
+ * to access values within a data object (either {@code Struct} or {@code 
Map<String, Object>}).
+ *
+ * <p>The field path semantics are defined by the {@link FieldSyntaxVersion 
syntax version}.
+ *
+ * @see <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures">KIP-821</a>
+ * @see FieldSyntaxVersion
+ */
+public class SingleFieldPath {
+    // Invariants:
+    // - A field path can contain one or more steps
+    private static final char BACKTICK = '`';
+    private static final char DOT = '.';
+    private static final char BACKSLASH = '\\';
+
+    private final FieldSyntaxVersion version;
+    private final List<String> steps;
+
+    public SingleFieldPath(String pathText, FieldSyntaxVersion version) {
+        this.version = version;
+        switch (version) {
+            case V1: // backward compatibility
+                this.steps = Collections.singletonList(pathText);
+                break;
+            case V2:
+                this.steps = buildFieldPathV2(pathText);
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown syntax version: " 
+ version);
+        }
+    }
+
+    private static List<String> buildFieldPathV2(String path) {
+        final List<String> steps = new ArrayList<>();
+        // path character index to track backticks and dots and break path 
into steps
+        int idx = 0;
+        while (idx < path.length() && idx >= 0) {
+            if (path.charAt(idx) != BACKTICK) {
+                final int start = idx;
+                idx = path.indexOf(String.valueOf(DOT), idx);
+                if (idx >= 0) { // get path step and move forward
+                    String field = path.substring(start, idx);
+                    steps.add(field);
+                    idx++;
+                } else { // add all
+                    String field = path.substring(start);
+                    steps.add(field);
+                }
+            } else { // has backtick
+                int backtickAt = idx;
+                idx++;
+                StringBuilder field = new StringBuilder();
+                int start = idx;
+                while (true) {
+                    // find closing backtick
+                    idx = path.indexOf(String.valueOf(BACKTICK), idx);
+                    if (idx == -1) { // if not found, then fail
+                        failWhenIncompleteBacktickPair(path, backtickAt);
+                    }
+
+                    // backtick escaped if right after backslash
+                    boolean escaped = path.charAt(idx - 1) == BACKSLASH;
+
+                    if (idx >= path.length() - 1) { // at the end of path
+                        if (escaped) { // but escaped, then fail
+                            failWhenIncompleteBacktickPair(path, backtickAt);
+                        }
+                        field.append(path, start, idx);
+                        // we've reached the end of the path, and the last 
character is the backtick
+                        steps.add(field.toString());
+                        idx++;
+                        break;
+                    }
+
+                    if (path.charAt(idx + 1) != DOT) { // not followed by a dot
+                        // this backtick isn't followed by a dot; include it 
in the field name, but continue
+                        // looking for a matching backtick that is followed by 
a dot
+                        idx++;
+                        continue;
+                    }
+
+                    if (escaped) {
+                        // this backtick was escaped; include it in the field 
name, but continue
+                        // looking for an unescaped matching backtick
+                        field.append(path, start, idx - 1)
+                            .append(BACKTICK);
+
+                        idx++;
+                        start = idx;
+                        continue;
+                    }
+
+                    // we've found our matching backtick
+                    field.append(path, start, idx);
+                    steps.add(field.toString());
+                    idx += 2; // increment by two to include the backtick and 
the dot after it
+                    break;
+                }
+            }
+        }
+        // add last step if last char is a dot
+        if (!path.isEmpty() && path.charAt(path.length() - 1) == DOT)
+            steps.add("");
+        return Collections.unmodifiableList(steps);
+    }
+
+    private static void failWhenIncompleteBacktickPair(String path, int 
backtickAt) {
+        throw new ConfigException("Incomplete backtick pair in path: [" + path 
+ "],"
+                + " consider adding a backslash before backtick at position " 
+ backtickAt
+                + " to escape it");
+    }
+
+    /**
+     * Access a {@code Field} at the current path within a schema {@code 
Schema}
+     * If field is not found, then {@code null} is returned.
+     */
+    public Field fieldFrom(Schema schema) {
+        if (schema == null) return null;
+
+        Schema current = schema;
+        for (String pathSegment : stepsWithoutLast()) {
+            final Field field = current.field(pathSegment);
+            if (field != null) {
+                current = field.schema();
+            } else {
+                return null;
+            }
+        }
+        return current.field(lastStep());
+    }
+
+    /**
+     * Access a value at the current path within a schema-based {@code Struct}
+     * If object is not found, then {@code null} is returned.
+     */
+    public Object valueFrom(Struct struct) {
+        if (struct == null) return null;
+
+        Struct current = struct;
+        for (String pathSegment : stepsWithoutLast()) {
+            // Check to see if the field actually exists
+            if (current.schema().field(pathSegment) == null) {
+                return null;
+            }
+            current = current.getStruct(pathSegment);
+            if (current == null) return null;
+        }
+
+        if (current.schema().field(lastStep()) != null) {
+            return current.get(lastStep());
+        } else {
+            return null;
+        }
+    }
+
+    List<String> stepsWithoutLast() {
+        return steps.subList(0, lastStepIndex());
+    }
+
+    /**
+     * Access a value at the current path within a schemaless {@code 
Map<String, Object>}.
+     * If object is not found, then {@code null} is returned.
+     */
+    public Object valueFrom(Map<String, Object> map) {
+        if (map == null) return null;
+
+        Map<String, Object> current = map;
+        for (String step : stepsWithoutLast()) {
+            current = requireMapOrNull(current.get(step), "nested field 
access");
+            if (current == null) return null;
+        }
+        return current.get(lastStep());
+    }
+
+    // For testing
+    String[] path() {
+        return steps.toArray(new String[0]);
+    }
+
+    String lastStep() {

Review Comment:
   Nit: can be private
   ```suggestion
       private String lastStep() {
   ```



##########
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java:
##########
@@ -99,6 +133,18 @@ public void nonExistentFieldSchemalessShouldReturnNull() {
         assertNull(transformedRecord.key());
     }
 
+    @Test
+    public void nonExistentNestedFieldSchemalessShouldReturnNull() {

Review Comment:
   Don't we need to configure the transform to use field syntax V2?



##########
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/FieldSyntaxVersionTest.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class FieldSyntaxVersionTest {
+    @Test
+    void shouldAppendConfigToDef() {
+        ConfigDef def = FieldSyntaxVersion.appendConfigTo(new ConfigDef());
+        assertEquals(def.configKeys().size(), 1);
+        final ConfigDef.ConfigKey configKey = 
def.configKeys().get("field.syntax.version");
+        assertEquals(configKey.name, "field.syntax.version");
+        assertEquals(configKey.defaultValue, "V1");
+    }
+
+    @Test
+    void shouldFailWhenAppendConfigToDefAgain() {
+        ConfigDef def = FieldSyntaxVersion.appendConfigTo(new ConfigDef());
+        assertEquals(def.configKeys().size(), 1);
+        ConfigException e = assertThrows(ConfigException.class, () -> 
FieldSyntaxVersion.appendConfigTo(def));
+        assertEquals(e.getMessage(), "Configuration field.syntax.version is 
defined twice.");
+    }
+
+    @ParameterizedTest
+    @CsvSource({"v1,V1", "v2,V2", "V1,V1", "V2,V2"})
+    void shouldGetVersionFromConfig(String input, FieldSyntaxVersion version) {

Review Comment:
   This is pretty slick. Nice!



##########
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/SingleFieldPathTest.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+class SingleFieldPathTest {
+
+    @Test void shouldFindField() {
+        SchemaBuilder barSchema = SchemaBuilder.struct().field("bar", 
Schema.INT32_SCHEMA);
+        Schema schema = SchemaBuilder.struct().field("foo", barSchema).build();
+
+        assertEquals(barSchema.field("bar"), 
pathV2("foo.bar").fieldFrom(schema));
+        assertEquals(schema.field("foo"), pathV2("foo").fieldFrom(schema));
+    }
+
+    @Test void shouldReturnNullFieldWhenFieldNotFound() {
+        SchemaBuilder barSchema = SchemaBuilder.struct().field("bar", 
Schema.INT32_SCHEMA);
+        Schema schema = SchemaBuilder.struct().field("foo", barSchema).build();
+
+        assertNull(pathV2("un.known").fieldFrom(schema));
+        assertNull(pathV2("foo.unknown").fieldFrom(schema));
+        assertNull(pathV2("unknown").fieldFrom(schema));
+        assertNull(pathV2("test").fieldFrom(null));
+    }
+
+    @Test void shouldFindValueInMap() {
+        Map<String, Object> foo = new HashMap<>();
+        foo.put("bar", 42);
+        foo.put("baz", null);
+        Map<String, Object> map = new HashMap<>();
+        map.put("foo", foo);
+
+        assertEquals(42, pathV2("foo.bar").valueFrom(map));
+        assertNull(pathV2("foo.baz").valueFrom(map));
+    }
+
+    @Test void shouldReturnNullValueWhenFieldNotFoundInMap() {
+        Map<String, Object> foo = new HashMap<>();
+        foo.put("bar", 42);
+        foo.put("baz", null);
+        Map<String, Object> map = new HashMap<>();
+        map.put("foo", foo);
+
+        assertNull(new SingleFieldPath("un.known", 
FieldSyntaxVersion.V2).valueFrom(map));
+        assertNull(new SingleFieldPath("foo.unknown", 
FieldSyntaxVersion.V2).valueFrom(map));
+        assertNull(new SingleFieldPath("unknown", 
FieldSyntaxVersion.V2).valueFrom(map));
+        assertNull(new SingleFieldPath("foo.baz", 
FieldSyntaxVersion.V2).valueFrom(map));

Review Comment:
   This assertion technically doesn't belong in this case, right? This value 
does appear in the map, it just happens to be explicitly null. It's also 
covered above in `shouldFindValueInMap`, so it should be safe to remove from 
here.
   
   (This same pattern applies in other places in the test suite too.)



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static 
org.apache.kafka.connect.transforms.util.Requirements.requireMapOrNull;
+
+/**
+ * A SingleFieldPath is composed of one or more field names, known as path 
steps,
+ * to access values within a data object (either {@code Struct} or {@code 
Map<String, Object>}).
+ *
+ * <p>The field path semantics are defined by the {@link FieldSyntaxVersion 
syntax version}.
+ *
+ * @see <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures">KIP-821</a>
+ * @see FieldSyntaxVersion
+ */
+public class SingleFieldPath {
+    // Invariants:
+    // - A field path can contain one or more steps
+    private static final char BACKTICK = '`';
+    private static final char DOT = '.';
+    private static final char BACKSLASH = '\\';
+
+    private final FieldSyntaxVersion version;
+    private final List<String> steps;
+
+    public SingleFieldPath(String pathText, FieldSyntaxVersion version) {
+        this.version = version;
+        switch (version) {
+            case V1: // backward compatibility
+                this.steps = Collections.singletonList(pathText);
+                break;
+            case V2:
+                this.steps = buildFieldPathV2(pathText);
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown syntax version: " 
+ version);
+        }
+    }
+
+    private static List<String> buildFieldPathV2(String path) {
+        final List<String> steps = new ArrayList<>();
+        // path character index to track backticks and dots and break path 
into steps
+        int idx = 0;
+        while (idx < path.length() && idx >= 0) {
+            if (path.charAt(idx) != BACKTICK) {
+                final int start = idx;
+                idx = path.indexOf(String.valueOf(DOT), idx);
+                if (idx >= 0) { // get path step and move forward
+                    String field = path.substring(start, idx);
+                    steps.add(field);
+                    idx++;
+                } else { // add all
+                    String field = path.substring(start);
+                    steps.add(field);
+                }
+            } else { // has backtick
+                int backtickAt = idx;
+                idx++;
+                StringBuilder field = new StringBuilder();
+                int start = idx;
+                while (true) {
+                    // find closing backtick
+                    idx = path.indexOf(String.valueOf(BACKTICK), idx);
+                    if (idx == -1) { // if not found, then fail
+                        failWhenIncompleteBacktickPair(path, backtickAt);
+                    }
+
+                    // backtick escaped if right after backslash
+                    boolean escaped = path.charAt(idx - 1) == BACKSLASH;
+
+                    if (idx >= path.length() - 1) { // at the end of path
+                        if (escaped) { // but escaped, then fail
+                            failWhenIncompleteBacktickPair(path, backtickAt);
+                        }
+                        field.append(path, start, idx);
+                        // we've reached the end of the path, and the last 
character is the backtick
+                        steps.add(field.toString());
+                        idx++;
+                        break;
+                    }
+
+                    if (path.charAt(idx + 1) != DOT) { // not followed by a dot
+                        // this backtick isn't followed by a dot; include it 
in the field name, but continue
+                        // looking for a matching backtick that is followed by 
a dot
+                        idx++;
+                        continue;
+                    }
+
+                    if (escaped) {
+                        // this backtick was escaped; include it in the field 
name, but continue
+                        // looking for an unescaped matching backtick
+                        field.append(path, start, idx - 1)
+                            .append(BACKTICK);
+
+                        idx++;
+                        start = idx;
+                        continue;
+                    }
+
+                    // we've found our matching backtick
+                    field.append(path, start, idx);
+                    steps.add(field.toString());
+                    idx += 2; // increment by two to include the backtick and 
the dot after it
+                    break;
+                }
+            }
+        }
+        // add last step if last char is a dot
+        if (!path.isEmpty() && path.charAt(path.length() - 1) == DOT)
+            steps.add("");
+        return Collections.unmodifiableList(steps);
+    }
+
+    private static void failWhenIncompleteBacktickPair(String path, int 
backtickAt) {
+        throw new ConfigException("Incomplete backtick pair in path: [" + path 
+ "],"
+                + " consider adding a backslash before backtick at position " 
+ backtickAt
+                + " to escape it");
+    }
+
+    /**
+     * Access a {@code Field} at the current path within a schema {@code 
Schema}
+     * If field is not found, then {@code null} is returned.
+     */
+    public Field fieldFrom(Schema schema) {
+        if (schema == null) return null;
+
+        Schema current = schema;
+        for (String pathSegment : stepsWithoutLast()) {
+            final Field field = current.field(pathSegment);
+            if (field != null) {
+                current = field.schema();
+            } else {
+                return null;
+            }
+        }
+        return current.field(lastStep());
+    }
+
+    /**
+     * Access a value at the current path within a schema-based {@code Struct}
+     * If object is not found, then {@code null} is returned.
+     */
+    public Object valueFrom(Struct struct) {
+        if (struct == null) return null;
+
+        Struct current = struct;
+        for (String pathSegment : stepsWithoutLast()) {
+            // Check to see if the field actually exists
+            if (current.schema().field(pathSegment) == null) {
+                return null;
+            }
+            current = current.getStruct(pathSegment);
+            if (current == null) return null;
+        }
+
+        if (current.schema().field(lastStep()) != null) {
+            return current.get(lastStep());
+        } else {
+            return null;
+        }
+    }
+
+    List<String> stepsWithoutLast() {

Review Comment:
   Also nit: would be nice to move this lower to be next to the other 
private/should-be-private methods like `lastStep` and `lastStepIndex`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to