joshuagrisham opened a new pull request #9470: URL: https://github.com/apache/kafka/pull/9470
I have added support for the `Cast` and `ReplaceField` transformations to recursively traverse the structure of messages (both with and without a schema) and perform the Cast or Replace operations on matching child primitive fields if they are found at any level nested within the structure. Nested parents of all currently-supported Connect complex types should be supported (Map, Array, or Struct) but most of the primitive field handling is still similar to before (the lowest level within the nesting would normally be a Struct when using Schema or Map when schemaless). This behavior can be controlled by a new configuration parameter called `recursive` for both transformations. The default setting is **false** so any existing connectors would not be impacted -- you must set the parameter to **true** in order for the child complex types to be traversed. Otherwise, the default behavior should be the same as before. I have also cleaned up the Config names a bit to match some of the other transformations (namely, using a class interface called `ConfigName` which can be accessed statically), it is a bit nicer to work with and brings a little more consistency across the different built-in transforms. Since this is more than just a trivial change, I have created a few new unit tests, some of them quite complex, to try and make sure that everything is working ok compared to before. I have also been running both of these as custom SMTs against data in our production kafka cluster and used some learnings there to iron out a few issues. I have had tens of millions of events actually using this updated code so I think/hope the changes are pretty well-tested but welcome if there is some kind of feedback or concern with them! Here is a full list of everything that I have changed: ### Cast #### Added new Config Parameters: - `recursive` optional boolean, default = `false` - `complex.string.as.json` optional boolean, default = `false` #### Changes: - Created new public static interface `ConfigName` and marked the old public static string `SPEC_CONFIG` as deprecated (so it can still be used for a while until it is removed later). - Added new `ConfigName` for `SPEC`, `RECURSIVE`, and `COMPLEX_STRING_AS_JSON` - Created new interface `ConfigDefault` to provide an easy way to set and consume default values for these two new optional configuration parameters. - And then of course defined and set up the config parameters for usage within the rest of the class. - Added ARRAY, MAP, and STRUCT as valid types to convert FROM (but the only thing they can convert TO is STRING, otherwise if your `SPEC` tries to convert them to something else it will throw a `DataException`). Before you would receive a `DataException` if you tried to include a complex type in the `SPEC` but now at least you are given the option to Cast them to a string! - Refactored the `apply...` methods so that they will call a child method that recursively builds the structure of the schema or value depending if the user sets the `recursive` config parameter to true (otherwise it should basically work the same as before -- look at everything that is at the top level of the structure and then return). There are new methods for recursively handling different type of structures but in the end the same basic flow happens at each child level as what was happening before (`for each field in fields...`). 
#### Changes:

- Created a new public static interface `ConfigName` and marked the old public static string `SPEC_CONFIG` as deprecated (so it can still be used for a while until it is removed later).
- Added new `ConfigName` entries for `SPEC`, `RECURSIVE`, and `COMPLEX_STRING_AS_JSON`.
- Created a new interface `ConfigDefault` to provide an easy way to set and consume default values for the two new optional configuration parameters.
- And then, of course, defined and set up the config parameters for usage within the rest of the class.
- Added ARRAY, MAP, and STRUCT as valid types to convert FROM (the only thing they can be converted TO is STRING; if your `SPEC` tries to convert them to something else it will throw a `DataException`). Before, you would receive a `DataException` if you tried to include a complex type in the `SPEC` at all, but now you are at least given the option to cast them to a string!
- Refactored the `apply...` methods so that they call a child method that recursively builds the structure of the schema or value, depending on whether the user sets the `recursive` config parameter to true (otherwise it should work basically the same as before -- look at everything at the top level of the structure and then return). There are new methods for recursively handling different types of structures, but in the end the same basic flow happens at each child level as before (`for each field in fields...`).
  - However, one change in the design is that when building the new Value, instead of looping through each field of the new schema and performing an `oldstruct.get(oldfield)`, it now loops again through each field of the old schema. This is so that nested primitive conversions happen correctly, and whatever happened to the Schema happens in exactly the same way to the Value.
  - When a child schema is created as part of the recursion, it is also added to the `schemaUpdateCache`, and the recursive methods call `getOrBuildUpdatedSchema` so they fetch it from the cache if it has already been converted. The same applies when child values are converted -- they fetch their new child schemas from the cache instead of building them again.
- Added usage of an instance of `JsonConverter` and `JsonDeserializer` so that, if you wish, complex types cast to string come out as JSON text instead of the Java object's `toString()` output (a rough sketch of this pattern follows this list). This is helpful for scenarios such as a JdbcSinkConnector where one of your source fields is an array of structs: you can write these values into a database table as a blob of JSON text and then parse them as JSON there (PostgreSQL, for example, has loads of really good JSON parsing). This behavior is controlled by the new configuration parameter `complex.string.as.json`, which again defaults to false (you have to set it to true in order to use this).
- I also changed `castValueToType` from `private static` to `private`. This allows the method to check the instance value of the new `complex.string.as.json` configuration parameter and decide whether to call `castToJsonString` instead of the old `castToString` method.
  - I realize this one may be a bit controversial, and there were several options I considered. First, however, I examined every public static method or property to see if anything makes use of `castValueToType` and could not find anything (please mention if you see something I missed). So the change from private static to private seems very minor and should not impact any functionality -- nothing outside of this class can see or use it anyway.
  - Another option would be something more like the `TimestampConverter` transform, which creates a private instance of a `Config` subclass and passes the entire instance to some of the static methods. But that change would have to "touch" a lot more places, and given the point above (I did not see it actually used anywhere from anything public static), it seemed like a bit too much of a rebuild.
- The usage of `castToJsonString` with the `JsonConverter` and `JsonDeserializer` could also be done a few other ways, but this is the approach that reuses existing functionality within Connect and is the "cleanest" looking I could initially come up with from a code perspective. The reason to use a single private instance of these classes is to save a bit on resources -- it is possible to have multiple JSON string conversions even within one record, so I thought it was better to have one instance that can be reused over and over.
- Changed the log level for the Cast log message from Trace to Debug (this seems like something you definitely want to see in Debug and not just Trace!).
- Added new test cases for casting Arrays and Maps to strings and to JSON strings, as well as for working with different types of recursive records (both with and without a schema). In these tests I found that a lot of the odd issues only appear once you go a few levels deep, so I made the tests a little more complicated to ensure that everything works properly even with multiple nested levels.
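To make the `complex.string.as.json` behavior concrete, here is a rough, self-contained sketch of serializing a complex Connect value to a JSON string with `JsonConverter`. The helper name matches the `castToJsonString` method mentioned above, but the body is only an illustration of the pattern (it skips the `JsonDeserializer` step, and `"dummy-topic"` is a placeholder), not the PR's actual code:

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;

public class CastToJsonStringSketch {
    // A single reusable converter instance, mirroring the goal of avoiding repeated
    // instantiation when several fields in one record are cast to JSON.
    private final JsonConverter jsonConverter = new JsonConverter();

    public CastToJsonStringSketch() {
        // Disable the schema envelope so the output is plain JSON for the value itself.
        jsonConverter.configure(Collections.singletonMap("schemas.enable", "false"), false);
    }

    // Illustrative helper: serialize a complex Connect value to a JSON string.
    public String castToJsonString(Schema schema, Object value) {
        byte[] json = jsonConverter.fromConnectData("dummy-topic", schema, value);
        return json == null ? null : new String(json, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Schema childSchema = SchemaBuilder.struct().field("id", Schema.INT32_SCHEMA).build();
        Struct child = new Struct(childSchema).put("id", 42);
        // Prints roughly: {"id":42}
        System.out.println(new CastToJsonStringSketch().castToJsonString(childSchema, child));
    }
}
```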
#### Example Usage

```
curl -X PUT -H "Content-Type: application/json" --data '{
  "connector.class": "FileStreamSink",
  "topics": "test",
  "file": "/tmp/cast.txt",
  "transforms": "cast",
  "transforms.cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
  "transforms.cast.spec": "child_int:string,child_array_of_structs:string",
  "transforms.cast.recursive": "true",
  "transforms.cast.complex.string.as.json": "true"
}' http://localhost:8083/connectors/filestreamsink_cast_test/config
```

### ReplaceField

#### Added new Config Parameters:

- `recursive` optional boolean, default = `false`

#### Changes:

- A lot of the changes here are very similar to what was done in `Cast`. In fact, even before my changes, the flow of these two transforms was almost identical, so it worked quite well to do them both at the same time, and they are easily used together in transform chains within Connect.
- Namely, there is a new `recursive` configuration parameter, and the `apply...` methods have been updated to recursively call methods that build the new target schema and values. A lot of the code is exactly the same, so you will even find direct copy-paste from `Cast` here, using a lot of the same private object names, methods, etc.
- Note also that the pattern follows the same approach as `Cast` -- looping through the old schema and converting child values based on the old structure instead of first creating a new schema and then getting the old field. This new flow also means that the `reverseRenamed` method and its `reverseRenames` map are no longer needed (but I left them in for now).
- Added a logger instance and Debug-level logging for a few different events, such as when a field is excluded or included, or when it is renamed. Some of the methods were refactored a bit in order to provide this logging (for example the `filter` and `renamed` methods).
- I also added support when using `renames` for a "contains exactly one of" kind of scenario (a rough sketch of the idea follows this list). What I mean is that your schema has several fields where, by design, only one of them will ever have a value, and when that one has a value all of the rest within that group are null. The change to this transform now allows you to specify the same target name more than once, but when the value transform occurs, if more than one of them has a value it will throw a `DataException`. Before, the transform would throw a `DataException` while building the new schema from the `renames` config (trying to add a duplicate field to the schema); now this check happens while building the updated value instead.
- Also added a few unit tests to handle the new scenarios and recursive operation (both with and without schemas).
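As referenced in the list above, here is a rough, schemaless-style sketch of the "contains exactly one of" rename check. The helper name and map-based flow are hypothetical and simplified rather than the transform's actual code, but they show the intended behavior: duplicate target names are allowed, and a `DataException` is thrown only when more than one of the source fields actually carries a value:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.errors.DataException;

public class ExactlyOneRenameSketch {
    // Hypothetical illustration: several source fields may map to the same target name,
    // but only one of them may hold a non-null value in any given record.
    static Map<String, Object> applyRenames(Map<String, Object> original, Map<String, String> renames) {
        Map<String, Object> updated = new HashMap<>();
        for (Map.Entry<String, Object> entry : original.entrySet()) {
            String target = renames.getOrDefault(entry.getKey(), entry.getKey());
            if (entry.getValue() == null) {
                // A null source value never conflicts; keep any value already mapped to the target.
                updated.putIfAbsent(target, null);
            } else if (updated.get(target) != null) {
                throw new DataException("More than one field with a value maps to target field '" + target + "'");
            } else {
                updated.put(target, entry.getValue());
            }
        }
        return updated;
    }

    public static void main(String[] args) {
        Map<String, String> renames = new HashMap<>();
        renames.put("child_value_one", "child_value");
        renames.put("child_value_two", "child_value");

        Map<String, Object> record = new HashMap<>();
        record.put("child_value_one", "hello");
        record.put("child_value_two", null);

        System.out.println(applyRenames(record, renames)); // {child_value=hello}
    }
}
```

Running this against a configuration like the example below (two source fields both renamed to `child_value`) keeps whichever field is populated and rejects records where both carry a value.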
#### Example Usage

```
curl -X PUT -H "Content-Type: application/json" --data '{
  "connector.class": "FileStreamSink",
  "topics": "test",
  "file": "/tmp/replace.txt",
  "transforms": "replace",
  "transforms.replace.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.replace.renames": "child_value_one:child_value,child_value_two:child_value",
  "transforms.replace.recursive": "true"
}' http://localhost:8083/connectors/filestreamsink_replace_test/config
```

I hope this covers everything, but if you have any questions or concerns then please feel free to ask!

### Committer Checklist (excluded from commit message)

- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)