Hello!

I've finally overcome the hurdle of getting Kafka Connect to work with
Apache Ignite.
For anyone reading this who has trouble with the same setup: at the end of
this mail I'm posting the Converter and Extractor I'm currently using.
Connecting multiple topics to Ignite became possible once I gave every
connector its own REST port and offset file.

As of now, I have two different Caches:
The first one contains acceleration data. For the key I'm using a custom
object containing the deviceId, measurementId and timestamp.
Here's a sample from ignitevisorcmd:

| o.a.i.i.binary.BinaryObjectImpl |
de.tudresden.inf.streambench.ignite.cyfacedata.IgniteKey [hash=-2052950622,
deviceId=c7ba0238-4f4c-4237-a42a-89e25d146d9c, measurementId=4,
timestamp=1502738916842] | o.a.i.i.binary.BinaryObjectImpl |
de.tudresden.inf.streambench.ignite.cyfacedata.AccelerationPoint
[hash=1103982604, ax=-3.6176388, ay=4.76925, az=8.710106,
deviceId=c7ba0238-4f4c-4237-a42a-89e25d146d9c, measurementId=4,
timestamp=1502738916842]      |

The second cache contains GPS locations, a similar key/value case.
Since I have about 200 AccelerationPoints for each GpsPoint, I want to do
some interpolation as the first task. Later there will be more tasks, like
removing gravity effects from the acceleration data.

While I know that this should be doable with Ignite, I simply can't
understand the documentation well enough to find a starting point.
Continuous queries, the service and compute grids and so on all sound neat,
yet I'm having a hard time telling which feature I'm supposed to look at
and where to find examples more complex than adding 1+1.

In case my issue isn't clear enough yet, here are the more specific tasks
I'm struggling to solve:

1. On each Ignite node, do the following for every AccelerationPoint that
arrives:
2. Find the previous and the next GpsPoint according to these rules: same
deviceId, same measurementId, closest timestamp in each direction
(before/after). All the required data is available in both key and value
in this case.
3. Is there any way to ensure that GPS and acceleration points that share
the same measurementId and deviceId are located on the same node?
4. Since the data is constantly being streamed, I suppose it's best to
remove each AccelerationPoint once it has been interpolated (into a new
cache) while keeping the GPS data. Is there a feature that already does
that for me?
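
To make the lookup in step 2 concrete, here is a minimal plain-Java sketch.
It is independent of Ignite and only shows the "closest timestamp before and
after" search plus one possible interpolation. GpsSample and GpsTrack are
hypothetical names, the lat/lon fields are assumptions about what a GpsPoint
holds, and linear interpolation is just one choice, not necessarily the one
needed here. One GpsTrack is assumed to hold the samples of a single
(deviceId, measurementId) pair.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for a GpsPoint value; the fields are assumptions. */
final class GpsSample {
    final long timestamp;
    final double lat;
    final double lon;

    GpsSample(long timestamp, double lat, double lon) {
        this.timestamp = timestamp;
        this.lat = lat;
        this.lon = lon;
    }
}

/** GPS samples of ONE (deviceId, measurementId) pair, ordered by timestamp. */
final class GpsTrack {
    private final TreeMap<Long, GpsSample> byTime = new TreeMap<>();

    void add(GpsSample sample) {
        byTime.put(sample.timestamp, sample);
    }

    /** Latest sample at or before ts, or null if there is none. */
    GpsSample previous(long ts) {
        Map.Entry<Long, GpsSample> e = byTime.floorEntry(ts);
        return e == null ? null : e.getValue();
    }

    /** Earliest sample at or after ts, or null if there is none. */
    GpsSample next(long ts) {
        Map.Entry<Long, GpsSample> e = byTime.ceilingEntry(ts);
        return e == null ? null : e.getValue();
    }

    /** Linear interpolation at ts; null when ts is not bracketed by two samples. */
    GpsSample interpolate(long ts) {
        GpsSample p = previous(ts);
        GpsSample n = next(ts);
        if (p == null || n == null) {
            return null;
        }
        if (p.timestamp == n.timestamp) {
            return p; // ts hits a stored sample exactly
        }
        double f = (double) (ts - p.timestamp) / (n.timestamp - p.timestamp);
        return new GpsSample(ts,
                p.lat + f * (n.lat - p.lat),
                p.lon + f * (n.lon - p.lon));
    }
}
```

With samples at timestamps 1000 and 2000, interpolate(1250) lies a quarter
of the way between them, while a timestamp outside that range yields null
because there is no GpsPoint on one side yet.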

I'd be super happy about any help, information, and links to better
examples. Currently, I'm not really sure how to continue.
I'll add some code samples for an Extractor and a Converter as well as my
config.xml. Any suggestions and tips for those are also appreciated!

Bonus question: I'm working with a docker-compose file that starts Kafka,
ZooKeeper, the test data producer and two Ignite server nodes. I'm currently
running the task from IntelliJ with the same config XML, except that I have
clientMode set to true. How would you deploy the finished product? Would you
start another container that holds the instructions and joins as a client
node?

Thanks in advance and best regards,
Sven

______

My 'toConnectData' for the Kafka Connect AccelerationPointConverter:

    // Needs: org.apache.kafka.connect.data.Schema, SchemaBuilder,
    // SchemaAndValue and org.apache.kafka.connect.errors.DataException
    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        AccelerationPoint ap;
        try {
            final AccelerationPointDeserializer accelerationPointDeserializer =
                    new AccelerationPointDeserializer();
            ap = accelerationPointDeserializer.deserialize(topic, value);
        }
        catch (Exception e) {
            // Pass the original exception along as the cause.
            throw new DataException(
                    "Error deserializing Acceleration point in Kafka Connect", e);
        }
        // Fields: String deviceId, long measurementId, long timestamp,
        // double ax, double ay, double az
        Schema apSchema = SchemaBuilder.struct()
                .name("de.tudresden.inf.streambench.ignite.deserializer.AccelerationPointDeserializer")
                .version(1)
                .doc("A schema for AccelerationPoints")
                .field("deviceId", Schema.STRING_SCHEMA)
                .field("measurementId", Schema.INT64_SCHEMA)
                // The timestamp is a long (epoch millis), so INT64, not INT16.
                .field("timestamp", Schema.INT64_SCHEMA)
                .field("ax", Schema.FLOAT64_SCHEMA)
                .field("ay", Schema.FLOAT64_SCHEMA)
                .field("az", Schema.FLOAT64_SCHEMA)
                .build();

        // The value is handed on as the AccelerationPoint object itself.
        return new SchemaAndValue(apSchema, ap);
    }


_______

My AccelerationPointExtractor:

import java.util.AbstractMap;
import java.util.Map;

import org.apache.ignite.stream.StreamSingleTupleExtractor;
import org.apache.kafka.connect.sink.SinkRecord;

public class AccelerationPointExtractor
        implements StreamSingleTupleExtractor<SinkRecord, IgniteKey, AccelerationPoint> {
    @Override
    public Map.Entry<IgniteKey, AccelerationPoint> extract(SinkRecord msg) {
        // The record key has the form "deviceId:measurementId"; split it once.
        String[] keyParts = ((String) msg.key()).split(":");
        String deviceId = keyParts[0];
        long measurementId = Long.parseLong(keyParts[1]);

        AccelerationPoint accelerationPoint = (AccelerationPoint) msg.value();
        accelerationPoint.setDeviceId(deviceId);
        accelerationPoint.setMeasurementId(measurementId);

        return new AbstractMap.SimpleEntry<>(
                new IgniteKey(deviceId, measurementId, accelerationPoint.getTimestamp()),
                accelerationPoint);
    }
}
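
The extractor relies on the record key having the form
"deviceId:measurementId" and would fail with an unhelpful exception on
anything else. As a hedged sketch, the parsing could be pulled into a small
helper that validates the key first. ParsedKey is a hypothetical name and
not part of the code above. It splits at the last colon, which differs from
split(":") only if a deviceId itself contains a colon.

```java
/** Hypothetical holder for the two parts of a "deviceId:measurementId" key. */
final class ParsedKey {
    final String deviceId;
    final long measurementId;

    private ParsedKey(String deviceId, long measurementId) {
        this.deviceId = deviceId;
        this.measurementId = measurementId;
    }

    /** Parses "deviceId:measurementId"; throws IllegalArgumentException otherwise. */
    static ParsedKey parse(String key) {
        if (key == null) {
            throw new IllegalArgumentException("Record key is null");
        }
        int sep = key.lastIndexOf(':');
        // Both parts must be non-empty.
        if (sep <= 0 || sep == key.length() - 1) {
            throw new IllegalArgumentException(
                    "Expected deviceId:measurementId but got: " + key);
        }
        try {
            long measurementId = Long.parseLong(key.substring(sep + 1));
            return new ParsedKey(key.substring(0, sep), measurementId);
        }
        catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "measurementId is not a number in key: " + key, e);
        }
    }
}
```

Inside extract(), the call would be ParsedKey.parse((String) msg.key()),
after which deviceId and measurementId are read from the returned object.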

_____

And finally, the xml:

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util.xsd">
    <bean id="ignite.cfg"
class="org.apache.ignite.configuration.IgniteConfiguration">
        

        <property name="peerClassLoadingEnabled" value="false"/>
        <property name="clientMode" value="true"/>

        <property name="cacheConfiguration">
            <list>
                
                <bean
class="org.apache.ignite.configuration.CacheConfiguration">
                    <property name="name" value="AccelerationPoint"/>
                    <property name="atomicityMode" value="ATOMIC"/>
                    <property name="backups" value="1"/>
                </bean>

                <bean
class="org.apache.ignite.configuration.CacheConfiguration">
                    <property name="name" value="Measurement"/>
                    <property name="atomicityMode" value="ATOMIC"/>
                    <property name="backups" value="1"/>
                </bean>

                <bean
class="org.apache.ignite.configuration.CacheConfiguration">
                    <property name="name" value="GpsPoint"/>
                    <property name="atomicityMode" value="ATOMIC"/>
                    <property name="backups" value="1"/>
                </bean>
            </list>
        </property>

        
        <property name="discoverySpi">
            <bean
class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    
                    
                    
                    <bean
class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                        <property name="addresses">
                            <list>
                                
                                <value>127.0.0.1:47500..47509</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>





