Hello, I could use some assistance testing out Ignite over Spark,
specifically SQL over RDD objects. I am able to load up an IgniteRDD
with tuples and do some aggregations over it. However, when I try to
invoke .sql() on the IgniteRDD, I get errors. Here is my test,
distilled down to the basics. Say I have my case class:
   package examples

   case class MyObject(someint: Int, somestring: String)
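(One thing I was unsure about: whether the fields need to be exposed to SQL explicitly. A variant I considered, assuming the @QuerySqlField annotation from org.apache.ignite.cache.query.annotations is the right mechanism for indexedTypes, would be:)

```scala
package examples

import org.apache.ignite.cache.query.annotations.QuerySqlField

// Hypothetical variant: mark each field queryable with @QuerySqlField,
// on the assumption that indexedTypes only exposes annotated fields to SQL.
// In Scala, constructor parameters need meta-targeting (@field) so the
// annotation lands on the generated field rather than the parameter.
case class MyObject(
  @(QuerySqlField @scala.annotation.meta.field) someint: Int,
  @(QuerySqlField @scala.annotation.meta.field) somestring: String
)
```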
And cache definition:
   <property name="cacheConfiguration">
     <list>
       <bean class="org.apache.ignite.configuration.CacheConfiguration">
         <property name="name" value="cache"/>
         <property name="indexedTypes">
           <list>
             <value>org.apache.ignite.lang.IgniteUuid</value>
             <value>examples.MyObject</value>
           </list>
         </property>
       </bean>
     </list>
   </property>
I am setting up indexedTypes via XML since IgniteConfiguration is not
serializable, and therefore can't be configured programmatically in my
Spark job. So then I switch over to the job, which (again distilled for
brevity) looks something like this:
   import org.apache.ignite.spark.IgniteContext
   import org.apache.ignite.lang.IgniteUuid
   import org.apache.ignite.configuration._
   import examples.MyObject

   val igniteContext = new IgniteContext[IgniteUuid, MyObject](
     sc, "/usr/lib/ignite/config/default-config.xml", false)
   val cache = igniteContext.fromCache("cache")

   val input = igniteContext.sparkContext.textFile("hdfs://somefile")
   val rdd = input.map(s => s.split("\\|")).map(line => new MyObject(
     line(0).toInt,
     line(1)
   ))
   cache.saveValues(rdd)
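(To rule out the parsing itself, I checked the split step in isolation; this is plain Scala with no Ignite involved:)

```scala
// Self-contained check of the pipe-split parse step used above.
case class MyObject(someint: Int, somestring: String)

def parseLine(s: String): MyObject = {
  val cols = s.split("\\|")            // "\\|" escapes the regex pipe character
  MyObject(cols(0).toInt, cols(1))
}

println(parseLine("42|hello"))         // MyObject(42,hello)
```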
At this point I can interact with the cache as an RDD, but not through
IgniteRDD.sql. I either get empty results or match errors, depending on
my declared key type. I have tried several techniques with different
key types and different tuples. Figuring I needed a schema, I changed
the cache indexedTypes and tried something like:
   import org.apache.spark.sql._
   import org.apache.spark.sql.types._

   val schema = DataTypes.createStructType(Array(
     StructField("someid", StringType, true),
     StructField("somestring", StringType, false)
   ))
   val rows = input.map(s => s.split("\\|")).map(line => {
     RowFactory.create(line(0), line(1))
   })
   val df = igniteContext.sqlContext.createDataFrame(rows, schema)
   cache.saveValues(df.rdd)

   val rs = cache.sql("select * from Row limit 5")
   // rs.count = 0
I have also tried .map() with the cache config's key type, but in the
end I still cannot get results out of the cache via IgniteRDD.sql.
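(Another variant I have been meaning to try, in case saveValues leaves the SQL key column unpopulated, is saving explicit key/value pairs; this assumes IgniteRDD.savePairs and IgniteUuid.randomUuid() behave the way I expect:)

```scala
import org.apache.ignite.lang.IgniteUuid

// Hypothetical: save explicit (key, value) pairs instead of saveValues,
// so every cache entry gets a concrete IgniteUuid key.
val pairs = rdd.map(obj => (IgniteUuid.randomUuid(), obj))
cache.savePairs(pairs)

// Query against the value type name rather than "Row".
val rs = cache.sql("select someint, somestring from MyObject limit 5")
```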
Where am I going wrong? I see items in the cache and in the RDD, and
take(n) returns the expected objects, but that is where my success
ends. Any pointers on how object fields map to table columns would help
too; that part is a bit frustrating.
Hopefully the Spark-specific documentation thickens soon; I know it's
new, but for now I could use some tips! Thanks!



--
View this message in context: 
http://apache-ignite-users.70518.x6.nabble.com/Guidance-on-Ignite-Spark-SQL-Functionality-tp3138.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.
