I am trying to perform a simple insert to an Ignite cache (version 2.2) from
a Spark application, using the code below (in scala):
val addresses=new util.ArrayList[String]()
addresses.add("127.0.0.1:48500..48520")
// IGNITE CONTEXT CONFIGURATIONS
val igniteContext:IgniteContext=new IgniteContext(sc,()=>new
IgniteConfiguration()
.setDiscoverySpi(new TcpDiscoverySpi().
setLocalPort(48511).setLocalPortRange(20).setIpFinder(new
TcpDiscoveryVmIpFinder().setAddresses(addresses))),true)
// CHECKING IF Spark CACHE IS PRESENT
println(igniteContext.ignite().cacheNames())
// RETURNS [Spark]
// FETCHING THE SPARK CACHE
val ignite_cache_rdd:IgniteRDD[String,Custom_Class]
=igniteContext.fromCache[String,Custom_Class]("Spark")
// SAVING PAIR_RDD TO CACHE
ignite_cache_rdd.savePairs(ignite_PairRDD)
// PRINTING CONTENTS FROM CACHE
ignite_cache_rdd.sql("select * from Custom_Class").show(truncate = false)
//RETURNS AN EMPTY DATAFRAME
The contents of the ignite_PairRDD value field, when converted to a
Dataframe, show up with no issue:
+-+-+-+
A |B |C|
22|6 |7|
+-+-+-+
The problem is after using "savePairs" and doing a "show()", the Ignite
cache seems to be empty.
+-+-+-+
A |B |C|
| | |
+-+-+-+
The "Spark" cache has the following configurations (in java):
CacheConfiguration cache_conf=new
CacheConfiguration().setCacheMode(CacheMode.PARTITIONED).setAtomicityMode(CacheAtomicityMode.ATOMIC).setBackups(1).setIndexedTypes(String.class,Custom_Class.class).setName("Spark");
The "Custom_Class" is created using the following structure:
public class Equity_Data implements Serializable {
@QuerySqlField(index = true)
private A;
@QuerySqlField(index = true)
private B;
@QuerySqlField(index = true)
private C;
}
I made sure to check if the cache exists, and the SQL query seems to be
correct because I do not get any errors, just an empty dataframe.
Do I need to change some configuration? Thank you.
--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/