[
https://issues.apache.org/jira/browse/KAFKA-9673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17068418#comment-17068418
]
Tom Bentley commented on KAFKA-9673:
------------------------------------
I opened
[KIP-585|https://cwiki.apache.org/confluence/display/KAFKA/KIP-585%3A+Conditional+SMT]
for discussion.
> Conditionally apply SMTs
> ------------------------
>
> Key: KAFKA-9673
> URL: https://issues.apache.org/jira/browse/KAFKA-9673
> Project: Kafka
> Issue Type: New Feature
> Components: KafkaConnect
> Reporter: Tom Bentley
> Assignee: Tom Bentley
> Priority: Major
>
> KAFKA-7052 ended up throwing an IAE with a message, rather than an NPE, when
> an SMT is applied to a record lacking a given field. It's still not
> possible to apply an SMT conditionally, which is what things like Debezium
> really need in order to apply transformations only to non-schema-change
> events.
> [~rhauch] suggested a mechanism to conditionally apply any SMT, but was
> concerned about the possibility of a naming collision (assuming it was
> configured by a simple config).
> I'd like to propose something which would solve this problem without the
> possibility of such collisions. The idea is to have a higher-level condition,
> which applies an arbitrary transformation (or transformation chain) according
> to some predicate on the record.
> More concretely, it might be configured like this:
> {noformat}
> transforms.conditionalExtract.type: Conditional
> transforms.conditionalExtract.transforms: extractInt
> transforms.conditionalExtract.transforms.extractInt.type: org.apache.kafka.connect.transforms.ExtractField$Key
> transforms.conditionalExtract.transforms.extractInt.field: c1
> transforms.conditionalExtract.condition: topic-matches:<someRegexHere>
> {noformat}
> * The {{Conditional}} SMT is configured with its own list of transforms
> ({{transforms.conditionalExtract.transforms}}) to apply. This would work just
> like the top level {{transforms}} config, so subkeys can be used to configure
> these transforms in the usual way.
> * The {{condition}} config defines the predicate that determines when the
> transforms are applied to a record, using a
> {{<condition-type>:<parameters>}} syntax.
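A rough sketch of how the nested subkeys could be resolved, assuming the {{Conditional}} SMT receives its config with the {{transforms.conditionalExtract.}} prefix already stripped. The names {{NestedTransformConfigs}}, {{withPrefix}} and {{perAlias}} are hypothetical and are not part of Kafka Connect:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka Connect: shows how a Conditional SMT
// could pull out the config for each nested transform alias by key prefix.
final class NestedTransformConfigs {

    // Returns the entries of `config` whose keys start with `prefix`,
    // with the prefix stripped from each key.
    static Map<String, String> withPrefix(Map<String, String> config, String prefix) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : config.entrySet()) {
            String key = e.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                result.put(key.substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }

    // Resolves the config for every alias listed under the "transforms" key.
    static Map<String, Map<String, String>> perAlias(Map<String, String> conditionalConfig) {
        Map<String, Map<String, String>> byAlias = new LinkedHashMap<>();
        String aliases = conditionalConfig.getOrDefault("transforms", "");
        for (String alias : aliases.split(",")) {
            String trimmed = alias.trim();
            if (!trimmed.isEmpty()) {
                byAlias.put(trimmed, withPrefix(conditionalConfig, "transforms." + trimmed + "."));
            }
        }
        return byAlias;
    }
}
```

With the example config above, {{perAlias}} would yield a single entry for {{extractInt}} holding its {{type}} and {{field}} settings, while the {{condition}} key stays with the {{Conditional}} SMT itself.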
> We could initially support three condition types:
> *{{topic-matches:<pattern>}}* The transformation would be applied if the
> record's topic name matched the given regular expression pattern. For
> example, the following would apply the transformation on records being sent
> to any topic with a name beginning with "my-prefix-":
> {noformat}
> transforms.conditionalExtract.condition: topic-matches:my-prefix-.*
> {noformat}
>
> *{{has-header:<header-name>}}* The transformation would be applied if the
> record had at least one header with the given name. For example, the
> following will apply the transformation on records with at least one header
> with the name "my-header":
> {noformat}
> transforms.conditionalExtract.condition: has-header:my-header
> {noformat}
>
> *{{not:<condition-name>}}* This would negate the result of another named
> condition, defined using the condition config prefix. For example, the
> following will apply the transformation on records which lack any header
> with the name "my-header":
> {noformat}
> transforms.conditionalExtract.condition: not:hasMyHeader
> transforms.conditionalExtract.condition.hasMyHeader: has-header:my-header
> {noformat}
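To make the three condition types concrete, here is a minimal sketch of how they might be evaluated. It is only an illustration under some assumptions: {{ConditionSketch}} and {{matches}} are hypothetical names, a record is reduced to its topic and header names, and {{topic-matches}} is assumed to require the pattern to match the whole topic name.

```java
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical sketch of the proposed condition syntax; none of these names
// exist in Kafka Connect.
final class ConditionSketch {

    // Evaluates a "<condition-type>:<parameters>" string against a record,
    // represented here only by its topic and the names of its headers.
    // `named` maps condition names (the "condition.<name>" subkeys) to their
    // definitions, which is what "not:" looks up.
    static boolean matches(String condition, Map<String, String> named,
                           String topic, List<String> headerNames) {
        int colon = condition.indexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException(
                    "Expected <condition-type>:<parameters> but got: " + condition);
        }
        String type = condition.substring(0, colon);
        String params = condition.substring(colon + 1);
        switch (type) {
            case "topic-matches":
                // Assumption: the regex must match the entire topic name.
                return Pattern.matches(params, topic);
            case "has-header":
                return headerNames.contains(params);
            case "not":
                String target = named.get(params);
                if (target == null) {
                    throw new IllegalArgumentException("Unknown named condition: " + params);
                }
                return !matches(target, named, topic, headerNames);
            default:
                throw new IllegalArgumentException("Unknown condition type: " + type);
        }
    }
}
```

The sketch does not guard against a named condition that refers back to itself through {{not:}}; a real implementation would presumably reject such a config when it is validated.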
> I foresee one implementation concern with this approach, which is that
> currently {{Transformation}} has to return a fixed {{ConfigDef}}, and this
> proposal would require something more flexible in order to allow the config
> parameters to depend on the listed transform aliases (and similarly for the
> named predicates used by the {{not:}} predicate). I think this could be done
> by adding a {{default}} method to {{Transformation}} for getting the
> {{ConfigDef}} given the config, for example.
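The {{default}} method idea might look roughly like the following. To keep the sketch compilable on its own it uses simplified stand-ins ({{SketchConfigDef}}, {{SketchTransformation}}, {{SketchConditional}}) rather than the real {{ConfigDef}} and {{Transformation}} types, and the {{config(Map)}} overload is purely an assumption about what such a method could be called:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified stand-in for Kafka's ConfigDef: it only records key names.
final class SketchConfigDef {
    final List<String> keys = new ArrayList<>();

    SketchConfigDef define(String key) {
        keys.add(key);
        return this;
    }
}

// Simplified stand-in for Transformation, showing only the config methods.
interface SketchTransformation {
    // Existing style: a fixed definition, independent of the user's config.
    SketchConfigDef config();

    // Hypothetical addition: a definition that may depend on the supplied
    // config. Delegating to config() keeps existing implementations working.
    default SketchConfigDef config(Map<String, String> props) {
        return config();
    }
}

// A Conditional-like transform whose config keys depend on the listed aliases.
final class SketchConditional implements SketchTransformation {
    @Override
    public SketchConfigDef config() {
        return new SketchConfigDef().define("transforms").define("condition");
    }

    @Override
    public SketchConfigDef config(Map<String, String> props) {
        SketchConfigDef def = config();
        // Add one "transforms.<alias>.type" key per alias the user listed.
        for (String alias : props.getOrDefault("transforms", "").split(",")) {
            String trimmed = alias.trim();
            if (!trimmed.isEmpty()) {
                def.define("transforms." + trimmed + ".type");
            }
        }
        return def;
    }
}
```

Existing transformations would not need to change, since they inherit the default and keep returning their fixed definition.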
> Obviously this would require a KIP, but before I spend any more time on this
> I'd be interested in your thoughts [~rhauch], [~rmoff], [~gunnar.morling].