Re: Cassandra Connection Issue with Spark-jobserver

2015-04-27 Thread Anand
I was able to fix the issue by providing the right versions of the cassandra-all and Thrift libraries.
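
As a rough sketch of that kind of fix (the thread does not give the exact
versions, so the numbers below are placeholders that must match the Cassandra
release the cluster actually runs), pinning the two libraries in an sbt build
looks something like:

  // build.sbt (illustrative only): real Maven coordinates, placeholder versions
  libraryDependencies ++= Seq(
    "org.apache.cassandra" % "cassandra-all" % "2.0.14",  // placeholder version
    "org.apache.thrift"    % "libthrift"     % "0.9.1"    // placeholder version
  )

Because the NoClassDefFoundError below is thrown at run time, the matching jars
also have to be visible on the job server's runtime classpath, for example by
building the job as an assembly (fat) jar, not only on the compile classpath.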








Re: Cassandra Connection Issue with Spark-jobserver

2015-04-27 Thread Noorul Islam K M

Are you using DSE Spark? If so, are you pointing the Spark job server at the
DSE Spark installation?

Thanks and Regards
Noorul

Anand  writes:

> *I am new to the Spark world and the Job Server.
>
> My code:*
>
> package spark.jobserver
>
> import java.nio.ByteBuffer
>
> import scala.collection.JavaConversions._
> import scala.collection.mutable.ListBuffer
> import scala.collection.immutable.Map
>
> import org.apache.cassandra.hadoop.ConfigHelper
> import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
> import org.apache.cassandra.hadoop.cql3.CqlConfigHelper
> import org.apache.cassandra.hadoop.cql3.CqlOutputFormat
> import org.apache.cassandra.utils.ByteBufferUtil
> import org.apache.hadoop.mapreduce.Job
>
> import com.typesafe.config.{Config, ConfigFactory}
> import org.apache.spark._
> import org.apache.spark.SparkContext._
> import scala.util.Try
>
> object CassandraCQLTest extends SparkJob{
>
>   def main(args: Array[String]) {   
> val sc = new SparkContext("local[4]", "CassandraCQLTest")
>
> sc.addJar("/extra_data/spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.3.0-SNAPSHOT.jar");
> val config = ConfigFactory.parseString("")
> val results = runJob(sc, config)
> println("Result is " + "test")
>   }
>   
>   override def validate(sc: SparkContext, config: Config):
> SparkJobValidation = {
> Try(config.getString("input.string"))
>   .map(x => SparkJobValid)
>   .getOrElse(SparkJobInvalid("No input.string config param"))
>   }
>   
>   override def runJob(sc: SparkContext, config: Config): Any = {
> val cHost: String = "localhost"
> val cPort: String = "9160"
> val KeySpace = "retail"
> val InputColumnFamily = "ordercf"
> val OutputColumnFamily = "salecount"
>
> val job = new Job()
> job.setInputFormatClass(classOf[CqlPagingInputFormat])
> ConfigHelper.setInputInitialAddress(job.getConfiguration(), cHost)
> ConfigHelper.setInputRpcPort(job.getConfiguration(), cPort)
> ConfigHelper.setInputColumnFamily(job.getConfiguration(), KeySpace,
> InputColumnFamily)
> ConfigHelper.setInputPartitioner(job.getConfiguration(),
> "Murmur3Partitioner")
> CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3")
>
> /** CqlConfigHelper.setInputWhereClauses(job.getConfiguration(),
> "user_id='bob'") */
>
> /** An UPDATE writes one or more columns to a record in a Cassandra
> column family */
> val query = "UPDATE " + KeySpace + "." + OutputColumnFamily + " SET
> sale_count = ? "
> CqlConfigHelper.setOutputCql(job.getConfiguration(), query)
>
> job.setOutputFormatClass(classOf[CqlOutputFormat])
> ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KeySpace,
> OutputColumnFamily)
> ConfigHelper.setOutputInitialAddress(job.getConfiguration(), cHost)
> ConfigHelper.setOutputRpcPort(job.getConfiguration(), cPort)
> ConfigHelper.setOutputPartitioner(job.getConfiguration(),
> "Murmur3Partitioner")
>
> val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
>   classOf[CqlPagingInputFormat],
>   classOf[java.util.Map[String,ByteBuffer]],
>   classOf[java.util.Map[String,ByteBuffer]])
>
> 
> val productSaleRDD = casRdd.map {
>   case (key, value) => {
> (ByteBufferUtil.string(value.get("prod_id")),
> ByteBufferUtil.toInt(value.get("quantity")))
>   }
> }
> val aggregatedRDD = productSaleRDD.reduceByKey(_ + _)
> aggregatedRDD.collect().foreach {
>   case (productId, saleCount) => println(productId + ":" + saleCount)
> }
>
> val casoutputCF  = aggregatedRDD.map {
>   case (productId, saleCount) => {
> val outColFamKey = Map("prod_id" -> ByteBufferUtil.bytes(productId))
> val outKey: java.util.Map[String, ByteBuffer] = outColFamKey
> var outColFamVal = new ListBuffer[ByteBuffer]
> outColFamVal += ByteBufferUtil.bytes(saleCount)
> val outVal: java.util.List[ByteBuffer] = outColFamVal
>(outKey, outVal)
>   }
> }
>
> casoutputCF.saveAsNewAPIHadoopFile(
> KeySpace,
> classOf[java.util.Map[String, ByteBuffer]],
> classOf[java.util.List[ByteBuffer]],
> classOf[CqlOutputFormat],
> job.getConfiguration()
>   )
> casRdd.count
>   }
> }
>
> *When I push the jar using spark-jobserver and execute it, I get this on the
> spark-jobserver terminal:*
> job-server[ERROR] Exception in thread "pool-1-thread-1"
> java.lang.NoClassDefFoundError:
> org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat
> job-server[ERROR] at
> spark.jobserver.CassandraCQLTest$.runJob(CassandraCQLTest.scala:46)
> job-server[ERROR] at
> spark.jobserver.CassandraCQLTest$.runJob(CassandraCQLTest.scala:21)
> job-server[ERROR] at
> spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:235)
> job-server[ERROR] at
> scala.concurrent.impl.Future$Promi