Hello! I've finally got Kafka Connect working with Apache Ignite. For anyone reading this who has trouble with the same setup: at the end of this mail I'm posting the Converter and Extractor I currently use. Connecting multiple topics to Ignite worked once I gave each connector its own REST port and its own offset file.
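The per-connector settings mentioned above can be sketched as follows. This is a minimal sketch that assumes each connector runs in its own standalone Kafka Connect worker. `rest.port` and `offset.storage.file.filename` are standard worker settings (newer Kafka versions prefer `listeners` over `rest.port`). The connector names, base port, broker address and file paths are made up for illustration, and a real worker file also needs converter settings, which are omitted here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class WorkerConfigs {

    // Builds one standalone-worker config per connector. Only the REST port and the
    // offset file differ between them; names, port and paths are illustrative assumptions.
    static List<Properties> workerConfigs(List<String> connectorNames, int basePort) {
        List<Properties> configs = new ArrayList<>();
        for (int i = 0; i < connectorNames.size(); i++) {
            String name = connectorNames.get(i);
            Properties p = new Properties();
            p.setProperty("bootstrap.servers", "localhost:9092");          // assumed broker address
            p.setProperty("rest.port", Integer.toString(basePort + i));     // must be unique per worker
            p.setProperty("offset.storage.file.filename",
                    "/tmp/connect-" + name + ".offsets");                   // must be unique per worker
            configs.add(p);
        }
        return configs;
    }

    public static void main(String[] args) {
        for (Properties p : workerConfigs(List.of("acceleration", "gps"), 8083)) {
            System.out.println(p.getProperty("rest.port") + " -> "
                    + p.getProperty("offset.storage.file.filename"));
        }
    }
}
```

Each `Properties` object corresponds to one worker `.properties` file; two workers sharing either value would clash on the REST port or overwrite each other's offsets.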
As of now, I have two different caches. The first one contains acceleration data; as the key I'm using a custom object containing the deviceId, measurementId and timestamp. Here's a sample from ignitevisorcmd:

| o.a.i.i.binary.BinaryObjectImpl | de.tudresden.inf.streambench.ignite.cyfacedata.IgniteKey [hash=-2052950622, deviceId=c7ba0238-4f4c-4237-a42a-89e25d146d9c, measurementId=4, timestamp=1502738916842] | o.a.i.i.binary.BinaryObjectImpl | de.tudresden.inf.streambench.ignite.cyfacedata.AccelerationPoint [hash=1103982604, ax=-3.6176388, ay=4.76925, az=8.710106, deviceId=c7ba0238-4f4c-4237-a42a-89e25d146d9c, measurementId=4, timestamp=1502738916842] |

The second cache contains GPS locations, with a similar key/value layout. Since I have about 200 AccelerationPoints for each GpsPoint, my first task is interpolation; later there will be more tasks, such as removing the effect of gravity from the acceleration data.

While I know this should be doable with Ignite, I can't understand the documentation well enough to find a starting point. Continuous queries, the service grid, the compute grid and so on all sound neat, but I'm having a hard time telling which feature I should be looking at, and where to find examples more complex than adding 1+1.

In case my issue isn't clear enough yet, here are the more specific tasks I'm struggling with:

1. On each Ignite node, do the following for every AccelerationPoint that arrives:
2. Find the previous/next GpsPoint according to these rules: same deviceId, same measurementId, closest timestamp (earlier/later). All the required data is available in both the key and the value.
3. Is there any way to ensure that GPS and acceleration points sharing the same measurementId and deviceId are located on the same node?
4.
Since the data is constantly being streamed, I suppose it's best to remove each AccelerationPoint once it has been interpolated (into a new cache) while keeping the GPS data. Is there a feature that already does that for me?

I'd be super happy about any help, information, and links to better examples. Currently I'm not really sure how to continue. Below are code samples for an Extractor, a Converter and my config.xml; suggestions and tips for those are also appreciated!

Bonus question: I'm working with a docker-compose file that starts Kafka, ZooKeeper, the test-data producer and two Ignite server nodes. I'm currently running the task from IntelliJ with the same config XML, except that clientMode is set to true. How would you deploy the finished product? Would you start another container that runs the processing code as a client node?

Thanks in advance and best regards,
Sven

______

My toConnectData for the Kafka Connect AccelerationPointConverter:

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.errors.DataException;

    // ... inside the AccelerationPointConverter class:

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        AccelerationPoint ap;
        try {
            final AccelerationPointDeserializer accelerationPointDeserializer = new AccelerationPointDeserializer();
            ap = accelerationPointDeserializer.deserialize(topic, value);
        } catch (Exception e) {
            // Keep the original exception as the cause so the real failure shows up in the logs
            throw new DataException("Error deserializing AccelerationPoint in Kafka Connect", e);
        }

        // String deviceId, long measurementId, long timestamp, double ax, double ay, double az
        Schema apSchema = SchemaBuilder.struct()
                .name("de.tudresden.inf.streambench.ignite.deserializer.AccelerationPointDeserializer")
                .version(1)
                .doc("A schema for AccelerationPoints")
                .field("deviceId", Schema.STRING_SCHEMA)
                .field("measurementId", Schema.INT64_SCHEMA)
                .field("timestamp", Schema.INT64_SCHEMA) // epoch millis need 64 bits; INT16 cannot hold them
                .field("ax", Schema.FLOAT64_SCHEMA)
                .field("ay", Schema.FLOAT64_SCHEMA)
                .field("az", Schema.FLOAT64_SCHEMA)
                .build();

        // The POJO is passed through as-is; the extractor below casts it back to AccelerationPoint
        return new SchemaAndValue(apSchema, ap);
    }

_______

My AccelerationPointExtractor:

    import java.util.AbstractMap;
    import java.util.Map;

    import org.apache.ignite.stream.StreamSingleTupleExtractor;
    import org.apache.kafka.connect.sink.SinkRecord;

    import de.tudresden.inf.streambench.ignite.cyfacedata.AccelerationPoint;
    import de.tudresden.inf.streambench.ignite.cyfacedata.IgniteKey;

    public class AccelerationPointExtractor implements StreamSingleTupleExtractor<SinkRecord, IgniteKey, AccelerationPoint> {
        @Override
        public Map.Entry<IgniteKey, AccelerationPoint> extract(SinkRecord msg) {
            // The Kafka record key has the form "deviceId:measurementId"
            String[] keyParts = ((String) msg.key()).split(":");
            String deviceId = keyParts[0];
            long measurementId = Long.parseLong(keyParts[1]);

            // The converter already produced an AccelerationPoint; copy in the fields carried by the key
            AccelerationPoint accelerationPoint = (AccelerationPoint) msg.value();
            accelerationPoint.setDeviceId(deviceId);
            accelerationPoint.setMeasurementId(measurementId);

            return new AbstractMap.SimpleEntry<>(
                    new IgniteKey(deviceId, measurementId, accelerationPoint.getTimestamp()),
                    accelerationPoint);
        }
    }

_____

And finally, the XML:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:util="http://www.springframework.org/schema/util"
           xsi:schemaLocation="
               http://www.springframework.org/schema/beans
               http://www.springframework.org/schema/beans/spring-beans.xsd
               http://www.springframework.org/schema/util
               http://www.springframework.org/schema/util/spring-util.xsd">
        <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
            <property name="peerClassLoadingEnabled" value="false"/>
            <property name="clientMode" value="true"/>

            <property name="cacheConfiguration">
                <list>
                    <bean class="org.apache.ignite.configuration.CacheConfiguration">
                        <property name="name" value="AccelerationPoint"/>
                        <property name="atomicityMode" value="ATOMIC"/>
                        <property name="backups" value="1"/>
                    </bean>
                    <bean class="org.apache.ignite.configuration.CacheConfiguration">
                        <property name="name" value="Measurement"/>
                        <property name="atomicityMode" value="ATOMIC"/>
                        <property name="backups" value="1"/>
                    </bean>
                    <bean class="org.apache.ignite.configuration.CacheConfiguration">
                        <property name="name" value="GpsPoint"/>
                        <property name="atomicityMode" value="ATOMIC"/>
                        <property name="backups" value="1"/>
                    </bean>
                </list>
            </property>

            <property name="discoverySpi">
                <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                    <property name="ipFinder">
                        <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                            <property name="addresses">
                                <list>
                                    <value>127.0.0.1:47500..47509</value>
                                </list>
                            </property>
                        </bean>
                    </property>
                </bean>
            </property>
        </bean>
    </beans>

--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
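For tasks 1 and 2 in the list above, the neighbour lookup and the interpolation themselves do not depend on Ignite. The sketch below is plain Java and assumes the GPS fixes of one (deviceId, measurementId) pair are held in a `TreeMap` keyed by timestamp; `GpsFix`, `GroupKey` and the lat/lon fields are hypothetical stand-ins, not the real GpsPoint class or any Ignite API. `floorEntry`/`ceilingEntry` return the closest fix at or before / at or after the acceleration timestamp, and the position is linearly interpolated between them. How the fixes get there (for example from a query over the GpsPoint cache) is not shown.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class GpsInterpolation {

    /** Hypothetical GPS fix; the field names are assumptions, not the real GpsPoint. */
    record GpsFix(long timestamp, double lat, double lon) {}

    /** Hypothetical grouping key: fixes are only compared within one device and measurement. */
    record GroupKey(String deviceId, long measurementId) {}

    /** Per (deviceId, measurementId): GPS fixes ordered by timestamp. */
    private final Map<GroupKey, TreeMap<Long, GpsFix>> fixes = new HashMap<>();

    void addFix(String deviceId, long measurementId, GpsFix fix) {
        fixes.computeIfAbsent(new GroupKey(deviceId, measurementId), k -> new TreeMap<>())
             .put(fix.timestamp(), fix);
    }

    /**
     * Returns {lat, lon} linearly interpolated at the given timestamp, or null when there is
     * no fix on both sides yet (e.g. the next GpsPoint has not arrived).
     */
    double[] interpolate(String deviceId, long measurementId, long timestamp) {
        TreeMap<Long, GpsFix> series = fixes.get(new GroupKey(deviceId, measurementId));
        if (series == null) return null;
        Map.Entry<Long, GpsFix> prev = series.floorEntry(timestamp);   // closest at or before
        Map.Entry<Long, GpsFix> next = series.ceilingEntry(timestamp); // closest at or after
        if (prev == null || next == null) return null;
        GpsFix a = prev.getValue();
        GpsFix b = next.getValue();
        if (a.timestamp() == b.timestamp()) {
            return new double[] {a.lat(), a.lon()};                    // exact timestamp match
        }
        double f = (double) (timestamp - a.timestamp()) / (b.timestamp() - a.timestamp());
        return new double[] {a.lat() + f * (b.lat() - a.lat()), a.lon() + f * (b.lon() - a.lon())};
    }

    public static void main(String[] args) {
        GpsInterpolation gi = new GpsInterpolation();
        gi.addFix("dev-1", 4L, new GpsFix(1000L, 51.0, 13.0));
        gi.addFix("dev-1", 4L, new GpsFix(2000L, 52.0, 14.0));
        double[] pos = gi.interpolate("dev-1", 4L, 1250L);
        System.out.println(pos[0] + ", " + pos[1]); // prints "51.25, 13.25"
    }
}
```

Returning null when the later fix is missing matters for a stream: an AccelerationPoint can arrive before its next GpsPoint, so it would have to be kept and retried rather than interpolated immediately.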