joshuagrisham opened a new pull request #9470:
URL: https://github.com/apache/kafka/pull/9470


   I have added support for the `Cast` and `ReplaceField` transformations to recursively traverse the structure of messages (both with and without a schema) and perform the Cast or Replace operations on matching child primitive fields found at any level of nesting within the structure.  All currently-supported Connect complex types (Map, Array, and Struct) can act as nested parents, but most of the primitive field handling is still similar to before (the lowest level within the nesting would normally be a Struct when using a Schema, or a Map when schemaless).
   
   This behavior is controlled by a new configuration parameter called `recursive` for both transformations.  The default is **false**, so existing connectors are not impacted and the behavior stays the same as before -- you must set the parameter to **true** in order for child complex types to be traversed.
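   
   To make the recursive behavior a little more concrete, here is a rough sketch (in the style of the unit tests) of what this could look like for a schemaless record.  The field names and values are invented for the example, and the `recursive` parameter of course only exists with this PR's changes applied:
   
   ```
   import java.util.HashMap;
   import java.util.Map;
   
   import org.apache.kafka.connect.source.SourceRecord;
   import org.apache.kafka.connect.transforms.Cast;
   
   public class RecursiveCastSketch {
       public static void main(String[] args) {
           // Hypothetical schemaless value: the field to cast sits one level down.
           Map<String, Object> child = new HashMap<>();
           child.put("child_int", 42);
   
           Map<String, Object> value = new HashMap<>();
           value.put("parent_string", "hello");
           value.put("child", child);
   
           // "recursive" is the new parameter proposed in this PR; with the default
           // (false) the nested map is left untouched, exactly as before.
           Map<String, String> config = new HashMap<>();
           config.put("spec", "child_int:string");
           config.put("recursive", "true");
   
           Cast.Value<SourceRecord> cast = new Cast.Value<>();
           cast.configure(config);
   
           SourceRecord record = new SourceRecord(null, null, "test", null, value);
           Object transformed = cast.apply(record).value();
   
           // With recursive=true the nested child_int should come back as the string "42".
           System.out.println(transformed);
           cast.close();
       }
   }
   ```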
   
   I have also cleaned up the Config names a bit to match some of the other transformations (namely, using a class interface called `ConfigName` which can be accessed statically); this is a bit nicer to work with and brings a little more consistency across the different built-in transforms.
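   
   For reference, here is a minimal, self-contained illustration of the `ConfigName` pattern being described (the constant names follow this description, but the surrounding class is just an example sketch, not the actual transform code):
   
   ```
   public class ConfigNameSketch {
       // Nested interface holding the config keys so they can be referenced statically,
       // e.g. ConfigNameSketch.ConfigName.RECURSIVE, instead of loose String constants.
       public interface ConfigName {
           String SPEC = "spec";
           String RECURSIVE = "recursive";
           String COMPLEX_STRING_AS_JSON = "complex.string.as.json";
       }
   
       // Defaults kept alongside, similar to the ConfigDefault interface described below.
       public interface ConfigDefault {
           boolean RECURSIVE = false;
           boolean COMPLEX_STRING_AS_JSON = false;
       }
   
       public static void main(String[] args) {
           System.out.println(ConfigName.RECURSIVE + " defaults to " + ConfigDefault.RECURSIVE);
       }
   }
   ```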
   
   Since this is more than just a trivial change, I have created a few new unit tests, some of them quite complex, to make sure that everything still works as it did before.  I have also been running both of these as custom SMTs against data in our production Kafka cluster and used some learnings there to iron out a few issues.  Tens of millions of events have actually gone through this updated code, so I think/hope the changes are pretty well-tested, but I welcome any feedback or concerns!
   
   Here is a full list of everything that I have changed:
   
   ### Cast
   
   #### Added new Config Parameters:
   - `recursive` optional boolean, default = `false`
   - `complex.string.as.json` optional boolean, default = `false`
   
   #### Changes:
   
   - Created new public static interface `ConfigName` and marked the old public 
static string `SPEC_CONFIG` as deprecated (so it can still be used for a while 
until it is removed later).
   - Added new `ConfigName` constants for `SPEC`, `RECURSIVE`, and `COMPLEX_STRING_AS_JSON`.
   - Created new interface `ConfigDefault` to provide an easy way to set and 
consume default values for these two new optional configuration parameters.
   - And then of course defined and set up the config parameters for usage 
within the rest of the class.
   - Added ARRAY, MAP, and STRUCT as valid types to convert FROM, but the only type they can be converted TO is STRING; if your `SPEC` tries to convert them to anything else it will throw a `DataException`.  Before, you would receive a `DataException` if you tried to include a complex type in the `SPEC` at all, but now at least you are given the option to cast them to a string!
   - Refactored the `apply...` methods so that they call a child method that recursively builds the structure of the schema or value, depending on whether the user sets the `recursive` config parameter to true (otherwise it should basically work the same as before -- look at everything at the top level of the structure and then return).  There are new methods for recursively handling different types of structures, but in the end the same basic flow happens at each child level as before (`for each field in fields...`).
      - However, one change in the design is that when building the new Value, instead of looping through each field of the new schema and performing an `oldstruct.get(oldfield)`, it now loops again through each field in the old schema.  This is so that nested primitive conversions happen correctly and what happens in the Value is exactly what happened in the Schema.
   - When a child schema is created as part of the recursion, it is also added to the `schemaUpdateCache`, and the recursive methods call `getOrBuildUpdatedSchema` so they fetch it from the cache if it has already been converted.  The same applies when child values are converted -- they fetch their new child schemas from the cache instead of building them again.
   - Added an instance of `JsonConverter` and `JsonDeserializer` for when you wish to have complex types rendered as JSON text instead of the Java object's toString() output.  This is helpful for scenarios like using a JdbcSinkConnector where your source has a field containing an array of structs: you can put these values into a database table as a blob of JSON text and then parse them as JSON downstream (PostgreSQL, for example, has loads of really good JSON parsing).  This behavior is controlled by the new configuration parameter `complex.string.as.json`, but again it defaults to false (so you have to set it to true in order to use it); see the sketch after this list.
   - I also changed `castValueToType` from `private static` to `private`.  This allows the method to check the instance value of the new `complex.string.as.json` configuration parameter and decide whether to call `castToJsonString` instead of the old `castToString` method.
     - I realize this one may be a bit controversial, and I considered several options.  First, however, I examined every public static method or property to see if anything ever makes use of `castValueToType` and could not find anything (please mention it if you see something I missed).  So a change from private static to private in this case seems very minor and should not impact any functionality -- nothing outside of this class can see or use it anyway.
     - Another option would be something more like the `TimestampConverter` transform, which creates a private instance of a `Config` subclass and then passes the entire instance to some of the static methods.  But that change would have to "touch" a lot more places, and given the point above (I did not see it used anywhere from anything public static) it seemed like a bit too much of a rebuild.
     - Usage of `castToJsonString` with the `JsonConverter` and `JsonDeserializer` could also be done a few other ways, but this was the approach that reused existing functionality within Connect and was the "cleanest"-looking that I could initially come up with from a code perspective.  The reason to use a single private instance of these classes is to save a bit on resources -- a single record may require multiple JSON string conversions, so I thought it was better to have one instance that can be re-used over and over.
   - Changed the log level for the Cast log message from Trace to Debug (this 
seems like something you definitely want to see in Debug and not just Trace!).
   - Added new test cases for casting Arrays and Maps to strings and to JSON strings, as well as for working with different kinds of nested records (both with and without a schema).  In these tests I found that a lot of the odd issues only appear once you go a few levels deep, so I have made the tests a little more complicated to ensure that everything works properly even with multiple nested levels.
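   
   As a concrete illustration of the new parameters, here is a rough, untested sketch of casting an array-of-structs field to a JSON string (the schema and field names are invented for the example, and the behavior assumes this PR's changes are applied):
   
   ```
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.Map;
   
   import org.apache.kafka.connect.data.Schema;
   import org.apache.kafka.connect.data.SchemaBuilder;
   import org.apache.kafka.connect.data.Struct;
   import org.apache.kafka.connect.source.SourceRecord;
   import org.apache.kafka.connect.transforms.Cast;
   
   public class CastComplexToJsonSketch {
       public static void main(String[] args) {
           // Hypothetical source schema: a struct containing an array of structs.
           Schema itemSchema = SchemaBuilder.struct().name("item")
                   .field("id", Schema.INT32_SCHEMA)
                   .field("name", Schema.STRING_SCHEMA)
                   .build();
           Schema valueSchema = SchemaBuilder.struct().name("value")
                   .field("child_int", Schema.INT32_SCHEMA)
                   .field("child_array_of_structs", SchemaBuilder.array(itemSchema).build())
                   .build();
   
           Struct value = new Struct(valueSchema)
                   .put("child_int", 7)
                   .put("child_array_of_structs", Arrays.asList(
                           new Struct(itemSchema).put("id", 1).put("name", "one"),
                           new Struct(itemSchema).put("id", 2).put("name", "two")));
   
           // "complex.string.as.json" is the parameter proposed in this PR; without it,
           // casting the array to string would fall back to the Java toString() output.
           Map<String, String> config = new HashMap<>();
           config.put("spec", "child_int:string,child_array_of_structs:string");
           config.put("complex.string.as.json", "true");
   
           Cast.Value<SourceRecord> cast = new Cast.Value<>();
           cast.configure(config);
   
           SourceRecord record = new SourceRecord(null, null, "test", valueSchema, value);
           Struct transformed = (Struct) cast.apply(record).value();
   
           // child_int should now be a STRING field ("7"), and child_array_of_structs a
           // STRING field holding JSON text like [{"id":1,"name":"one"},{"id":2,"name":"two"}].
           System.out.println(transformed);
           cast.close();
       }
   }
   ```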
   
   #### Example Usage
   
   ```
   curl -X PUT -H "Content-Type: application/json" --data '{
     "connector.class": "FileStreamSink",
     "topics": "test",
     "file": "/tmp/cast.txt",
     "transforms": "cast",
     "transforms.cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
     "transforms.cast.spec": "child_int:string,child_array_of_structs:string",
     "transforms.cast.recursive": "true",
     "transforms.cast.complex.string.as.json": "true"
   }' http://localhost:8083/connectors/filestreamsink_cast_test/config
   ```
   
   
   ### ReplaceField
   
   #### Added new Config Parameters:
   - `recursive` optional boolean, default = `false`
   
   #### Changes:
   
   - A lot of the changes here are very similar to what was done in `Cast`.  In fact, even before my changes, the flow of these two transforms was almost identical, so it worked quite well to update both of them in the same way at the same time, and they are easily used together in transform chains within Connect.
   - Namely, there is a new `recursive` configuration parameter, and the `apply...` methods have been updated to recursively call methods that build the new target schema and values.  A lot of the code is exactly the same, so you will even find direct copy-paste from `Cast` to here, using a lot of the same private object names, methods, etc.
      - Note also that the pattern is the same as in `Cast` -- looping through the old schema and converting child values based on the old structure instead of first creating a new schema and then getting the old field.  This new flow also means that the `reverseRenamed` method and its `reverseRenames` map are no longer needed (but I left them in for now).
   - Added a logger instance and Debug-level logging for a few different events, such as when a field is excluded, included, or renamed.  Some of the methods were refactored a bit in order to provide this logging (for example the `filter` and `renamed` methods).
   - I also added support when using `renames` for a "contains exactly one of" kind of scenario.  By this I mean that your schema has several fields where, by design, only one of them will ever have a value, and when that one has a value, all of the rest within that group will be null.  The transform now allows you to specify the same target name more than once, but if more than one of the source fields has a value when the value transform occurs, it will throw a `DataException`.  Before, the transform would throw a `DataException` while building the new schema from the `renames` config (trying to add a duplicate field to the schema); now this check happens while building the updated value instead (see the sketch after this list).
   - Also added a few unit tests to handle new scenarios and recursive 
operation (both with and without schemas).
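   
   To make the "contains exactly one of" renames scenario above more concrete, here is a rough sketch assuming this PR's changes are applied (the schema and field names are invented for the example, mirroring the config in the usage example below):
   
   ```
   import java.util.HashMap;
   import java.util.Map;
   
   import org.apache.kafka.connect.data.Schema;
   import org.apache.kafka.connect.data.SchemaBuilder;
   import org.apache.kafka.connect.data.Struct;
   import org.apache.kafka.connect.source.SourceRecord;
   import org.apache.kafka.connect.transforms.ReplaceField;
   
   public class ExactlyOneOfRenameSketch {
       public static void main(String[] args) {
           // Hypothetical schema where, by design, only one of the two child_value_*
           // fields ever carries a value at a time; the other stays null.
           Schema valueSchema = SchemaBuilder.struct().name("value")
                   .field("child_value_one", Schema.OPTIONAL_STRING_SCHEMA)
                   .field("child_value_two", Schema.OPTIONAL_STRING_SCHEMA)
                   .build();
           Struct value = new Struct(valueSchema)
                   .put("child_value_one", "something"); // child_value_two stays null
   
           // Both source fields are renamed to the same target name; with this PR's
           // changes that is accepted, and a DataException is thrown only if more than
           // one of the sources is non-null while the updated value is being built.
           Map<String, String> config = new HashMap<>();
           config.put("renames", "child_value_one:child_value,child_value_two:child_value");
   
           ReplaceField.Value<SourceRecord> transform = new ReplaceField.Value<>();
           transform.configure(config);
   
           SourceRecord record = new SourceRecord(null, null, "test", valueSchema, value);
           Struct transformed = (Struct) transform.apply(record).value();
   
           // Expected result: a single child_value field holding "something".
           System.out.println(transformed.get("child_value"));
           transform.close();
       }
   }
   ```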
   
   #### Example Usage
   
   ```
   curl -X PUT -H "Content-Type: application/json" --data '{
     "connector.class": "FileStreamSink",
     "topics": "test",
     "file": "/tmp/replace.txt",
     "transforms": "replace",
     "transforms.replace.type": 
"org.apache.kafka.connect.transforms.ReplaceField$Value",
     "transforms.replace.renames": 
"child_value_one:child_value,child_value_two:child_value",
     "transforms.replace.recursive": "true",
   }' http://localhost:8083/connectors/filestreamsink_replace_test/config
   ```
   
   I hope this covers everything, but if you have any questions or concerns, please feel free to ask!
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

