Well, all changes in Ignite cache (regardless whether they were caused by
another RDD or external changes in cache) will be visible to RDD users
immediately.

Most possible this happened because you set different types for keys and
values in ignite native api and ignite rdd.

I've checked this behavior by adding inserting value to the ignite rdd
cache right from ignite native api. Here is a code, you can check that
everything works fine:

public class SharedRDDExample {
    /**
     * Executes the example.
     * @param args Command line arguments, none required.
     */
    public static void main(String args[]) {
        // Spark Configuration.
        SparkConf sparkConf = new SparkConf()
            .setAppName("JavaIgniteRDDExample")
            .setMaster("local")
            .set("spark.executor.instances", "2");

        // Spark context.
        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        // Adjust the logger to exclude the logs of no interest.
        Logger.getRootLogger().setLevel(Level.ERROR);
        Logger.getLogger("org.apache.ignite").setLevel(Level.INFO);

        // Creates Ignite context with specific configuration and runs
Ignite in the embedded mode.
        JavaIgniteContext<Integer, Integer> igniteContext = new
JavaIgniteContext<Integer, Integer>(
            sparkContext,"examples/config/spark/example-shared-rdd.xml", false);

        // Create a Java Ignite RDD of Type (Int,Int) Integer Pair.
        JavaIgniteRDD<Integer, Integer> sharedRDD =
igniteContext.<Integer, Integer>fromCache("sharedRDD");

        // Define data to be stored in the Ignite RDD (cache).
        List<Integer> data = new ArrayList<>(20);

        for (int i = 0; i<20; i++) {
            data.add(i);
        }

        // Preparing a Java RDD.
        JavaRDD<Integer> javaRDD = sparkContext.<Integer>parallelize(data);

        // Fill the Ignite RDD in with Int pairs. Here Pairs are
represented as Scala Tuple2.
        sharedRDD.savePairs(javaRDD.<Integer, Integer>mapToPair(new
PairFunction<Integer, Integer, Integer>() {
            @Override public Tuple2<Integer, Integer> call(Integer
val) throws Exception {
                return new Tuple2<Integer, Integer>(val, val);
            }
        }));

        System.out.println(">>> Iterating over Ignite Shared RDD...");

        // Iterate over the Ignite RDD.
        sharedRDD.foreach(new VoidFunction<Tuple2<Integer, Integer>>() {
            @Override public void call(Tuple2<Integer, Integer> tuple)
throws Exception {
                System.out.println("(" + tuple._1 + "," + tuple._2 + ")");
            }
        });

        System.out.println(">>> Transforming values stored in Ignite
Shared RDD...");

        // Filter out even values as a transformed RDD.
        JavaPairRDD<Integer, Integer> transformedValues =
            sharedRDD.filter(new Function<Tuple2<Integer, Integer>, Boolean>() {
                @Override public Boolean call(Tuple2<Integer, Integer>
tuple) throws Exception {
                    return tuple._2() % 2 == 0;
                }
            });

        // Print out the transformed values.
        transformedValues.foreach(new VoidFunction<Tuple2<Integer, Integer>>() {
            @Override public void call(Tuple2<Integer, Integer> tuple)
throws Exception {
                System.out.println("(" + tuple._1 + "," + tuple._2 + ")");
            }
        });

        System.out.println(">>> Executing SQL query over Ignite Shared RDD...");

        // Execute SQL query over the Ignite RDD.
        Dataset df = sharedRDD.sql("select _val from Integer where _key < 9");

        // Show the result of the execution.
        df.show();

        igniteContext.ignite().cache("sharedRDD").put(100,100);

        System.out.println("After put into ignite cache:" +
igniteContext.fromCache("sharedRDD").take(200));

        // Close IgniteContext on all the workers.
        igniteContext.close(true);
    }
}


Evgenii

Reply via email to