Hi ,
I am running spark 2.3.1 with Ignite 2.7.0 . I have configured Postgres as
cachePersistance store . After loading of cache , i can read and convert
data from ignite cache to Spark Dataframe . But while writing back to
ignite , I get below error
class org.apache.ignite.internal.processors.query.IgniteSQLException: *Table
"ENTITY_PLAYABLE" not found*; SQL statement:
INSERT INTO
ENTITY_PLAYABLE(GAMEID,PLAYABLEID,COMPANYID,VERSION,EVENTTIMESTAMP,EVENTTIMESTAMPSYS,COMPANYIDPARTITION,partitionkey)
VALUES(?,?,?,?,?,?,?,?) [42102-197]
at
*org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.streamUpdateQuery*
(IgniteH2Indexing.java:1302)
at
org.apache.ignite.internal.processors.query.GridQueryProcessor$5.applyx(GridQueryProcessor.java:2206)
at
org.apache.ignite.internal.processors.query.GridQueryProcessor$5.applyx(GridQueryProcessor.java:2204)
at
org.apache.ignite.internal.util.lang.IgniteOutClosureX.apply(IgniteOutClosureX.java:36)
*Read from Ignite* :
loading cache
val conf = new SparkConf()
conf.setMaster("spark://harshal-patil.local:7077")
// conf.setMaster("local[*]")
conf.setAppName("IGniteTest")
conf.set("spark.executor.heartbeatInterval", "900s")
conf.set("spark.network.timeout", "950s")
conf.set("spark.default.parallelism", "4")
conf.set("spark.cores.max", "4")
conf.set("spark.jars","target/pack/lib/spark_ignite_cache_test_2.11-0.1.jar")
val cfg = () => ServerConfigurationFactory.createConfiguration()
Ignition.start(ServerConfigurationFactory.createConfiguration())
val ic : IgniteContext = new IgniteContext(sc, cfg)
ic.ignite().cache("EntityPlayableCache").loadCache(null.asInstanceOf[IgniteBiPredicate[_,
_]])
*spark.read*
.format(IgniteDataFrameSettings.*FORMAT_IGNITE*)
.option(IgniteDataFrameSettings.*OPTION_CONFIG_FILE*, configPath)
.option(IgniteDataFrameSettings.*OPTION_TABLE*,
"ENTITY_PLAYABLE").load().select(*sum*("partitionkey").alias("sum"), *count*
("gameId").as("total")).collect()(0)
*Write To Ignite* :
*df.write*
.format(IgniteDataFrameSettings.*FORMAT_IGNITE*)
.option(IgniteDataFrameSettings.*OPTION_CONFIG_FILE*, configPath)
.option(IgniteDataFrameSettings.*OPTION_TABLE*, "ENTITY_PLAYABLE")
.option(IgniteDataFrameSettings.*OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS*,
"gameId,playableId,companyId,version")
.option(IgniteDataFrameSettings.*OPTION_STREAMER_ALLOW_OVERWRITE*,
"true")
.mode(SaveMode.*Append*)
.save()
I think the problem is with *Spring bean Injection on executer node* ,
please help , what i am doing wrong .