First, I created two caches:
-------------------------------------------------------------------------------------------------
val smallTableCacheCfg =
  new CacheConfiguration[Long, TableSchema.table_small](SMALL)
smallTableCacheCfg.setIndexedTypes(classOf[Long], classOf[TableSchema.table_small])
smallTableCacheCfg.setCacheMode(CacheMode.REPLICATED)
smallTableCacheCfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED)
smallTableCacheCfg.setOffHeapMaxMemory(0L)
smallTableCacheCfg.setSwapEnabled(true)

val smallTableCache = smallTableContext.fromCache(smallTableCacheCfg)
smallTableCache.savePairs(smallTableCols_rdd)
-------------------------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------------
val sourceTableCacheCfg =
  new CacheConfiguration[Long, TableSchema.table_stream](STREAM)
sourceTableCacheCfg.setIndexedTypes(classOf[Long], classOf[TableSchema.table_stream])
sourceTableCacheCfg.setCacheMode(CacheMode.PARTITIONED)
sourceTableCacheCfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED)
sourceTableCacheCfg.setOffHeapMaxMemory(0L)
sourceTableCacheCfg.setSwapEnabled(true)

// ignite source table cache
val sourceTableCache = streamTableContext.fromCache(sourceTableCacheCfg)

// save the sourceTableCols RDD into the cache
sourceTableCache.savePairs(sourceTableCols_rdd)
-------------------------------------------------------------------------------------------------
Then I ran a cross-cache join SQL query on sourceTableCache:
-------------------------------------------------------------------------------------------------
val query =
  s"""
     |select s.fl, s.xz, count(*)
     | from
     |   stream as e, "${SMALL}".small as s
     | where
     |   e.pz_id = s.pz_id
     | group by
     |   s.fl, s.xz
   """.stripMargin

val res = sourceTableCache.sql(query)
-------------------------------------------------------------------------------------------------
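For comparison, here is how I understand the schema naming is supposed to work (based on CacheQueryExample): the double-quoted cache name is used as the H2 schema, and the table name is the simple name of the indexed value class. A pure-Scala sketch of the query text I would expect, with a hypothetical plain cache name "small_cache" (`QuerySketch` is just a helper for illustration, not part of my job):

```scala
object QuerySketch {
  // Hypothetical plain cache name (assumption): the same literal would be
  // passed to the CacheConfiguration constructor.
  val SMALL_CACHE = "small_cache"

  // The double-quoted cache name acts as the SQL schema; the table name
  // ("small") is the simple name of the indexed value class.
  val query =
    s"""select s.fl, s.xz, count(*)
       | from stream as e, "$SMALL_CACHE".small as s
       | where e.pz_id = s.pz_id
       | group by s.fl, s.xz""".stripMargin
}
```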
The TableSchema:
-------------------------------------------------------------------------------------------------
object TableSchema {
  class small(...)
  class stream(...)
}
-------------------------------------------------------------------------------------------------
But it cannot find the schema:
-------------------------------------------------------------------------------------------------
Exception in thread "main" javax.cache.CacheException: class org.apache.ignite.IgniteException: Failed to parse query: 
select s.fl, s.xz, count(*)
 from
   stream as e, "TableSchema$small".small as s
 where
   e.pz_id=s.pz_id
 group by
   s.fl, s.xz

        at org.apache.ignite.internal.processors.cache.IgniteCacheProxy.query(IgniteCacheProxy.java:656)
        at org.apache.ignite.spark.IgniteRDD.sql(IgniteRDD.scala:125)
        at main.scala.StreamingJoin$$anonfun$main$1.apply(StreamingJoin.scala:330)
        at main.scala.StreamingJoin$$anonfun$main$1.apply(StreamingJoin.scala:268)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: class org.apache.ignite.IgniteException: Failed to parse query: 
select s.fl, s.xz, count(*)
 from
   url_rz_stream as e, "TableSchema$small".small as s
 where
   e.pz_id=s.pz_id
 group by
   s.fl, s.xz

        at org.apache.ignite.internal.processors.query.GridQueryProcessor.queryTwoStep(GridQueryProcessor.java:806)
        at org.apache.ignite.internal.processors.cache.IgniteCacheProxy.query(IgniteCacheProxy.java:647)
        ... 20 more
Caused by: class org.apache.ignite.IgniteCheckedException: Failed to parse query: 
select s.fl, s.xz, count(*)
 from
   stream as e, "TableSchema$small".small as s
 where
   e.pz_id=s.pz_id
 group by
   s.fl, s.xz

        at org.apache.ignite.internal.processors.query.GridQueryProcessor.executeQuery(GridQueryProcessor.java:1782)
        at org.apache.ignite.internal.processors.query.GridQueryProcessor.queryTwoStep(GridQueryProcessor.java:799)
        ... 21 more
Caused by: javax.cache.CacheException: Failed to parse query: 
select s.fl, s.xz, count(*)
 from
   stream as e, "TableSchema$small".small as s
 where
   e.pz_id=s.pz_id
 group by
   s.fl, s.xz

        at org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.queryTwoStep(IgniteH2Indexing.java:1043)
        at org.apache.ignite.internal.processors.query.GridQueryProcessor$4.applyx(GridQueryProcessor.java:801)
        at org.apache.ignite.internal.processors.query.GridQueryProcessor$4.applyx(GridQueryProcessor.java:799)
        at org.apache.ignite.internal.util.lang.IgniteOutClosureX.apply(IgniteOutClosureX.java:36)
        at org.apache.ignite.internal.processors.query.GridQueryProcessor.executeQuery(GridQueryProcessor.java:1764)
        ... 22 more
Caused by: org.h2.jdbc.JdbcSQLException: Schema "TableSchema$small" not found; SQL statement:

-------------------------------------------------------------------------------------------------
It seems that the way I set the constant "SMALL" is incorrect:
-------------------------------------------------------------------------------------------------
private final val SMALL = TableSchema.getClass.getSimpleName + "small"
private final val STREAM = TableSchema.getClass.getSimpleName + "stream"
-------------------------------------------------------------------------------------------------
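I think I can reproduce the mangled name without Ignite at all: `getClass` on a Scala object returns its synthetic module class, whose simple name keeps a trailing `$`, so `SMALL` evaluates to exactly the schema H2 says is missing. A minimal pure-Scala check (`NameCheck` is a hypothetical helper object, not part of my job):

```scala
// Mirrors the TableSchema object from my code (fields elided)
object TableSchema {
  class small
  class stream
}

object NameCheck {
  // getClass on a Scala object yields the synthetic class "TableSchema$",
  // so getSimpleName keeps the trailing '$'
  val simpleName = TableSchema.getClass.getSimpleName  // "TableSchema$"

  // So the constant evaluates to the mangled schema name from the trace
  val SMALL = simpleName + "small"                     // "TableSchema$small"
}
```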
What is the correct way to do this? (I referred to the Java class CacheQueryExample.java when writing the code above.)



--
View this message in context: 
http://apache-ignite-users.70518.x6.nabble.com/Cross-cache-query-on-spark-schema-not-found-tp3991.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.
