Re: Additional Kafka Connect schema logical types?

2016-03-08 Thread Ewen Cheslack-Postava
On Fri, Mar 4, 2016 at 1:37 PM, Randall Hauch  wrote:

> I’m working on a Kafka Connect connector that reads a MySQL binlog to
> provide near real-time change data capture, and I also plan connectors for
> other DBMSes. The problem is that I’m not able to map all of the MySQL data
> types — or even all of the standard JDBC types — to Kafka Connect Schemas
> without resorting to complex Schemas that radically increase the footprint
> of messages.
>
> Specifically, I’d like my connectors to be able to use the following
> “logical” types:
>
> - Bits: A set of bits of arbitrary length, corresponding to
> java.util.BitSet. See [1] for code.
> - IsoTime: An ISO 8601 time that includes the time zone, corresponding to
> Java 8’s java.time.OffsetTime, which represents a time with an offset from
> UTC/Greenwich and has a well-defined ordering, making it more suitable for
> persistent storage. See [2] for code.
> - IsoTimestamp: An ISO 8601 timestamp that includes the time zone,
> corresponding to Java 8’s java.time.OffsetDateTime, which represents an
> instant with an offset from UTC/Greenwich and has a well-defined ordering,
> making it more suitable for persistent storage. See [3] for code.
>

These all make sense and seem pretty straightforward. I'm hoping to scope
the available logical types in the framework so they cover as many cases as
possible with as few types as possible, precisely for the reason you
mention below -- the number of types can quickly explode, which is bad for
everyone.

(And a note probably better left to review if you submit these as new
logical types for the framework: for BitSet, it doesn't seem like the
encoding preserves the number of bits, i.e. it looks like it might round up
to the nearest multiple of 8. Should the number of bits be a schema parameter,
or is the loss ok and we can't guarantee fixed sizes for common use cases?)
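
To make that concrete, here's a rough sketch of what carrying the bit count
as a schema parameter could look like, in the builder style of the existing
logical types. The LENGTH_PARAMETER name is just illustrative, not anything
that exists in Connect or in [1]:

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;

    public class Bits {
        public static final String LOGICAL_NAME = "io.debezium.data.Bits";
        // Hypothetical schema parameter recording the exact bit count, so
        // that a 12-bit value isn't silently widened to 16 on a round trip.
        public static final String LENGTH_PARAMETER = "length";

        public static SchemaBuilder builder(int length) {
            return SchemaBuilder.bytes()
                                .name(LOGICAL_NAME)
                                .parameter(LENGTH_PARAMETER, Integer.toString(length));
        }

        public static Schema schema(int length) {
            return builder(length).build();
        }
    }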


> These are very similar to the 4 built-in logical types (Decimal, Date,
> Time, and Timestamp). These logical types are akin to aliases for a
> primitive type (typically BYTES), and their use within a Schema carries
> semantics that would not be there when just using the corresponding primitive.
>
> Unfortunately, Kafka Connect is not currently able to support custom
> logical types. Sure, you can create them, but neither the JsonConverter nor
> any of the other Converters will know how to serialize or deserialize them.
>
> One option is for Kafka Connect to add these, but this is sort of a
> never-ending battle. And, since Kafka is not yet on Java 8, supporting
> OffsetTime and OffsetDateTime would be problematic.
>
> Perhaps a better option is to support custom logical types, where each
> logical type must be based upon a single primitive type and must define a
> class that knows how to serialize the logical type to, and deserialize it
> from, the primitive type. The Converters, once modified, could look for the
> referenced class and use its serdes logic as needed.
>

Possibly simpler would be to make them pluggable in the framework but not
require the Converters to be aware of them. This requires care in the
framework to ensure we handle to/fromLogical conversions everywhere a
conversion takes place, but it avoids complicating the Converters. Even better
would be to do this in a backwards-compatible way, such that even if the
Converters do the logical conversions (as they currently do), that check
just becomes a no-op when the conversion has already been performed.
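
To sketch the idea (all of these names are hypothetical; nothing like this
exists in Connect today): the framework would own a small serde interface and
apply it defensively, so a value that has already been converted passes
through untouched:

    import org.apache.kafka.connect.data.Schema;

    // Hypothetical plugin interface; none of these names exist in Connect.
    public interface LogicalTypeSerde {
        String logicalName();    // schema name handled, e.g. "io.debezium.data.Bits"
        Class<?> logicalClass(); // Java type connectors produce, e.g. java.util.BitSet
        Object toPrimitive(Schema schema, Object logicalValue);
        Object fromPrimitive(Schema schema, Object primitiveValue);
    }

    class LogicalTypes {
        // Framework-side helper: if a Converter (or earlier framework code)
        // already converted the value, it is no longer an instance of the
        // logical class and this call becomes a no-op.
        static Object maybeToPrimitive(LogicalTypeSerde serde, Schema schema, Object value) {
            return serde.logicalClass().isInstance(value)
                    ? serde.toPrimitive(schema, value)
                    : value;
        }
    }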

However, not baking logical types into the framework makes it way more
complicated to use them. What's the registration process? Do we need to
pick up the classes automatically somehow? One of the nice things about
baking them into the framework is that it helps define the set of valid
Java types that can be passed to converters. With a completely pluggable
solution, Converters can't be sure what they're allowed to do or what types
they need to support.
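
Just to make the registration question concrete, one conceivable (purely
illustrative) approach is standard java.util.ServiceLoader discovery keyed by
schema name, building on the hypothetical LogicalTypeSerde interface sketched
above:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.ServiceLoader;

    // Discovers serdes listed in META-INF/services files on the worker's
    // classpath and indexes them by logical (schema) name.
    public class LogicalTypeRegistry {
        private final Map<String, LogicalTypeSerde> byName = new HashMap<>();

        public LogicalTypeRegistry() {
            for (LogicalTypeSerde serde : ServiceLoader.load(LogicalTypeSerde.class)) {
                byName.put(serde.logicalName(), serde);
            }
        }

        public LogicalTypeSerde forSchemaName(String name) {
            return byName.get(name); // null => treat the value as a plain primitive
        }
    }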


>
> A couple of points:
>
> 1) Any source connector that is producing a record with these logical
> types would obviously have to have the logical type’s class available on
> the classpath. That doesn’t seem a difficult requirement to satisfy.
>

This seems easy, but keep in mind how this can complicate compatibility
when you have multiple connector plugins depending on the same jar for
logical types but on different versions. This is of course always a problem
for dependencies, but structuring logical types like this seems like it'll
very quickly cause us to hit these problems. At a minimum, it makes
compatibility harder to reason about.


>
> 2) Any consumer or sink connector that is consuming records with these
> values needs the logical type’s class on its classpath in order to work
> with them. This doesn’t seem too horrible, especially if the logical
> type class(es) are nicely separated into separate JARs. However, if the
> consumer doesn’t have the logical type class, then its local Converter
> would just deserialize to the corresponding primitive value (e.g., byte[],
> int, long, float, String, etc.) — is this sufficient if the consumer or
> sink connector is simply passing the value along?

Additional Kafka Connect schema logical types?

2016-03-04 Thread Randall Hauch
I’m working on a Kafka Connect connector that reads a MySQL binlog to provide 
near real-time change data capture, and I also plan connectors for other 
DBMSes. The problem is that I’m not able to map all of the MySQL data types — 
or even all of the standard JDBC types — to Kafka Connect Schemas without 
resorting to complex Schemas that radically increase the footprint of messages.

Specifically, I’d like my connectors to be able to use the following “logical” 
types:

- Bits: A set of bits of arbitrary length, corresponding to java.util.BitSet. 
See [1] for code.
- IsoTime: An ISO 8601 time that includes the time zone, corresponding to Java 
8’s java.time.OffsetTime, which represents a time with an offset from 
UTC/Greenwich and has a well-defined ordering, making it more suitable for 
persistent storage. See [2] for code.
- IsoTimestamp: An ISO 8601 timestamp that includes the time zone, 
corresponding to Java 8’s java.time.OffsetDateTime, which represents an 
instant with an offset from UTC/Greenwich and has a well-defined ordering, 
making it more suitable for persistent storage. See [3] for code.

These are very similar to the 4 built-in logical types (Decimal, Date, Time, 
and Timestamp). These logical types are akin to aliases for a primitive type 
(typically BYTES), and their use within a Schema carries semantics that would 
not be there when just using the corresponding primitive.
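
For reference, here is the pattern the built-ins follow, shown with Decimal:
the Schema is just BYTES plus a well-known name and a “scale” parameter, and
static helpers convert between the logical and primitive forms:

    import java.math.BigDecimal;
    import org.apache.kafka.connect.data.Decimal;
    import org.apache.kafka.connect.data.Schema;

    public class DecimalExample {
        public static void main(String[] args) {
            // Type.BYTES, name "org.apache.kafka.connect.data.Decimal",
            // parameter "scale" = "2"
            Schema schema = Decimal.schema(2);
            byte[] raw = Decimal.fromLogical(schema, new BigDecimal("12.34"));
            BigDecimal restored = Decimal.toLogical(schema, raw);
            System.out.println(restored); // prints 12.34
        }
    }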

Unfortunately, Kafka Connect is not currently able to support custom logical 
types. Sure, you can create them, but neither the JsonConverter nor any of the 
other Converters will know how to serialize or deserialize them.

One option is for Kafka Connect to add these, but this is sort of a 
never-ending battle. And, since Kafka is not yet on Java 8, supporting 
OffsetTime and OffsetDateTime would be problematic.

Perhaps a better option is to support custom logical types, where each logical 
type must be based upon a single primitive type and must define a class that 
knows how to serialize the logical type to, and deserialize it from, the 
primitive type. The Converters, once modified, could look for the referenced 
class and use its serdes logic as needed.
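
As a rough sketch of what one of these classes might look like, here is 
IsoTime from [2] recast under this proposal, assuming STRING as the underlying 
primitive; the toPrimitive/fromPrimitive names are placeholders for whatever 
serdes interface we’d define:

    import java.time.OffsetTime;
    import java.time.format.DateTimeFormatter;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;

    public class IsoTime {
        public static final String LOGICAL_NAME = "io.debezium.data.IsoTime";

        public static SchemaBuilder builder() {
            return SchemaBuilder.string().name(LOGICAL_NAME);
        }

        // Serialize the logical value to the underlying STRING primitive.
        public static Object toPrimitive(Schema schema, Object value) {
            return ((OffsetTime) value).format(DateTimeFormatter.ISO_OFFSET_TIME);
        }

        // Deserialize the STRING primitive back to the logical value.
        public static Object fromPrimitive(Schema schema, Object value) {
            return OffsetTime.parse((String) value, DateTimeFormatter.ISO_OFFSET_TIME);
        }
    }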

A couple of points:

1) Any source connector that is producing a record with these logical types 
would obviously have to have the logical type’s class available on the 
classpath. That doesn’t seem a difficult requirement to satisfy.

2) Any consumer or sink connector that is consuming records with these values 
needs the logical type’s class on its classpath in order to work with them. 
This doesn’t seem too horrible, especially if the logical type class(es) are 
nicely separated into separate JARs. However, if the consumer doesn’t have 
the logical type class, then its local Converter would just deserialize to the 
corresponding primitive value (e.g., byte[], int, long, float, String, etc.) — 
is this sufficient if the consumer or sink connector is simply passing the 
value along?

3) There are a couple of ways the logical type’s Schema object could reference 
its class. The 4 built-ins use the convention that the schema name corresponds 
to the name of the class, though I suspect this is largely just a technique to 
guarantee a unique name. However, at this time there is no interface or base 
class for logical types, so something would have to be changed to allow for 
easy invocation of the serdes methods. An alternative might be to add to 
“Schema” an optional “serdes” field that references the name of the class that 
implements a serdes interface; this is probably cleaner, though it does 
increase the verbosity of the Schema object.
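
To illustrate the name-as-class convention, a Converter could resolve the 
class reflectively from Schema.name(); this sketch assumes the serdes class 
exposes a static fromPrimitive(Schema, Object) method like the IsoTime example 
above:

    import java.lang.reflect.Method;
    import org.apache.kafka.connect.data.Schema;

    public class LogicalTypeLookup {
        // Resolve the serdes class from the schema name and invoke its
        // static fromPrimitive(Schema, Object) method. Falls back to the
        // raw primitive if the class isn't on the classpath.
        public static Object toLogical(Schema schema, Object primitiveValue) {
            try {
                Class<?> serdesClass = Class.forName(schema.name());
                Method fromPrimitive =
                        serdesClass.getMethod("fromPrimitive", Schema.class, Object.class);
                return fromPrimitive.invoke(null, schema, primitiveValue);
            } catch (ReflectiveOperationException e) {
                return primitiveValue; // class missing: pass the primitive along
            }
        }
    }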


Thoughts?

Randall Hauch

[1] 
https://github.com/debezium/debezium/blob/74c5adcc8d30afaa221bbdbecad3bb6f6febbaa5/debezium-core/src/main/java/io/debezium/data/Bits.java
[2] 
https://github.com/debezium/debezium/blob/74c5adcc8d30afaa221bbdbecad3bb6f6febbaa5/debezium-core/src/main/java/io/debezium/data/IsoTime.java
[3] 
https://github.com/debezium/debezium/blob/74c5adcc8d30afaa221bbdbecad3bb6f6febbaa5/debezium-core/src/main/java/io/debezium/data/IsoTimestamp.java