Hi,

   1. IgniteSinkTask source code
   
<https://github.com/gridgain/apache-ignite/blob/cbada5964ee12a8dc44a7db8797f91709b70d831/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java>
   shows that Ignite Kafka Connector has this "feature" that stopping any task
   would stop all other tasks running in the same worker. If you look into the
   stop() method you see that the Ignite instances is static and it is closed
   in the stop method. This looks like a usability issue to me. Feel free to
   open a bug or I can open it for you.
   I was able to use multiple sink connectors by running them using the
   apache kafka's connect-standalone.sh script that allows you starting
   multiple connectors. But if I close one of them it really makes the other
   ones unusable making them show that "datastream is already closed" error.
   I never used Confluence Control center before so I am not sure why
   starting second connector stops the first one. Anyway, as a workaround, you
   can use the connect-standalone.sh or connect-distributed.sh scripts to
   start the connectors.

   2. Ignite Kafka Connector stores Kafka's key/value SourceRecord as
   Ignite's key/value Cache.Entry. Unlike Kafka, which allows key to be null,
   Ignite does not support null keys. You will have an error if your Kafka
   SourceRecord's key is null. You can work around null keys by providing an
   "extractor" that builds Cache.Entry from a value. For example, this
   extractor creates keys as value's hash codes and converts values to byte
   arrays of the value's string representation (just an example, nothing
   practical):

public class HashCodeKeyValueBuilder implements
StreamSingleTupleExtractor<SinkRecord, Integer, byte[]> {
    @Override public Map.Entry<Integer, byte[]> extract(SinkRecord msg) {
        return new AbstractMap.SimpleEntry<>(msg.value().hashCode(),
msg.value().toString().getBytes());
    }

}

Then you configure extractor like:

singleTupleExtractorCls=my.domain.HashCodeKeyValueBuilder


StreamSingleTupleExtractor is defined inside ignite-core.jar so you do not
need additional dependencies.

Ignite stores custom objects in a cross-platform binary format
<https://apacheignite.readme.io/docs/binary-marshaller>, simple types and
arrays of simple types as is.

As I understood in your case you have a not-null String key and Byte[]
value. In this case Ignite will store this as is so that you create your
scan query something like

Query<Cache.Entry<Integer, byte[]>> query = new ScanQuery<>(...)

and then deserialise your byte[] array using your custom deserialiser. This
has a huge disadvantage that you cannot efficiently work with the objects
in Ignite since Ignite does not know about your custom format.

Another option is having custom Kafka converter
<https://kafka.apache.org/0110/javadoc/org/apache/kafka/connect/storage/Converter.html>
that would deserialise byte[] array inside Kafka and store them as
CustomObject in Ignite. In this case the object would be stored in binary
format (you will see o.a.i.i.binary.BinaryObjectImpl in the scan query
results). You can work with such objects directly using binary API (most
efficient, no serialisation involved, no CustomObject jars required on
Ignite server nodes) or work with CustomObject using static type API (less
efficient, CustomObject has to be deployed).

Reply via email to