Mikhail Cherkasov created IGNITE-7523:
-----------------------------------------

             Summary: Exception on data expiration after sharedRDD.saveValues call
                 Key: IGNITE-7523
                 URL: https://issues.apache.org/jira/browse/IGNITE-7523
             Project: Ignite
          Issue Type: Bug
          Components: spark
    Affects Versions: 2.3
            Reporter: Mikhail Cherkasov
             Fix For: 2.5


Reproducer:

package rdd_expiration;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import javax.cache.Cache;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.spark.JavaIgniteContext;
import org.apache.ignite.spark.JavaIgniteRDD;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;

/**
 * This example demonstrates how to create a JavaIgniteRDD and share it with multiple Spark workers. The goal of this
 * particular example is to provide the simplest code example of this logic.
 * <p>
 * This example will start Ignite in the embedded mode and will start a JavaIgniteContext on each Spark worker node.
 * <p>
 * The example can also work in the standalone mode, which is enabled by setting JavaIgniteContext's
 * {@code standalone} property to {@code true} and running an Ignite node separately with the
 * {@code examples/config/spark/example-shared-rdd.xml} config.
 */
public class RddExpiration {
/**
* Executes the example.
* @param args Command line arguments, none required.
*/
public static void main(String args[]) throws InterruptedException {

Ignite server = null;

for (int i = 0; i < 4; i++) {
IgniteConfiguration serverCfg = createIgniteCfg();
serverCfg.setClientMode(false);
serverCfg.setIgniteInstanceName("Server" + i);
server = Ignition.start(serverCfg);
}

server.active(true);


// Spark Configuration.
SparkConf sparkConf = new SparkConf()
.setAppName("JavaIgniteRDDExample")
.setMaster("local")
.set("spark.executor.instances", "2");

// Spark context.
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

// Adjust the logger to exclude the logs of no interest.
Logger.getRootLogger().setLevel(Level.ERROR);
Logger.getLogger("org.apache.ignite").setLevel(Level.INFO);

// Creates Ignite context with specific configuration and runs Ignite in the 
embedded mode.
JavaIgniteContext<UUID, Integer> igniteContext = new JavaIgniteContext<UUID, 
Integer>(
sparkContext,
new IgniteOutClosure<IgniteConfiguration>() {
@Override public IgniteConfiguration apply() {
return createIgniteCfg();
}
},
true);

// Create a Java Ignite RDD of Type (Int,Int) Integer Pair.
JavaIgniteRDD<UUID, Integer> sharedRDD = igniteContext.<UUID, 
Integer>fromCache("sharedRDD");

long start = System.currentTimeMillis();

long totalLoaded = 0;

while(System.currentTimeMillis() - start < 55_000) {
// Define data to be stored in the Ignite RDD (cache).
List<Integer> data = new ArrayList<>(20_000);

for (int i = 0; i < 20_000; i++)
data.add(i);

// Preparing a Java RDD.
JavaRDD<Integer> javaRDD = sparkContext.<Integer>parallelize(data);

sharedRDD.saveValues(javaRDD);

totalLoaded += 20_000;
}
System.out.println("Loaded " + totalLoaded);

for (;;) {

System.out.println(">>> Iterating over Ignite Shared RDD...");

IgniteCache<Object, Object> cache = server.getOrCreateCache("sharedRDD");

AtomicLong recordsLeft = new AtomicLong(0);
for (Cache.Entry<Object, Object> entry : cache) {
recordsLeft.incrementAndGet();
}
System.out.println("Left: " + recordsLeft.get());

}
// Close IgniteContext on all the workers.
// igniteContext.close(true);
}

private static IgniteConfiguration createIgniteCfg() {

IgniteConfiguration cfg = new IgniteConfiguration();

cfg.setClientMode(true);

DataStorageConfiguration memCfg = new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration()
.setCheckpointPageBufferSize(16 * 1024 * 1024)
.setMaxSize(8 * 16 * 1024 * 1024)
.setPersistenceEnabled(true));

cfg.setDataStorageConfiguration(memCfg);

TcpDiscoveryVmIpFinder finder = new TcpDiscoveryVmIpFinder(false);
finder.setAddresses(Arrays.asList("localhost:47500..47600"));

cfg.setDiscoverySpi( new TcpDiscoverySpi().setIpFinder(finder));


CacheConfiguration<Object, Object> cacheCfg = new 
CacheConfiguration<>("sharedRDD");
cacheCfg.setAtomicityMode(ATOMIC);
cacheCfg.setCacheMode(PARTITIONED);
cacheCfg.setBackups(1);
cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
cacheCfg.setEagerTtl(true);

cacheCfg.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(Duration.ONE_MINUTE));

cfg.setCacheConfiguration(cacheCfg);

return cfg;
}
}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to