Hello, I could use some assistance with testing out Ignite over Spark,
specifically SQL over RDD objects. I am able to load up an IgniteRDD with
tuples and do some aggregations over it. However, when I try to invoke
.sql() on the IgniteRDD, I get errors.

Here is my test, distilled down to the basics. Say I have my case class:
    package examples

    case class MyObject(someint: Int, somestring: String)
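(One thing I suspect might matter: from what I've read in the Ignite docs, only fields carrying the @QuerySqlField annotation become SQL columns. A sketch of what I believe the queryable version of the class would need to look like, assuming ignite-core's annotations are on the classpath and that the meta-annotation is needed to put them on the underlying field rather than the constructor parameter:)

```scala
package examples

import org.apache.ignite.cache.query.annotations.QuerySqlField
import scala.annotation.meta.field

// Sketch: my understanding is that only annotated fields are exposed
// as SQL columns; @field moves the annotation onto the JVM field so
// Ignite's reflection can see it.
case class MyObject(
  @(QuerySqlField @field)(index = true) someint: Int,
  @(QuerySqlField @field) somestring: String
)
```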
And cache definition:
    <property name="cacheConfiguration">
      <list>
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
          <property name="name" value="cache"/>
          <property name="indexedTypes">
            <list>
              <value>org.apache.ignite.lang.IgniteUuid</value>
              <value>examples.MyObject</value>
            </list>
          </property>
        </bean>
      </list>
    </property>
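(Side note on the serialization issue: if I understand the ignite-spark API correctly, IgniteContext also has a constructor that takes a () => IgniteConfiguration closure, which should sidestep the serializability problem entirely. A sketch, where the cache name and indexed types mirror the XML above and everything else is my assumption:)

```scala
import org.apache.ignite.spark.IgniteContext
import org.apache.ignite.configuration.{IgniteConfiguration, CacheConfiguration}
import org.apache.ignite.lang.IgniteUuid
import examples.MyObject

// Sketch: the closure is evaluated where it is needed, so the
// non-serializable IgniteConfiguration never has to cross the wire.
val igniteContext = new IgniteContext[IgniteUuid, MyObject](sc, () => {
  val cacheCfg = new CacheConfiguration[IgniteUuid, MyObject]("cache")
  cacheCfg.setIndexedTypes(classOf[IgniteUuid], classOf[MyObject])
  new IgniteConfiguration().setCacheConfiguration(cacheCfg)
})
```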
I am setting up indexedTypes via XML since IgniteConfiguration is not
serializable, and therefore can't be configured programmatically in my
Spark job. So then I switch over to the job, which looks something like
this (again distilled for brevity):
    import org.apache.ignite.spark.IgniteContext
    import org.apache.ignite.lang.IgniteUuid
    import org.apache.ignite.configuration._
    import examples.MyObject

    val igniteContext = new IgniteContext[IgniteUuid, MyObject](sc,
      "/usr/lib/ignite/config/default-config.xml", false)
    val cache = igniteContext.fromCache("cache")
    val input = igniteContext.sparkContext.textFile("hdfs://somefile")
    val rdd = input.map(s => s.split("\\|"))
      .map(line => new MyObject(line(0).toInt, line(1)))
    cache.saveValues(rdd)
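(One thing I'm wondering about here: since the cache is indexed as (IgniteUuid, MyObject), maybe saveValues generates keys of a different type than the one I declared. A sketch of what I'd try instead with explicit keys via savePairs, assuming IgniteUuid.randomUuid() is the right factory method:)

```scala
import org.apache.ignite.lang.IgniteUuid
import examples.MyObject

// Sketch: store explicit (IgniteUuid, MyObject) pairs so the entry's
// key type matches the first indexedTypes entry in the cache config.
val pairs = input.map(s => s.split("\\|"))
  .map(line => (IgniteUuid.randomUuid(), new MyObject(line(0).toInt, line(1))))
cache.savePairs(pairs)
```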
At this point I can interact with the cache as an RDD, but not through
IgniteRDD.sql. I get either empty results or match errors, depending on my
declared key type. I have tried several techniques with different key
types and different tuples.

Figuring I needed a schema, I changed the cache indexedTypes and tried
something like:
    import org.apache.spark.sql._
    import org.apache.spark.sql.types._

    val schema = DataTypes.createStructType(Array(
      StructField("someid", StringType, true),
      StructField("somestring", StringType, false)))
    val rows = input.map(s => s.split("\\|"))
      .map(line => RowFactory.create(line(0), line(1)))
    val df = igniteContext.sqlContext.createDataFrame(rows, schema)
    cache.saveValues(df.rdd)
    val rs = cache.sql("select * from Row limit 5") // rs.count = 0
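(For the query itself, my understanding is that the table name is the value class's simple name, i.e. MyObject, not Row, so with the original case class stored I would expect something like the sketch below to work; the column names and filter are just illustrative:)

```scala
// Sketch: IgniteRDD.sql queries the table for the indexed value type,
// whose name should be the simple class name of that type.
val rs = cache.sql(
  "select someint, somestring from MyObject where someint > ?", 10)
rs.show()
```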
I have also tried .map() with the cache config key type, but in the end I
still cannot get results out of the cache via IgniteRDD.sql.

Where am I going wrong? I see items in the cache and in the RDD; take(n)
returns the expected objects, but that is where my success ends. If you
have any pointers on how object fields map to table columns, that part is
a bit frustrating too.
Hopefully the Spark-specific documentation thickens soon; I know it's new,
but for now I could use some tips! Thanks!
--
View this message in context:
http://apache-ignite-users.70518.x6.nabble.com/Guidance-on-Ignite-Spark-SQL-Functionality-tp3138.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.