I use two IgniteContext instances in my Spark Streaming job; one of them holds a cacheConfiguration for the DataFrame that the stream is joined against.
Here is the code:
-----------------------------------------------------------------------------------------------------
package main.scala
/**
* Created by F7753 on 2016/3/30.
*/
import kafka.serializer.StringDecoder
import org.apache.ignite.cache.CacheMode
import org.apache.ignite.cache.query.annotations.QuerySqlField
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{IntegerType, LongType, StringType,
StructField, StructType}
import org.apache.log4j._
import org.apache.ignite.Ignition
import org.apache.ignite.configuration._
import org.apache.ignite.spark.IgniteContext
import org.apache.ignite.configuration.CacheConfiguration
import scala.annotation.meta.field
object Schema {
val small = StructType( Array(
StructField("field0", StringType),
StructField("field2", LongType),
StructField("field3", LongType),
StructField("field4", LongType),
StructField("field5", LongType),
StructField("field6", LongType),
StructField("field7", StringType),
StructField("field8", StringType),
StructField("field9", StringType),
StructField("field10", StringType),
StructField("field11", StringType),
StructField("field12", StringType),
StructField("field13", StringType),
StructField("field14", StringType),
StructField("field15", StringType),
StructField("field16", StringType),
StructField("field17", StringType),
StructField("field18", StringType),
StructField("field19", StringType))
)
val source = StructType( Array(
StructField("field0", LongType),
StructField("field1", StringType),
StructField("field2", StringType),
StructField("field3", StringType),
StructField("field4", IntegerType),
StructField("field5", IntegerType),
StructField("field6", IntegerType),
StructField("field7", IntegerType),
StructField("field8", IntegerType),
StructField("field9", StringType),
StructField("field10", StringType),
StructField("field11", IntegerType),
StructField("field12", StringType),
StructField("field13", IntegerType),
StructField("field14", StringType),
StructField("field15",StringType),
StructField("field16", IntegerType),
StructField("field17", StringType),
StructField("field18", IntegerType),
StructField("field19", StringType),
StructField("field20", StringType),
StructField("field21", StringType),
StructField("field22", IntegerType),
StructField("field23", IntegerType),
StructField("field24", StringType),
StructField("field25", IntegerType),
StructField("field26", IntegerType),
StructField("field27", IntegerType),
StructField("field28", IntegerType),
StructField("field29", IntegerType),
StructField("field30", LongType),
StructField("field31", StringType),
StructField("field32", LongType),
StructField("field33", StringType),
StructField("field34", LongType),
StructField("field35", StringType),
StructField("field36", LongType),
StructField("field37", StringType),
StructField("field38", IntegerType),
StructField("field39", IntegerType),
StructField("field40", IntegerType),
StructField("field41", LongType))
)
}
object SQLContextSingleton {
@transient private var instance: SQLContext = _
def getInstance(sparkContext: SparkContext): SQLContext = {
if (instance == null) {
instance = new SQLContext(sparkContext)
}
instance
}
}
object StreamingJoin {
private final val SMALL = StreamingJoin.getClass.getSimpleName + "SmallTable"
private final val SOURCE = StreamingJoin.getClass.getSimpleName + "SourceTable"
def checkArgs(args: Array[String]): Unit = {
args(0) match {
case "Socket" =>
if (args.length < 4) {
System.err.println("Usage: StreamingTest Socket <ip> <port> <receiverNums>")
System.exit(1)
}
case _ =>
System.err.println("Unsupported source...")
System.exit(1)
}
}
def main(args: Array[String]): Unit = {
/** Type alias for `QuerySqlField`. */
type ScalarCacheQuerySqlField = QuerySqlField @field
checkArgs(args)
val scf = new SparkConf().setAppName("StreamingTest")
val ssc = new StreamingContext(scf, Seconds(10))
val sqlCtx: SQLContext = SQLContextSingleton.getInstance(ssc.sparkContext)
PropertyConfigurator.configure("./conf/log.conf")
val smallTableData = SQLContextSingleton.getInstance(ssc.sparkContext).read
.format("com.databricks.spark.csv")
.option("header", "false")
.schema(Schema.small)
.load("hdfs://host:9000/fileName.csv")
val smallTableDF = sqlCtx.createDataFrame(smallTableData.rdd, classOf[small])
// create ignite context (embedded mode)
val smallTableContext = new IgniteContext[String, small](ssc.sparkContext,
"/home/test/SparkIgniteStreaming/config/example-cache.xml")
// small table cache config
val smallTableCacheCfg = new CacheConfiguration[String, small](SMALL)
smallTableCacheCfg.setIndexedTypes(classOf[BigInt], classOf[small]) // table has "zb_test" name
smallTableCacheCfg.setCacheMode(CacheMode.REPLICATED)
// ignite small table cache
val smallTableCache = smallTableContext.fromCache(smallTableCacheCfg)
// smallTableCols rdd save to cache
val smallTableCols_rdd = smallTableDF.rdd.map(
r => (
r.getAs[String](0),
new small(
r.getAs[BigInt](0),
r.getAs[String](1),
r.getAs[String](2),
r.getAs[String](3),
r.getAs[Int](4),
r.getAs[Int](5),
r.getAs[Int](6),
r.getAs[Int](7),
r.getAs[Int](8),
r.getAs[String](9),
r.getAs[String](10),
r.getAs[Int](11),
r.getAs[String](12),
r.getAs[String](13),
r.getAs[String](14),
r.getAs[Int](15),
r.getAs[String](16),
r.getAs[Int](17),
r.getAs[String](18),
r.getAs[String](19),
r.getAs[String](20),
r.getAs[Int](21),
r.getAs[Int](22),
r.getAs[String](23),
r.getAs[Int](24),
r.getAs[Int](25),
r.getAs[Int](26),
r.getAs[Int](27),
r.getAs[Int](28),
r.getAs[BigInt](29),
r.getAs[String](30),
r.getAs[BigInt](31),
r.getAs[String](32),
r.getAs[BigInt](33),
r.getAs[String](34),
r.getAs[BigInt](35),
r.getAs[String](36),
r.getAs[Int](37),
r.getAs[Int](38),
r.getAs[Int](39),
r.getAs[BigInt](40)
)))
smallTableCache.savePairs(smallTableCols_rdd)
def createSourceDStream: DStream[String] = {
args(0) match {
case "Socket" =>
ssc.socketTextStream(args(1), args(2).toInt, StorageLevel.OFF_HEAP)
}
}
val linesN = (1 to args(args.length - 1).toInt).map(i =>
createSourceDStream.flatMap(_.split("\n")))
val lines = ssc.union(linesN)
lines.foreachRDD( (rdd: RDD[String], time: Time) => {
val sqlCtx = SQLContextSingleton.getInstance(rdd.sparkContext)
val rowRdd = rdd.map(_.split(",")).map(row => Row(row(0),
row(1).toLong, row(2).toLong, row(3).toLong, row(4).toLong, row(5).toLong,
row(6), row(7), row(8), row(9), row(10), row(11), row(12), row(13),
row(14), row(15), row(16), row(17), row(18)))
val sourceTableColsDF = sqlCtx.createDataFrame(rowRdd, classOf[source])
val sourceTableCols_rdd = sourceTableColsDF.rdd.map(
r => (
r.getAs[String](0),
new source(
r.getAs[String](0),
r.getAs[BigInt](1),
r.getAs[BigInt](2),
r.getAs[BigInt](3),
r.getAs[BigInt](4),
r.getAs[String](5),
r.getAs[String](6),
r.getAs[String](7),
r.getAs[String](8),
r.getAs[String](9),
r.getAs[String](10),
r.getAs[String](11),
r.getAs[String](12),
r.getAs[String](13),
r.getAs[String](14),
r.getAs[String](15),
r.getAs[String](16),
r.getAs[String](17)
)
)
)
// source table cache config
val sourceTableCacheCfg = new CacheConfiguration[String, source](SOURCE)
sourceTableCacheCfg.setIndexedTypes(classOf[BigInt], classOf[source]) // table has "url_rz" name
sourceTableCacheCfg.setCacheMode(CacheMode.PARTITIONED)
val streamTableContext = new IgniteContext[String, source](ssc.sparkContext,
"/home/test/config/default-config.xml")
// ignite source table cache
val sourceTableCache = streamTableContext.fromCache(sourceTableCacheCfg)
// sourceTableCols rdd save to cache
sourceTableCache.savePairs(sourceTableCols_rdd)
val query =
s"""
|select s.fl, s.xz, count(*)
| from
| SourceTable as e, \" """ + SOURCE + """\".SmallTable as s
| where
| e.pz_id=s.pz_id
| group by
| s.fl, s.xz
""".stripMargin
val res = sourceTableCache.sql(query)
println("-----------------------------")
println("Time: " + time)
println("-----------------------------")
res.show(10)
})
ssc.start()
ssc.awaitTermination()
}
}
-----------------------------------------------------------------------------------------------------
Here is Schema.scala:
-----------------------------------------------------------------------------------------------------
package main.scala
import org.apache.ignite.scalar.scalar._
/**
* Created by INFI on 2016/3/31.
*/
class small(@ScalarCacheQuerySqlField field0: String,
@ScalarCacheQuerySqlField field1: BigInt,
@ScalarCacheQuerySqlField field2: BigInt,
@ScalarCacheQuerySqlField field3: BigInt,
@ScalarCacheQuerySqlField field4: BigInt,
@ScalarCacheQuerySqlField field5: String,
@ScalarCacheQuerySqlField field6: String,
@ScalarCacheQuerySqlField field7: String,
@ScalarCacheQuerySqlField field8: String,
@ScalarCacheQuerySqlField field9: String,
@ScalarCacheQuerySqlField field10: String,
@ScalarCacheQuerySqlField field11: String,
@ScalarCacheQuerySqlField field12: String,
@ScalarCacheQuerySqlField field13: String,
@ScalarCacheQuerySqlField field14: String,
@ScalarCacheQuerySqlField field15: String,
@ScalarCacheQuerySqlField field16: String,
@ScalarCacheQuerySqlField field17: String) extends Serializable {
}
class source(@ScalarCacheQuerySqlField field0: BigInt,
@ScalarCacheQuerySqlField field1: String,
@ScalarCacheQuerySqlField field2: String,
@ScalarCacheQuerySqlField field3: String,
@ScalarCacheQuerySqlField field4: Int,
@ScalarCacheQuerySqlField field5: Int,
@ScalarCacheQuerySqlField field6: Int,
@ScalarCacheQuerySqlField field7: Int,
@ScalarCacheQuerySqlField field8: Int,
@ScalarCacheQuerySqlField field9: String,
@ScalarCacheQuerySqlField field10: String,
@ScalarCacheQuerySqlField field11: Int,
@ScalarCacheQuerySqlField field12: String,
@ScalarCacheQuerySqlField field13: String,
@ScalarCacheQuerySqlField field14: String,
@ScalarCacheQuerySqlField field15: Int,
@ScalarCacheQuerySqlField field16: String,
@ScalarCacheQuerySqlField field17: Int,
@ScalarCacheQuerySqlField field18: String,
@ScalarCacheQuerySqlField field19: String,
@ScalarCacheQuerySqlField field20: String,
@ScalarCacheQuerySqlField field21: Int,
@ScalarCacheQuerySqlField field22: Int,
@ScalarCacheQuerySqlField field23: String,
@ScalarCacheQuerySqlField field24: Int,
@ScalarCacheQuerySqlField field25: Int,
@ScalarCacheQuerySqlField field26: Int,
@ScalarCacheQuerySqlField field27: Int,
@ScalarCacheQuerySqlField field28: Int,
@ScalarCacheQuerySqlField field29: BigInt,
@ScalarCacheQuerySqlField field30: String,
@ScalarCacheQuerySqlField field31: BigInt,
@ScalarCacheQuerySqlField field32: String,
@ScalarCacheQuerySqlField field33: BigInt,
@ScalarCacheQuerySqlField field34: String,
@ScalarCacheQuerySqlField field35: BigInt,
@ScalarCacheQuerySqlField field36: String,
@ScalarCacheQuerySqlField field37: Int,
@ScalarCacheQuerySqlField field38: Int,
@ScalarCacheQuerySqlField field39: Int,
@ScalarCacheQuerySqlField field40: BigInt) extends Serializable {
}
-----------------------------------------------------------------------------------------------------
and the XML file is:
-----------------------------------------------------------------------------------------------------
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="ignite.cfg"
class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="cacheConfiguration">
<list>
<bean
class="org.apache.ignite.configuration.CacheConfiguration">
<property name="atomicityMode" value="ATOMIC"/>
<property name="backups" value="1"/>
</bean>
</list>
</property>
<property name="discoverySpi">
<bean
class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<bean
class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
<property name="addresses">
<list>
<value>127.0.0.1:47500..47509</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
</bean>
</beans>
-----------------------------------------------------------------------------------------------------
the log:
-----------------------------------------------------------------------------------------------------
Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure:
Lost task 0.3 in stage 0.0 (TID 7, nobida144): class
org.apache.ignite.IgniteCheckedException: Spring XML configuration path is
invalid: /home/test/SparkIgniteStreaming/config/example-cache.xml. Note that
this path should be either absolute or a relative local file system path,
relative to META-INF in classpath or valid URL to IGNITE_HOME.
at
org.apache.ignite.internal.util.IgniteUtils.resolveSpringUrl(IgniteUtils.java:3523)
at
org.apache.ignite.internal.IgnitionEx.loadConfigurations(IgnitionEx.java:643)
at
org.apache.ignite.internal.IgnitionEx.loadConfiguration(IgnitionEx.java:682)
at
org.apache.ignite.spark.IgniteContext$$anonfun$$lessinit$greater$2.apply(IgniteContext.scala:88)
at
org.apache.ignite.spark.IgniteContext$$anonfun$$lessinit$greater$2.apply(IgniteContext.scala:88)
at org.apache.ignite.spark.Once.apply(IgniteContext.scala:184)
at org.apache.ignite.spark.IgniteContext.ignite(IgniteContext.scala:143)
at
org.apache.ignite.spark.IgniteRDD$$anonfun$savePairs$1.apply(IgniteRDD.scala:171)
at
org.apache.ignite.spark.IgniteRDD$$anonfun$savePairs$1.apply(IgniteRDD.scala:170)
at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.MalformedURLException: no protocol:
/home/test/SparkIgniteStreaming/config/example-cache.xml
at java.net.URL.<init>(URL.java:589)
at java.net.URL.<init>(URL.java:486)
at java.net.URL.<init>(URL.java:435)
at
org.apache.ignite.internal.util.IgniteUtils.resolveSpringUrl(IgniteUtils.java:3514)
... 18 more
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:920)
at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
at org.apache.ignite.spark.IgniteRDD.savePairs(IgniteRDD.scala:170)
at main.scala.StreamingJoin$.main(StreamingJoin.scala:242)
at main.scala.StreamingJoin.main(StreamingJoin.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: class org.apache.ignite.IgniteCheckedException: Spring XML
configuration path is invalid:
/home/test/SparkIgniteStreaming/config/example-cache.xml. Note that this
path should be either absolute or a relative local file system path,
relative to META-INF in classpath or valid URL to IGNITE_HOME.
at
org.apache.ignite.internal.util.IgniteUtils.resolveSpringUrl(IgniteUtils.java:3523)
at
org.apache.ignite.internal.IgnitionEx.loadConfigurations(IgnitionEx.java:643)
at
org.apache.ignite.internal.IgnitionEx.loadConfiguration(IgnitionEx.java:682)
at
org.apache.ignite.spark.IgniteContext$$anonfun$$lessinit$greater$2.apply(IgniteContext.scala:88)
at
org.apache.ignite.spark.IgniteContext$$anonfun$$lessinit$greater$2.apply(IgniteContext.scala:88)
at org.apache.ignite.spark.Once.apply(IgniteContext.scala:184)
at org.apache.ignite.spark.IgniteContext.ignite(IgniteContext.scala:143)
at
org.apache.ignite.spark.IgniteRDD$$anonfun$savePairs$1.apply(IgniteRDD.scala:171)
at
org.apache.ignite.spark.IgniteRDD$$anonfun$savePairs$1.apply(IgniteRDD.scala:170)
at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.MalformedURLException: no protocol:
/home/test/SparkIgniteStreaming/config/example-cache.xml
at java.net.URL.<init>(URL.java:589)
at java.net.URL.<init>(URL.java:486)
at java.net.URL.<init>(URL.java:435)
at
org.apache.ignite.internal.util.IgniteUtils.resolveSpringUrl(IgniteUtils.java:3514)
... 18 more
^C[18:09:32] Ignite node stopped OK [uptime=00:00:16:733]
-----------------------------------------------------------------------------------------------------
I renamed the schema fields for unrelated reasons; apart from that, the code is the same as the one I executed.
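From the stack trace, the failure happens in savePairs on the small-table cache, i.e. the IgniteContext created from example-cache.xml. One thing I am considering, assuming the real problem is that the executors cannot resolve that XML file from their local file systems, is to build the IgniteContext from a configuration closure instead of a Spring XML path. This is only a rough sketch I have not run yet; the discovery addresses are copied from the XML above, igniteCfg is just a helper name I made up, and small/ssc are the same as in the code:
-----------------------------------------------------------------------------------------------------
import java.util.Arrays
import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spark.IgniteContext
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder

// Build the same discovery settings as in example-cache.xml, but in code,
// so that nothing has to be loaded from the executors' local file systems.
def igniteCfg(): IgniteConfiguration = {
  val ipFinder = new TcpDiscoveryMulticastIpFinder()
  ipFinder.setAddresses(Arrays.asList("127.0.0.1:47500..47509"))
  val discoSpi = new TcpDiscoverySpi()
  discoSpi.setIpFinder(ipFinder)
  val cfg = new IgniteConfiguration()
  cfg.setDiscoverySpi(discoSpi)
  cfg
}

// Same generic types as before; only the second constructor argument changes
// from a Spring XML path to a closure that is shipped to every node.
// (The streamTableContext built from default-config.xml could be changed the same way.)
val smallTableContext = new IgniteContext[String, small](ssc.sparkContext, () => igniteCfg())
-----------------------------------------------------------------------------------------------------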