So there is really no point in using it :(
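Only the constructor is private, though; the companion object exposes public create() and apply() factory methods. A minimal sketch along these lines should compile (assuming spark-streaming-kafka 1.6.1 on the classpath, and sc and kafkaParams as defined in the code further down the thread):

    import _root_.kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

    // The OffsetRange constructor is private, but the companion object's
    // create() factory is public; topic "newtopic", partition 0 and
    // offsets 110-220 are the values used in the code below.
    val offsetRanges = Array(OffsetRange.create("newtopic", 0, 110, 220))

    // Note that createRDD takes an Array[OffsetRange], not a single OffsetRange.
    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, offsetRanges)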
Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com


On 23 April 2016 at 00:11, Ted Yu <yuzhih...@gmail.com> wrote:

> The class is private:
>
>     final class OffsetRange private(
>
> On Fri, Apr 22, 2016 at 4:08 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> OK, I decided to forgo that approach and use an existing program of mine
>> with a slight modification. The code is this:
>>
>> import org.apache.spark.SparkContext
>> import org.apache.spark.SparkConf
>> import org.apache.spark.sql.Row
>> import org.apache.spark.sql.hive.HiveContext
>> import org.apache.spark.sql.types._
>> import org.apache.spark.sql.SQLContext
>> import org.apache.spark.sql.functions._
>> import _root_.kafka.serializer.StringDecoder
>> import org.apache.spark.streaming._
>> import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}
>> //
>> object CEP_assembly {
>>   def main(args: Array[String]) {
>>     val conf = new SparkConf().
>>       setAppName("CEP_assembly").
>>       setMaster("local[2]").
>>       set("spark.driver.allowMultipleContexts", "true").
>>       set("spark.hadoop.validateOutputSpecs", "false")
>>     val sc = new SparkContext(conf)
>>     // Create sqlContext based on HiveContext
>>     val sqlContext = new HiveContext(sc)
>>     import sqlContext.implicits._
>>     val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>     println("\nStarted at")
>>     sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
>>     val ssc = new StreamingContext(conf, Seconds(1))
>>     ssc.checkpoint("checkpoint")
>>     val kafkaParams = Map[String, String](
>>       "bootstrap.servers" -> "rhes564:9092",
>>       "schema.registry.url" -> "http://rhes564:8081",
>>       "zookeeper.connect" -> "rhes564:2181",
>>       "group.id" -> "StreamTest")
>>     val topics = Set("newtopic")
>>     val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
>>     dstream.cache()
>>     val lines = dstream.map(_._2)
>>     val showResults = lines.filter(_.contains("statement cache")).
>>       flatMap(line => line.split("\n,")).
>>       map(word => (word, 1)).
>>       reduceByKey(_ + _)
>>     // Define the offset ranges to read in the batch job
>>     val offsetRanges = new OffsetRange("newtopic", 0, 110, 220)
>>     // Create the RDD based on the offset ranges
>>     val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](sc, kafkaParams, offsetRanges)
>>     ssc.start()
>>     ssc.awaitTermination()
>>     //ssc.stop()
>>     println("\nFinished at")
>>     sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
>>   }
>> }
>>
>> With this in sbt:
>>
>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" % "provided"
>> libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.1" % "provided"
>> libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.5.1" % "provided"
>> libraryDependencies += "junit" % "junit" % "4.12"
>> libraryDependencies += "org.scala-sbt" % "test-interface" % "1.0"
>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.1" % "provided"
>> libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.1"
>> libraryDependencies += "org.scalactic" %% "scalactic" % "2.2.6"
>> libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6"
>> libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.5.1"
>> libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.5.1" % "test"
>> libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.1"
+= "org.apache.spark" % "spark-core_2.10" % "1.5.1" >> libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.5.1" % >> "test" >> libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % >> "1.6.1" >> >> >> However, I an getting the following error >> >> [info] Loading project definition from >> /data6/hduser/scala/CEP_assembly/project >> [info] Set current project to CEP_assembly (in build >> file:/data6/hduser/scala/CEP_assembly/) >> [info] Compiling 1 Scala source to >> /data6/hduser/scala/CEP_assembly/target/scala-2.10/classes... >> [error] >> /data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:37: >> constructor OffsetRange in class OffsetRange cannot be accessed in object >> CEP_assembly >> [error] val offsetRanges = new OffsetRange("newtopic", 0, 110, 220) >> >> >> Dr Mich Talebzadeh >> >> >> >> LinkedIn * >> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >> >> >> >> http://talebzadehmich.wordpress.com >> >> >> >> On 22 April 2016 at 18:41, Marcelo Vanzin <van...@cloudera.com> wrote: >> >>> On Fri, Apr 22, 2016 at 10:38 AM, Mich Talebzadeh >>> <mich.talebza...@gmail.com> wrote: >>> > I am trying to test Spark with CEP and I have been shown a sample here >>> > >>> https://github.com/agsachin/spark/blob/CEP/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala#L532 >>> >>> I'm not familiar with CEP, but that's a Spark unit test, so if you're >>> trying to run it outside of the context of Spark unit tests (as it >>> seems you're trying to do), you're going to run into a world of >>> trouble. I'd suggest a different approach where whatever you're trying >>> to do is done through the Spark build, not outside of it. >>> >>> -- >>> Marcelo >>> >> >> >