First, I created two caches:
-------------------------------------------------------------------------------------------------
val smallTableCacheCfg = new CacheConfiguration[Long, TableSchema.table_small](SMALL)
smallTableCacheCfg.setIndexedTypes(classOf[Long], classOf[TableSchema.table_small])
smallTableCacheCfg.setCacheMode(CacheMode.REPLICATED)
smallTableCacheCfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED)
smallTableCacheCfg.setOffHeapMaxMemory(0L)
smallTableCacheCfg.setSwapEnabled(true)
val smallTableCache = smallTableContext.fromCache(smallTableCacheCfg)
smallTableCache.savePairs(smallTableCols_rdd)
-------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------
val sourceTableCacheCfg = new CacheConfiguration[Long, TableSchema.table_stream](STREAM)
sourceTableCacheCfg.setIndexedTypes(classOf[Long], classOf[TableSchema.table_stream])
sourceTableCacheCfg.setCacheMode(CacheMode.PARTITIONED)
sourceTableCacheCfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED)
sourceTableCacheCfg.setOffHeapMaxMemory(0L)
sourceTableCacheCfg.setSwapEnabled(true)
// Ignite RDD backed by the source table cache
val sourceTableCache = streamTableContext.fromCache(sourceTableCacheCfg)
// Save the sourceTableCols RDD to the cache
sourceTableCache.savePairs(sourceTableCols_rdd)
-------------------------------------------------------------------------------------------------
Then I ran a cross-cache SQL join on sourceTableCache:
-------------------------------------------------------------------------------------------------
val query =
s"""
|select s.fl, s.xz, count(*)
| from
| stream as e, "${SMALL}".small as s
| where
| e.pz_id=s.pz_id
| group by
| s.fl, s.xz
""".stripMargin
val res = sourceTableCache.sql(query)
-------------------------------------------------------------------------------------------------
The TableSchema:
-------------------------------------------------------------------------------------------------
object TableSchema {
class small(...)
class stream(...)
}
-------------------------------------------------------------------------------------------------
But the query fails because the schema cannot be found:
-------------------------------------------------------------------------------------------------
Exception in thread "main" javax.cache.CacheException: class org.apache.ignite.IgniteException: Failed to parse query:
select s.fl, s.xz, count(*)
from
stream as e, "TableSchema$small".small as s
where
e.pz_id=s.pz_id
group by
s.fl, s.xz
at org.apache.ignite.internal.processors.cache.IgniteCacheProxy.query(IgniteCacheProxy.java:656)
at org.apache.ignite.spark.IgniteRDD.sql(IgniteRDD.scala:125)
at main.scala.StreamingJoin$$anonfun$main$1.apply(StreamingJoin.scala:330)
at main.scala.StreamingJoin$$anonfun$main$1.apply(StreamingJoin.scala:268)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: class org.apache.ignite.IgniteException: Failed to parse query:
select s.fl, s.xz, count(*)
from
url_rz_stream as e, "TableSchema$small".small as s
where
e.pz_id=s.pz_id
group by
s.fl, s.xz
at org.apache.ignite.internal.processors.query.GridQueryProcessor.queryTwoStep(GridQueryProcessor.java:806)
at org.apache.ignite.internal.processors.cache.IgniteCacheProxy.query(IgniteCacheProxy.java:647)
... 20 more
Caused by: class org.apache.ignite.IgniteCheckedException: Failed to parse query:
select s.fl, s.xz, count(*)
from
stream as e, "TableSchema$small".small as s
where
e.pz_id=s.pz_id
group by
s.fl, s.xz
at org.apache.ignite.internal.processors.query.GridQueryProcessor.executeQuery(GridQueryProcessor.java:1782)
at org.apache.ignite.internal.processors.query.GridQueryProcessor.queryTwoStep(GridQueryProcessor.java:799)
... 21 more
Caused by: javax.cache.CacheException: Failed to parse query:
select s.fl, s.xz, count(*)
from
stream as e, "TableSchema$small".small as s
where
e.pz_id=s.pz_id
group by
s.fl, s.xz
at org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.queryTwoStep(IgniteH2Indexing.java:1043)
at org.apache.ignite.internal.processors.query.GridQueryProcessor$4.applyx(GridQueryProcessor.java:801)
at org.apache.ignite.internal.processors.query.GridQueryProcessor$4.applyx(GridQueryProcessor.java:799)
at org.apache.ignite.internal.util.lang.IgniteOutClosureX.apply(IgniteOutClosureX.java:36)
at org.apache.ignite.internal.processors.query.GridQueryProcessor.executeQuery(GridQueryProcessor.java:1764)
... 22 more
Caused by: org.h2.jdbc.JdbcSQLException: Schema "TableSchema$small" not found; SQL statement:
-------------------------------------------------------------------------------------------------
It seems that the way I set the constant "SMALL" is incorrect:
-------------------------------------------------------------------------------------------------
private final val SMALL = TableSchema.getClass.getSimpleName + "small"
private final val STREAM = TableSchema.getClass.getSimpleName + "stream"
-------------------------------------------------------------------------------------------------
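For reference, here is a minimal sketch of what that expression evaluates to in plain Scala, without Ignite or Spark. It only shows where the "$" in the schema name from the error message comes from: the runtime class of a Scala object is named with a trailing "$". It does not establish whether this is the cause of the failure. The names CacheNameDemo and "small_cache" are hypothetical and used for illustration only, and TableSchema is assumed to be a top-level object compiled from a normal source file (not pasted into the REPL).

```scala
// Hypothetical stand-in for the TableSchema object from the post.
object TableSchema {
  class small(val pz_id: Long)
}

object CacheNameDemo {
  // Same expression as in the post. TableSchema.getClass is the runtime
  // class of the singleton object, whose simple name is "TableSchema$".
  val derived: String = TableSchema.getClass.getSimpleName + "small"

  // Assumption for illustration: a plain literal avoids depending on how
  // the compiler names the object's class.
  val explicit: String = "small_cache"

  def main(args: Array[String]): Unit = {
    println(derived)  // matches the schema name quoted in the error message
    println(explicit)
  }
}
```

Whichever form is used, the string passed to the CacheConfiguration constructor and the quoted schema name in the SQL text need to be the same value.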
What is the correct way to do this? (I based the code above on the Java
example CacheQueryExample.java.)
--
View this message in context:
http://apache-ignite-users.70518.x6.nabble.com/Cross-cache-query-on-spark-schema-not-found-tp3991.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.