Dmitry,

When you invoke cacheCfg.setIndexedTypes(classOf[String], classOf[Row]),
Ignite creates a table named "Row", because internally the simple class name
is used as the table name. However, your data frame actually contains
GenericRowWithSchema objects, so in this case the table name is
"GenericRowWithSchema". Since the "Row" table is empty, your SQL query
returns an empty result set.

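To make the mismatch concrete, here is a small stand-alone Scala sketch. It does not use Spark or Ignite: Row and GenericRowWithSchema below are hypothetical stand-ins for the Spark types, and tableNameFor is a hypothetical helper that approximates "simple class name as table name" by stripping the package prefix and Scala's $ nesting markers. This is an assumption for illustration, not Ignite's actual code.

```scala
// Hypothetical stand-ins for Spark's Row trait and its GenericRowWithSchema implementation.
trait Row
final class GenericRowWithSchema extends Row

object TableNames {
  // Hypothetical helper approximating "simple class name as table name":
  // drop the package prefix, then drop Scala's '$' nesting markers.
  def tableNameFor(cls: Class[_]): String =
    cls.getName.split('.').last.split('$').filter(_.nonEmpty).last

  def main(args: Array[String]): Unit = {
    // Type registered via setIndexedTypes(..., classOf[Row]).
    val registered = tableNameFor(classOf[Row])
    // Runtime class of the values actually stored in the cache.
    val value: Row = new GenericRowWithSchema
    val stored = tableNameFor(value.getClass)
    println(s"registered table: $registered, stored values: $stored")
  }
}
```

The declared type and the runtime class of the stored values yield different names, which is why a query against "Row" finds nothing.
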
In my view, storing instances of the Row trait in the cache isn't a good idea,
for the following reasons:
- Your code depends on the actual Row implementation used in Spark data
frames, which could change in the future;
- A Row implementation is a wrapper around the actual data tuple, so it
requires more RAM and adds serialization/deserialization overhead;
- Row instances cannot be indexed, because a Row is a generic object without
concrete fields, and Ignite can index only class fields.

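A rough way to see the last point without Ignite is to list the declared fields of each class with plain Java reflection. GenericRowLike below is a hypothetical simplification of a Row implementation (a wrapper around an array of values), not Spark's actual class; the sketch only shows that its columns are not separate class fields that could be annotated or indexed.

```scala
import java.lang.reflect.Modifier

// A case class: each column is a real field.
case class TRow1(s1: String, s2: String)

// Hypothetical simplification of a Row implementation: all columns hidden inside one array.
final class GenericRowLike(val values: Array[Any])

object FieldCheck {
  // Names of the instance fields declared by a class, ignoring compiler-generated
  // (synthetic or '$'-named) and static fields.
  def fieldNames(cls: Class[_]): Set[String] =
    cls.getDeclaredFields
      .filterNot(f => f.isSynthetic || f.getName.contains("$") || Modifier.isStatic(f.getModifiers))
      .map(_.getName)
      .toSet

  def main(args: Array[String]): Unit = {
    println(fieldNames(classOf[TRow1]))          // one field per column
    println(fieldNames(classOf[GenericRowLike])) // a single opaque array field
  }
}
```
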
A better way is to store TRow1 objects in the cache, because then you have full
control over the implementation details. I've reworked your code a little in
order to explain my idea.

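import org.apache.ignite.cache.query.annotations.QuerySqlField
import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
import org.apache.ignite.spark.IgniteContext
import org.apache.spark.SparkContext
import org.apache.spark.sql.{Row, SQLContext}
import scala.annotation.meta.field

/** Type alias for `QuerySqlField`, applied to the underlying field. */
type ScalarCacheQuerySqlField = QuerySqlField @field

// Fields used in SQL queries must be annotated
// (set index = true on the annotation to also build an index on a field).
case class TRow1(@ScalarCacheQuerySqlField s1: String,
                 @ScalarCacheQuerySqlField s2: String) extends Serializable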
val sc: SparkContext = new SparkContext("local[*]", "test")
val sqlCtx: SQLContext = new SQLContext(sc)
// create a DataFrame from a local collection
val data = List[TRow1](TRow1("1", "test-1"), TRow1("2", "test-2"))
val df = sqlCtx.createDataFrame[TRow1](data)
val col: Array[Row] = df.collect()
// create ignite context (embedded mode)
val igniteContext = new IgniteContext[String, TRow1](sc,
  () => new IgniteConfiguration().setPeerClassLoadingEnabled(true), false)
// cache config
val cacheCfg = new CacheConfiguration[String, TRow1]()
cacheCfg.setName("cache01")
cacheCfg.setIndexedTypes(classOf[String], classOf[TRow1]) // table name is "TRow1"
// ignite cache
val cache = igniteContext.fromCache(cacheCfg)
// convert DataFrame rows to (key, TRow1) pairs and save them to the cache
val df_rdd = df.rdd.map(r => (r.getAs[String](0),
TRow1(r.getAs[String](0), r.getAs[String](1))))
cache.savePairs(df_rdd)
// query
val c = cache.count
println(s"cache.count $c")
val result = cache.sql("select * from TRow1").collect
result.foreach(println(_))

I hope this is helpful.
On Wed, Mar 2, 2016 at 7:30 PM, DmitryB <[email protected]> wrote:
> Hi team,
>
> I'm trying to save, index and query Spark DataFrames with an Ignite cache;
> this is my code:
>
> import org.apache.spark.sql.Row
> import org.apache.ignite.configuration._
> import org.apache.ignite.spark.IgniteContext
> import org.apache.ignite.configuration.CacheConfiguration
> import sandbox._
>
> // create rdd
> val data = List[TRow1](TRow1("1", "test-1"), TRow1("2", "test-2"))
> val df = sqlContext.createDataFrame[TRow1](data)
> >> df: org.apache.spark.sql.DataFrame = [key: string, name: string]
>
> // create ignite context (embedded mode)
> val igniteContext = new IgniteContext[String, Row](sc, () => new
> IgniteConfiguration(), false)
>
> // cache config
> val cacheCfg = new CacheConfiguration[String, Row]()
> cacheCfg.setName("cache01")
> cacheCfg.setIndexedTypes(classOf[String], classOf[Row])
>
> // ignite cache
> val cache = igniteContext.fromCache(cacheCfg)
> >> cache: org.apache.ignite.spark.IgniteRDD[String,org.apache.spark.sql.Row] =
> >>   IgniteRDD[1] at RDD at IgniteAbstractRDD.scala:27
>
> // df rdd save to cache
> val df_rdd = df.rdd.map(r => (r.getAs[String](0), r))
> >> df_rdd: org.apache.spark.rdd.RDD[(String, org.apache.spark.sql.Row)] =
> >> MapPartitionsRDD[4] at map at <console>:38
> cache.savePairs(df_rdd)
>
> // query
> val c = cache.count
> >> res3: Long = 2
>
> val result = cache.sql("select * from Row").collect
> >> result: Array[org.apache.spark.sql.Row] = Array()
>
> Unfortunately, when I run the SQL query I don't get any results.
>
> Could you please recommend the correct way to store, index and query
> sql.Rows with an Ignite cache?
>
> Best regards,
> Dmitry
>
--
Andrey Gura
GridGain Systems, Inc.
www.gridgain.com