Hi,
1. IgniteSinkTask source code <https://github.com/gridgain/apache-ignite/blob/cbada5964ee12a8dc44a7db8797f91709b70d831/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java> shows that Ignite Kafka Connector has this "feature" that stopping any task would stop all other tasks running in the same worker. If you look into the stop() method you see that the Ignite instances is static and it is closed in the stop method. This looks like a usability issue to me. Feel free to open a bug or I can open it for you. I was able to use multiple sink connectors by running them using the apache kafka's connect-standalone.sh script that allows you starting multiple connectors. But if I close one of them it really makes the other ones unusable making them show that "datastream is already closed" error. I never used Confluence Control center before so I am not sure why starting second connector stops the first one. Anyway, as a workaround, you can use the connect-standalone.sh or connect-distributed.sh scripts to start the connectors. 2. Ignite Kafka Connector stores Kafka's key/value SourceRecord as Ignite's key/value Cache.Entry. Unlike Kafka, which allows key to be null, Ignite does not support null keys. You will have an error if your Kafka SourceRecord's key is null. You can work around null keys by providing an "extractor" that builds Cache.Entry from a value. For example, this extractor creates keys as value's hash codes and converts values to byte arrays of the value's string representation (just an example, nothing practical): public class HashCodeKeyValueBuilder implements StreamSingleTupleExtractor<SinkRecord, Integer, byte[]> { @Override public Map.Entry<Integer, byte[]> extract(SinkRecord msg) { return new AbstractMap.SimpleEntry<>(msg.value().hashCode(), msg.value().toString().getBytes()); } } Then you configure extractor like: singleTupleExtractorCls=my.domain.HashCodeKeyValueBuilder StreamSingleTupleExtractor is defined inside ignite-core.jar so you do not need additional dependencies. Ignite stores custom objects in a cross-platform binary format <https://apacheignite.readme.io/docs/binary-marshaller>, simple types and arrays of simple types as is. As I understood in your case you have a not-null String key and Byte[] value. In this case Ignite will store this as is so that you create your scan query something like Query<Cache.Entry<Integer, byte[]>> query = new ScanQuery<>(...) and then deserialise your byte[] array using your custom deserialiser. This has a huge disadvantage that you cannot efficiently work with the objects in Ignite since Ignite does not know about your custom format. Another option is having custom Kafka converter <https://kafka.apache.org/0110/javadoc/org/apache/kafka/connect/storage/Converter.html> that would deserialise byte[] array inside Kafka and store them as CustomObject in Ignite. In this case the object would be stored in binary format (you will see o.a.i.i.binary.BinaryObjectImpl in the scan query results). You can work with such objects directly using binary API (most efficient, no serialisation involved, no CustomObject jars required on Ignite server nodes) or work with CustomObject using static type API (less efficient, CustomObject has to be deployed).
