Re: How to load a big csv to dataframe in Spark 1.6
Hi Raymond,

Is your problem passing those 100 fields to the .toDF() method?

Sent from my Samsung device

Original message
From: Raymond Xie
Date: 31/12/2016 10:46 (GMT+08:00)
To: user@spark.apache.org
Subject: How to load a big csv to dataframe in Spark 1.6

Hello,

I see there is usually this way to load a csv to a dataframe:

    sqlContext = SQLContext(sc)
    Employee_rdd = sc.textFile("\..\Employee.csv").map(lambda line: line.split(","))
    Employee_df = Employee_rdd.toDF(['Employee_ID','Employee_name'])
    Employee_df.show()

However, in my case the csv has 100+ fields, which means toDF() will be very lengthy. Can anyone tell me a practical method to load the data?

Thank you very much.
Raymond
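For Spark 1.x, the usual way around hand-writing a 100-column toDF() is the external spark-csv package, which can read the header and infer (or accept) a schema. Below is a minimal sketch in Scala of that approach, assuming the spark-csv package is on the classpath, the file has a header row, and sc is an existing SparkContext; the thread itself is PySpark, where the same format/option/load calls exist on sqlContext.read.

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)

    // Let spark-csv take the column names from the header row and
    // infer the 100+ column types, instead of listing them in toDF().
    val employeeDf = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")        // first line holds the column names
      .option("inferSchema", "true")   // extra pass over the data to guess types
      .load("Employee.csv")

    employeeDf.printSchema()
    employeeDf.show()

If the types are known up front, passing an explicit StructType via .schema(...) avoids the inference pass.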
Re: Running spark from Eclipse and then Jar
Don't you need to provide your class name "JavaWordCount"? Thanks, Vasu. > On Dec 7, 2016, at 3:18 PM, im281 wrote: > > Hello, > I have a simple word count example in Java and I can run this in Eclipse > (code at the bottom) > > I then create a jar file from it and try to run it from the cmd > > > java -jar C:\Users\Owner\Desktop\wordcount.jar Data/testfile.txt > > But I get this error? > > I think the main error is: > *Exception in thread "main" java.lang.ClassNotFoundException: Failed to find > data source: text* > > Any advise on how to run this jar file in spark would be appreciated > > > Using Spark's default log4j profile: > org/apache/spark/log4j-defaults.properties > 16/12/07 15:16:41 INFO SparkContext: Running Spark version 2.0.2 > 16/12/07 15:16:42 INFO SecurityManager: Changing view acls to: Owner > 16/12/07 15:16:42 INFO SecurityManager: Changing modify acls to: Owner > 16/12/07 15:16:42 INFO SecurityManager: Changing view acls groups to: > 16/12/07 15:16:42 INFO SecurityManager: Changing modify acls groups to: > 16/12/07 15:16:42 INFO SecurityManager: SecurityManager: authentication > disabled; ui acls disabled; users with view permissions: Set(Owner); groups > with view permissions: Set(); users with modify permissions: Set(Owner); > groups with modify permissions: Set() > 16/12/07 15:16:44 INFO Utils: Successfully started service 'sparkDriver' on > port 10211. > 16/12/07 15:16:44 INFO SparkEnv: Registering MapOutputTracker > 16/12/07 15:16:44 INFO SparkEnv: Registering BlockManagerMaster > 16/12/07 15:16:44 INFO DiskBlockManager: Created local directory at > C:\Users\Owner\AppData\Local\Temp\blockmgr-b4b1960b-08fc-44fd-a75e-1a0450556873 > 16/12/07 15:16:44 INFO MemoryStore: MemoryStore started with capacity 1984.5 > MB > 16/12/07 15:16:45 INFO SparkEnv: Registering OutputCommitCoordinator > 16/12/07 15:16:45 INFO Utils: Successfully started service 'SparkUI' on port > 4040. > 16/12/07 15:16:45 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at > http://192.168.19.2:4040 > 16/12/07 15:16:45 INFO Executor: Starting executor ID driver on host > localhost > 16/12/07 15:16:45 INFO Utils: Successfully started service > 'org.apache.spark.network.netty.NettyBlockTransferService' on port 10252. > 16/12/07 15:16:45 INFO NettyBlockTransferService: Server created on > 192.168.19.2:10252 > 16/12/07 15:16:45 INFO BlockManagerMaster: Registering BlockManager > BlockManagerId(driver, 192.168.19.2, 10252) > 16/12/07 15:16:45 INFO BlockManagerMasterEndpoint: Registering block manager > 192.168.19.2:10252 with 1984.5 MB RAM, BlockManagerId(driver, 192.168.19.2, > 10252) > 16/12/07 15:16:45 INFO BlockManagerMaster: Registered BlockManager > BlockManagerId(driver, 192.168.19.2, 10252) > 16/12/07 15:16:46 WARN SparkContext: Use an existing SparkContext, some > configuration may not take effect. > 16/12/07 15:16:46 INFO SharedState: Warehouse path is > 'file:/C:/Users/Owner/spark-warehouse'. > Exception in thread "main" java.lang.ClassNotFoundException: Failed to find > data source: text. 
Please find packages at > https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects >at > org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148) >at > org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:79) >at > org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:79) >at > org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:340) >at > org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149) >at > org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:504) >at > org.apache.spark.sql.DataFrameReader.textFile(DataFrameReader.scala:540) >at > org.apache.spark.sql.DataFrameReader.textFile(DataFrameReader.scala:513) >at JavaWordCount.main(JavaWordCount.java:57) > Caused by: java.lang.ClassNotFoundException: text.DefaultSource >at java.net.URLClassLoader.findClass(Unknown Source) >at java.lang.ClassLoader.loadClass(Unknown Source) >at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source) >at java.lang.ClassLoader.loadClass(Unknown Source) >at > org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132) >at > org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132) >at scala.util.Try$.apply(Try.scala:192) >at > org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132) >at > org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132) >at scala.util.Try.orElse(Try.scala:84) >at > org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:132) >... 8 more > 16/12/07 15:16
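Beyond supplying the class name, the log shows the jar being launched with plain "java -jar", which bypasses Spark's launcher and the classpath it would normally assemble; one hedged reading of the "Failed to find data source: text" error is that Spark SQL's data-source registrations are simply not visible on that classpath. A sketch of the usual invocation instead, with the spark-submit path and master purely illustrative and the class name taken from the stack trace above:

    %SPARK_HOME%\bin\spark-submit --class JavaWordCount --master local[*] C:\Users\Owner\Desktop\wordcount.jar Data/testfile.txt

If the jar is built as a fat jar with maven-shade or an assembly plugin, the META-INF/services entries that register Spark's data sources also need to be merged rather than overwritten, otherwise the same "Failed to find data source" error can appear even under spark-submit.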
Re: Third party library
Maybe you've already checked these out. Some basic questions that come to my mind are: 1) is this library "foolib" or "foo-C-library" available on the worker node? 2) if yes, is it accessible by the user/program (rwx)? Thanks, Vasu. > On Nov 26, 2016, at 5:08 PM, kant kodali wrote: > > If it is working for standalone program I would think you can apply the same > settings across all the spark worker and client machines and give that a > try. Lets start with that. > >> On Sat, Nov 26, 2016 at 11:59 AM, vineet chadha >> wrote: >> Just subscribed to Spark User. So, forwarding message again. >> >>> On Sat, Nov 26, 2016 at 11:50 AM, vineet chadha >>> wrote: >>> Thanks Kant. Can you give me a sample program which allows me to call jni >>> from executor task ? I have jni working in standalone program in >>> scala/java. >>> >>> Regards, >>> Vineet >>> On Sat, Nov 26, 2016 at 11:43 AM, kant kodali wrote: Yes this is a Java JNI question. Nothing to do with Spark really. java.lang.UnsatisfiedLinkError typically would mean the way you setup LD_LIBRARY_PATH is wrong unless you tell us that it is working for other cases but not this one. > On Sat, Nov 26, 2016 at 11:23 AM, Reynold Xin wrote: > That's just standard JNI and has nothing to do with Spark, does it? > > >> On Sat, Nov 26, 2016 at 11:19 AM, vineet chadha >> wrote: >> Thanks Reynold for quick reply. >> >> I have tried following: >> >> class MySimpleApp { >> // ---Native methods >> @native def fooMethod (foo: String): String >> } >> >> object MySimpleApp { >> val flag = false >> def loadResources() { >> System.loadLibrary("foo-C-library") >> val flag = true >> } >> def main() { >> sc.parallelize(1 to 10).mapPartitions ( iter => { >> if(flag == false){ >> MySimpleApp.loadResources() >>val SimpleInstance = new MySimpleApp >> } >> SimpleInstance.fooMethod ("fooString") >> iter >> }) >> } >> } >> >> I don't see way to invoke fooMethod which is implemented in >> foo-C-library. Is I am missing something ? If possible, can you point me >> to existing implementation which i can refer to. >> >> Thanks again. >> ~ >> >> >>> On Fri, Nov 25, 2016 at 3:32 PM, Reynold Xin >>> wrote: >>> bcc dev@ and add user@ >>> >>> >>> This is more a user@ list question rather than a dev@ list question. >>> You can do something like this: >>> >>> object MySimpleApp { >>> def loadResources(): Unit = // define some idempotent way to load >>> resources, e.g. with a flag or lazy val >>> >>> def main() = { >>> ... >>> >>> sc.parallelize(1 to 10).mapPartitions { iter => >>> MySimpleApp.loadResources() >>> >>> // do whatever you want with the iterator >>> } >>> } >>> } >>> >>> >>> >>> >>> On Fri, Nov 25, 2016 at 2:33 PM, vineet chadha wrote: Hi, I am trying to invoke C library from the Spark Stack using JNI interface (here is sample application code) class SimpleApp { // ---Native methods @native def foo (Top: String): String } object SimpleApp { def main(args: Array[String]) { val conf = new SparkConf().setAppName("SimpleApplication").set("SPARK_LIBRARY_PATH", "lib") val sc = new SparkContext(conf) System.loadLibrary("foolib") //instantiate the class val SimpleAppInstance = new SimpleApp //String passing - Working val ret = SimpleAppInstance.foo("fooString") } Above code work fines. I have setup LD_LIBRARY_PATH and spark.executor.extraClassPath, spark.executor.extraLibraryPath at worker node How can i invoke JNI library from worker node ? Where should i load it in executor ? 
Calling System.loadLibrary("foolib") inside the worker node gives me the following error: Exception in thread "main" java.lang.UnsatisfiedLinkError: Any help would be really appreciated.
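Putting Reynold's suggestion together with the snippet above: the library has to be loaded once per executor JVM, from inside the task itself, and the guard has to live in the companion object (the local "val flag = true" in the thread's code shadows the outer flag and never changes it). A minimal sketch along those lines, assuming foo-C-library is installed on every worker and reachable through java.library.path / LD_LIBRARY_PATH there, and that sc is an existing SparkContext:

    object MySimpleApp {
      @volatile private var loaded = false

      // Idempotent loader: loads the native library at most once per executor JVM.
      def loadResources(): Unit = synchronized {
        if (!loaded) {
          System.loadLibrary("foo-C-library")
          loaded = true
        }
      }
    }

    class MySimpleApp {
      @native def fooMethod(foo: String): String   // implemented in foo-C-library
    }

    // Driver side: the closure below runs on the executors, not the driver.
    val results = sc.parallelize(1 to 10).mapPartitions { iter =>
      MySimpleApp.loadResources()
      val instance = new MySimpleApp
      iter.map(i => instance.fooMethod("fooString-" + i))
    }
    results.collect().foreach(println)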
Re: aggregateByKey on PairRDD
Hi,

We can use combineByKey to achieve this:

    val finalRDD = tempRDD.combineByKey(
      (x: (Any, Any)) => (x),
      (acc: (Any, Any), x) => (acc, x),
      (acc1: (Any, Any), acc2: (Any, Any)) => (acc1, acc2))

    finalRDD.collect.foreach(println)

    (amazon,((book1, tech),(book2,tech)))
    (barns&noble, (book,tech))
    (eBay, (book1,tech))

Thanks,
Sivakumar

Original message
From: Daniel Haviv
Date: 30/03/2016 18:58 (GMT+08:00)
To: Akhil Das
Cc: Suniti Singh, user@spark.apache.org, dev
Subject: Re: aggregateByKey on PairRDD

Hi,
Shouldn't groupByKey be avoided (https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html)?

Thank you,
Daniel

On Wed, Mar 30, 2016 at 9:01 AM, Akhil Das wrote:
Isn't it what tempRDD.groupByKey does?

Thanks
Best Regards

On Wed, Mar 30, 2016 at 7:36 AM, Suniti Singh wrote:
Hi All,

I have an RDD having the data in the following form:

    tempRDD: RDD[(String, (String, String))]
    (brand , (product, key))
    ("amazon",("book1","tech"))
    ("eBay",("book1","tech"))
    ("barns&noble",("book","tech"))
    ("amazon",("book2","tech"))

I would like to group the data by brand and would like to get the result set in the following format:

    resultSetRDD : RDD[(String, List[(String), (String)])]

I tried using the aggregateByKey but kind of not getting how to achieve this. OR is there any other way to achieve this?

    val resultSetRDD = tempRDD.aggregateByKey("")({case (aggr , value) => aggr + String.valueOf(value) + ","}, (aggr1, aggr2) => aggr1 + aggr2)

    resultSetRDD = (amazon,("book1","tech"),("book2","tech"))

Thanks,
Suniti
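If the target really is RDD[(String, List[(String, String)])], aggregateByKey with a List accumulator gets there without the nested-tuple types that the combineByKey version above produces; a small sketch against the thread's tempRDD:

    // tempRDD: RDD[(String, (String, String))], e.g. ("amazon", ("book1", "tech"))
    val resultSetRDD = tempRDD.aggregateByKey(List.empty[(String, String)])(
      (acc, value) => value :: acc,        // add one (product, key) pair within a partition
      (left, right) => left ::: right)     // merge the per-partition lists

    resultSetRDD.collect().foreach(println)
    // e.g. (amazon,List((book2,tech), (book1,tech)))

As Daniel notes, this (like reduceByKey and combineByKey) combines map-side, whereas groupByKey shuffles every value.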
Re: java.lang.ClassNotFoundException: org.apache.spark.streaming.twitter.TwitterReceiver
Try to reproduce what the spark-submit shell script does, setting up the class path etc. Sent from my rotary phone. > On Nov 9, 2015, at 7:07 AM, Tathagata Das wrote: > > You cannot submit from eclipse to a cluster that easily. You can run locally > (master set to local...), and it should work with just the pom. > >> On Mon, Nov 9, 2015 at 2:49 AM, أنس الليثي wrote: >> If I packaged the application and submit it, it works fine but I need to run >> it from eclipse. >> >> Is there any problem running the application from eclipse ? >> >> >> >>> On 9 November 2015 at 12:27, Tathagata Das wrote: >>> How are you submitting the spark application? >>> You are supposed to submit the fat-jar of the application that include the >>> spark-streaming-twitter dependency (and its subdeps) but not >>> spark-streaming and spark-core. >>> On Mon, Nov 9, 2015 at 1:02 AM, أنس الليثي wrote: I tried to remove maven and adding the dependencies manually using build path > configure build path > add external jars, then adding the jars manually but it did not work. I tried to create another project and copied the code from the first app but the problem still the same. I event tried to change eclipse with another version, but the same problem exist. :( :( :( :( > On 9 November 2015 at 10:47, أنس الليثي wrote: > I tried both, the same exception still thrown > >> On 9 November 2015 at 10:45, Sean Owen wrote: >> You included a very old version of the Twitter jar - 1.0.0. Did you mean >> 1.5.1? >> >> On Mon, Nov 9, 2015 at 7:36 AM, fanooos wrote: >> > This is my first Spark Stream application. The setup is as following >> > >> > 3 nodes running a spark cluster. One master node and two slaves. >> > >> > The application is a simple java application streaming from twitter and >> > dependencies managed by maven. 
>> > >> > Here is the code of the application >> > >> > public class SimpleApp { >> > >> > public static void main(String[] args) { >> > >> > SparkConf conf = new SparkConf().setAppName("Simple >> > Application").setMaster("spark://rethink-node01:7077"); >> > >> > JavaStreamingContext sc = new JavaStreamingContext(conf, new >> > Duration(1000)); >> > >> > ConfigurationBuilder cb = new ConfigurationBuilder(); >> > >> > cb.setDebugEnabled(true).setOAuthConsumerKey("ConsumerKey") >> > .setOAuthConsumerSecret("ConsumerSecret") >> > .setOAuthAccessToken("AccessToken") >> > .setOAuthAccessTokenSecret("TokenSecret"); >> > >> > OAuthAuthorization auth = new OAuthAuthorization(cb.build()); >> > >> > JavaDStream tweets = TwitterUtils.createStream(sc, >> > auth); >> > >> > JavaDStream statuses = tweets.map(new Function> > String>() { >> > public String call(Status status) throws Exception { >> > return status.getText(); >> > } >> > }); >> > >> > statuses.print();; >> > >> > sc.start(); >> > >> > sc.awaitTermination(); >> > >> > } >> > >> > } >> > >> > >> > here is the pom file >> > >> > http://maven.apache.org/POM/4.0.0"; >> > xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; >> > xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 >> > http://maven.apache.org/xsd/maven-4.0.0.xsd";> >> > 4.0.0 >> > SparkFirstTry >> > SparkFirstTry >> > 0.0.1-SNAPSHOT >> > >> > >> > >> > org.apache.spark >> > spark-core_2.10 >> > 1.5.1 >> > provided >> > >> > >> > >> > org.apache.spark >> > spark-streaming_2.10 >> > 1.5.1 >> > provided >> > >> > >> > >> > org.twitter4j >> > twitter4j-stream >> > 3.0.3 >> > >> > >> > org.apache.spark >> > spark-streaming-twitter_2.10 >> > 1.0.0 >> > >> > >> > >> > >> > >> > src >> > >> > >> > maven-compiler-plugin >> > 3.3 >> > >> > 1.8 >> > 1.8 >> > >> > >> > >> > maven-assembly-plugin >> > >> > >> > >> > >> > com.test.sparkTest.SimpleApp >> >
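Following Sean's pointer, the version mismatch in the pom is the likely culprit: spark-streaming-twitter_2.10 is pinned at the very old 1.0.0 while the rest of the build is on 1.5.1. A sketch of the aligned dependency, with the version taken from the other Spark artifacts in the thread's pom:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-twitter_2.10</artifactId>
        <version>1.5.1</version>
    </dependency>

As Tathagata notes above, this artifact (and its twitter4j dependencies) still has to end up in the fat jar that gets submitted, since it is not part of the Spark distribution on the cluster.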
Re: Does feature parity exist between Scala and Python on Spark
While I have a preference for Scala ( not surprising as a Typesafe person), the DataFrame API gives feature and performance parity for Python. The RDD API gives feature parity. So, use what makes you most successful for other reasons ;) Sent from my rotary phone. > On Oct 6, 2015, at 4:14 PM, dant wrote: > > Hi, > I'm hearing a common theme running that I should only do serious programming > in Scala on Spark (1.5.1). Real power users use Scala. It is said that > Python is great for analytics but in the end the code should be written to > Scala to finalise. There are a number of reasons I'm hearing: > > 1. Spark is written in Scala so will always be faster than any other > language implementation on top of it. > 2. Spark releases always favour more features being visible and enabled for > Scala API than Python API. > > Are there any truth's to the above? I'm a little sceptical. > > Thanks > Dan > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Does-feature-parity-exist-between-Scala-and-Python-on-Spark-tp24961.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to list all dataframes and RDDs available in current session?
Okay but "how?" thats what I am trying to figure out 😀? Any command you would suggest? "Sent from my iPhone, plaese excuse any typos :)" > On Aug 21, 2015, at 11:45 PM, Raghavendra Pandey > wrote: > > You get the list of all the persistet rdd using spark context... >> On Aug 21, 2015 12:06 AM, "Rishitesh Mishra" >> wrote: >> I am not sure if you can view all RDDs in a session. Tables are maintained >> in a catalogue . Hence its easier. However you can see the DAG >> representation , which lists all the RDDs in a job , with Spark UI. >> >>> On 20 Aug 2015 22:34, "Dhaval Patel" wrote: >>> Apologies >>> >>> I accidentally included Spark User DL on BCC. The actual email message is >>> below. >>> = >>> >>> >>> Hi: >>> >>> I have been working on few example using zeppelin. >>> >>> I have been trying to find a command that would list all *dataframes/RDDs* >>> that has been created in current session. Anyone knows if there is any such >>> commands available? >>> >>> Something similar to SparkSQL to list all temp tables : >>> show tables; >>> >>> Thanks, >>> Dhaval >>> >>> >>> On Thu, Aug 20, 2015 at 12:49 PM, Dhaval Patel wrote: Hi: I have been working on few example using zeppelin. I have been trying to find a command that would list all *dataframes/RDDs* that has been created in current session. Anyone knows if there is any such commands available? Something similar to SparkSQL to list all temp tables : show tables; Thanks, Dhaval
Re: correct Scala Imports for creating DFs from RDDs?
You are mixing the 1.0.0 Spark SQL jar with Spark 1.4.0 jars in your build file Sent from my rotary phone. > On Jul 14, 2015, at 7:57 AM, ashwang168 wrote: > > Hello! > > I am currently using Spark 1.4.0, scala 2.10.4, and sbt 0.13.8 to try and > create a jar file from a scala file (attached above) and run it using > spark-submit. I am also using Hive, Hadoop 2.6.0-cdh5.4.0 which has the > files that I'm trying to read in. > > Currently I am very confused about how the imports work and if there . I am > getting the error: > > > [error] bad symbolic reference. A signature in SQLContext.class refers to > term package > [error] in package org.apache.spark.sql which is not available. > [error] It may be completely missing from the current classpath, or the > version on > [error] the classpath might be incompatible with the version used when > compiling SQLContext.class. > [error] bad symbolic reference. A signature in SQLContext.class refers to > type Logging > [error] in value org.apache.spark.sql.package which is not available. > [error] It may be completely missing from the current classpath, or the > version on > [error] the classpath might be incompatible with the version used when > compiling SQLContext.class. > [error] bad symbolic reference. A signature in SchemaRDD.class refers to > term package > [error] in package org.apache.spark.sql which is not available. > [error] It may be completely missing from the current classpath, or the > version on > [error] the classpath might be incompatible with the version used when > compiling SchemaRDD.class. > [error] /root/awang/time/rddSpark/create/src/main/scala/create.scala:20: > value implicits is not a member of org.apache.spark.sql.SQLContext > ... > > [error] /root/awang/time/rddSpark/create/src/main/scala/create.scala:39: > value toDF is not a member of org.apache.spark.rdd.RDD[TSTData] > > > The imports in my code are: > import org.apache.spark._ > import org.apache.spark.SparkContext > import org.apache.spark.SparkContext._ > import org.apache.spark.SparkConf > import org.apache.spark.sql > > import org.apache.spark.sql._ > > and in the object Create : > val sqlContext = new org.apache.spark.sql.SQLContext(sc) >import sqlContext._ >import sqlContext.implicits._ >import sqlContext.createSchemaRDD > > > My libraryDependencies are: > libraryDependencies ++= Seq( >// spark will already be on classpath when using spark-submit. >// marked as provided, so that it isn't included in assembly. >"org.apache.spark" %% "spark-catalyst" % "1.4.0" % "provided", > >"org.apache.spark" %% "spark-sql" % "1.0.0") > > so why is "package org.apache.spark.sql not available"? > > Also, what are the correct imports to get this working? > > I'm using sbt assembly to try to compile these files, and would really > appreciate any help. > > Thanks, > Ashley Wang > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/correct-Scala-Imports-for-creating-DFs-from-RDDs-tp23829.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
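Concretely, the build file above mixes spark-sql 1.0.0 (which still used SchemaRDD and has no sqlContext.implicits or toDF) with spark-catalyst 1.4.0, which is where the "bad symbolic reference" errors come from. A sketch of an aligned build.sbt fragment, assuming everything should sit on Spark 1.4.0:

    libraryDependencies ++= Seq(
      // spark-core and spark-sql must share one Spark version;
      // "provided" keeps them out of the assembly jar used with spark-submit.
      "org.apache.spark" %% "spark-core" % "1.4.0" % "provided",
      "org.apache.spark" %% "spark-sql"  % "1.4.0" % "provided"
    )

With 1.4.0, import sqlContext.implicits._ supplies toDF on RDDs of case classes, and the import sqlContext.createSchemaRDD line should be dropped, since SchemaRDD was replaced by DataFrame in 1.3.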
Re: RDD staleness
There is no mechanism for keeping an RDD up to date with a changing source. However, you could set up a stream that watches for changes to the directory and processes the new files, or use the Hive integration in Spark SQL to run Hive queries directly. (However, old query results will still grow stale.)

Sent from my rotary phone.

> On May 31, 2015, at 7:11 AM, Ashish Mukherjee wrote:
>
> Hello,
>
> Since RDDs are created from data from Hive tables or HDFS, how do we ensure they are invalidated when the source data is updated?
>
> Regards,
> Ashish

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
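For the "stream that watches the directory" option, the usual entry point is StreamingContext.textFileStream, which picks up files newly written into a directory on each batch; a minimal Scala sketch, with the path and batch interval purely illustrative and sc assumed to be an existing SparkContext:

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val ssc = new StreamingContext(sc, Seconds(60))

    // Each batch contains only the files that appeared in the directory since the last batch.
    val updates = ssc.textFileStream("hdfs:///data/incoming")
    updates.foreachRDD { rdd =>
      // Re-run whatever derived computation needs to stay fresh against the new data.
      println(s"new records this batch: ${rdd.count()}")
    }

    ssc.start()
    ssc.awaitTermination()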
Re: Spark streaming
Show us the code. This shouldn't happen for the simple process you described.

Sent from my rotary phone.

> On Mar 27, 2015, at 5:47 AM, jamborta wrote:
>
> Hi all,
>
> We have a workflow that pulls in data from csv files. Originally the setup of the workflow was to parse the data as it comes in (turn it into an array), then store it. This resulted in out of memory errors with larger files (as a result of increased GC?).
>
> It turns out that if the data gets stored as a string first, then parsed, the issue does not occur.
>
> Why is that?
>
> Thanks,
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-tp22255.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark streaming alerting
Akhil, you are right in your answer to what Mohit wrote. However, what Mohit seems to be alluding to, but did not state clearly, might be different.

Mohit, you are wrong in saying that streaming "generally" works with HDFS and Cassandra. Streaming typically works with a streaming or queuing source like Kafka, Kinesis, Twitter, Flume, ZeroMQ, etc. (but it can also read from HDFS and S3). However, the streaming context (the "receiver" within the streaming context) gets events/messages/records and forms a time-window-based batch (RDD), so there is a maximum gap of one window interval between when an alert message becomes available to Spark and when the processing happens. I think this is what you meant.

As per the Spark programming model, the RDD is the right way to deal with the data. If you are fine with a minimum delay of, say, a second (based on the minimum window that DStreams support), then what Rohit gave is the right model.

Khanderao

> On Mar 22, 2015, at 11:39 PM, Akhil Das wrote:
>
> What do you mean you can't send it directly from spark workers? Here's a simple approach which you could do:
>
> val data = ssc.textFileStream("sigmoid/")
> val dist = data.filter(_.contains("ERROR")).foreachRDD(rdd => alert("Errors :" + rdd.count()))
>
> And the alert() function could be anything triggering an email or sending an SMS alert.
>
> Thanks
> Best Regards
>
>> On Sun, Mar 22, 2015 at 1:52 AM, Mohit Anchlia wrote:
>> Is there a module in spark streaming that lets you listen to the alerts/conditions as they happen in the streaming module? Generally spark streaming components will execute on a large set of clusters like hdfs or Cassandra, however when it comes to alerting you generally can't send it directly from the spark workers, which means you need a way to listen to the alerts.
>
Re: Spark + Kafka
I have used various versions of Spark (1.0, 1.2.1) without any issues. Though I have not used Kafka significantly with 1.3.0, preliminary testing revealed no issues.

- khanderao

> On Mar 18, 2015, at 2:38 AM, James King wrote:
>
> Hi All,
>
> Which build of Spark is best when using Kafka?
>
> Regards
> jk

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
When using SparkFiles.get("GeoIP.dat"), got exception in thread "main" java.io.FileNotFoundException
Hi there,

Spark version: 1.2

    /home/hadoop/spark/bin/spark-submit --class com.litb.bi.CSLog2ES --master yarn --executor-memory 1G --jars /mnt/external/kafka/target/spark-streaming-kafka_2.10-1.2.0.jar,/mnt/external/kafka/target/zkclient-0.3.jar,/mnt/external/kafka/target/metrics-core-2.2.0.jar,/mnt/external/kafka/target/kafka_2.10-0.8.0.jar,elasticsearch-hadoop-2.1.0.Beta3.jar,geoip-api-1.2.13.jar --files /mnt/GeoIP.dat BILog-1.1-SNAPSHOT.jar 54.175.174.144 test test_ctrlitb 2

In my code, I want to use GeoIP.dat to parse the IP of the clickstream log:

    val Array(zkQuorum, group, topics, numThreads) = args
    val conf = new SparkConf().setAppName("Kafka CTRLog to ES")
    conf.set("spark.streaming.receiver.writeAheadLogs.enable", "true")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "10.5.2.250")
    // conf.set("spark.serializer", classOf[KryoSerializer].getName)
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(1))
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
    // geoip file on executor
    val geofile_path = SparkFiles.get("GeoIP.dat")
    val cl = new LookupService(geofile_path, LookupService.GEOIP_MEMORY_CACHE | LookupService.GEOIP_CHECK_CACHE)

I got the following exception:

    2015-02-08 06:51:17,064 INFO [main] handler.ContextHandler (ContextHandler.java:startContext(737)) - started o.e.j.s.ServletContextHandler{/streaming,null}
    2015-02-08 06:51:17,065 INFO [main] handler.ContextHandler (ContextHandler.java:startContext(737)) - started o.e.j.s.ServletContextHandler{/streaming/json,null}
    Exception in thread "main" java.io.FileNotFoundException: /tmp/spark-d85f0f21-2e66-4ed7-ae31-47564c8dfefd/GeoIP.dat (No such file or directory)
    at java.io.RandomAccessFile.open(Native Method)
    at java.io.RandomAccessFile.<init>(RandomAccessFile.java:241)
    at com.maxmind.geoip.LookupService.<init>(LookupService.java:282)
    at com.maxmind.geoip.LookupService.<init>(LookupService.java:264)
    at com.litb.bi.CSLog2ES$.main(CSLog2ES.scala:51)
    at com.litb.bi.CSLog2ES.main(CSLog2ES.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

--
Shen Zhun
Data Mining at LightnInTheBox.com
Email: shenzhunal...@gmail.com
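One reading of that trace (a guess, since it depends on deploy mode): with --master yarn in client mode, the main() above runs on the driver, where a file shipped with --files has not been localised, so SparkFiles.get resolves to a driver temp directory that never received GeoIP.dat. Resolving the path inside the tasks, where --files does distribute the file, avoids that; a sketch below, in which extractIp is a hypothetical helper for pulling the client IP out of a log line:

    import org.apache.spark.SparkFiles
    import com.maxmind.geoip.LookupService

    // Hypothetical helper: assumes the IP is the first whitespace-separated field.
    def extractIp(line: String): String = line.split(" ").head

    val enriched = kafkaStream.mapPartitions { lines =>
      // Resolved on the executor, where --files (or SparkContext.addFile) placed GeoIP.dat.
      val geoPath = SparkFiles.get("GeoIP.dat")
      val lookup = new LookupService(geoPath, LookupService.GEOIP_MEMORY_CACHE)
      lines.map { line =>
        val ip = extractIp(line)
        (line, lookup.getCountry(ip).getCode)   // attach a country code to each log line
      }
    }

On the driver side, the local copy at /mnt/GeoIP.dat could be opened directly instead of going through SparkFiles.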
Re: Confusing behavior of newAPIHadoopFile
Yes, I could implement it like:

    sc.textFile("path").reduce(_ + _).split("!!").filter(x => x.trim.length > 0)

But the reduce operation is expensive! I tested these two methods on a 6 GB file; the only operation on the created RDD is take(10).foreach(println). The method using newAPIHadoopFile takes only 2 s, while the code above blocks for more than 1 min, because of the reduce, I think.

Would you post a code snippet to illustrate your idea? I didn't come up with an easy sequence of map/filter operations on the RDD returned by textFile. Thanks!

常铖 cheng chang
Computer Science Dept. Tsinghua Univ.
Mobile Phone: 13681572414
WeChat ID: cccjcl

On 28 July 2014 at 17:40:21, chang cheng (myai...@gmail.com) wrote:

the value in (key, value) returned by textFile is exactly one line of the input. But what I want is the field between the two "!!", hope this makes sense.

-----
Senior in Tsinghua Univ.
github: http://www.github.com/uronce-cc

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Confusing-behavior-of-newAPIHadoopFile-tp10764p10768.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
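On the map/filter question: staying with newAPIHadoopFile (so there is no reduce over the whole file), dropping the key and filtering out the empty leading record gives a cheap equivalent of the one-liner above. A sketch that simply combines the two snippets already in this thread, with path as defined there:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    val hadoopConf = new Configuration()
    hadoopConf.set("textinputformat.record.delimiter", "!!\n")

    val records = sc.newAPIHadoopFile(path, classOf[TextInputFormat],
        classOf[LongWritable], classOf[Text], hadoopConf)
      .map(pair => pair._2.toString)     // keep only the record text, as in the thread
      .filter(_.trim.nonEmpty)           // drop the empty record before the first "!!"

    records.take(3).foreach(println)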
Re: Confusing behavior of newAPIHadoopFile
the value in (key, value) returned by textFile is exactly one line of the input. But what I want is the field between the two "!!", hope this makes sense.

常铖 cheng chang
Computer Science Dept. Tsinghua Univ.
Mobile Phone: 13681572414
WeChat ID: cccjcl

On 28 July 2014 at 17:05:06, Sean Owen (so...@cloudera.com) wrote:

Shouldn't you be using the textFile() method? You are reading the file directly using TextInputFormat, and you get the raw (key, value) pairs back, which are indeed (line number, line) for TextInputFormat. Your second solution is fine if, for some reason, you need to use that method.

On Mon, Jul 28, 2014 at 9:02 AM, chang cheng wrote:
> Hi, all:
>
> I have a hadoop file containing fields separated by "!!", like below:
> !!
> field1
> key1 value1
> key2 value2
> !!
> field2
> key3 value3
> key4 value4
> !!
>
> I want to read the file into a pair in TextInputFormat, specifying the delimiter
> as "!!"
>
> First, I tried the following code:
>
> val hadoopConf = new Configuration()
> hadoopConf.set("textinputformat.record.delimiter", "!!\n")
>
> val path = args(0)
> val rdd = sc.newAPIHadoopFile(path, classOf[TextInputFormat],
> classOf[LongWritable], classOf[Text], hadoopConf)
>
> rdd.take(3).foreach(println)
>
> Far from expectation, the result is:
>
> (120,)
> (120,)
> (120,)
>
> According to my experimentation, "120" is the byte offset of the last field
> separated by "!!"
>
> After digging into spark source code, I find "textFileInput" is implemented as:
>
> hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
> minPartitions).map(pair => pair._2.toString).setName(path)
>
> So, I modified my initial code into: (bold text is the modification)
>
> val hadoopConf = new Configuration()
> hadoopConf.set("textinputformat.record.delimiter", "!!\n")
>
> val path = args(0)
> val rdd = sc.newAPIHadoopFile(path, classOf[TextInputFormat],
> classOf[LongWritable], classOf[Text], hadoopConf).*map(pair => pair._2.toString)*
>
> rdd.take(3).foreach(println)
>
> Then, the results are:
>
> field1
> key1 value1
> key2 value2
>
> field2
>
> As expected.
>
> I'm confused by the first code snippet's behavior.
> Hope you can offer an explanation. Thanks!
>
>
> -----
> Senior in Tsinghua Univ.
> github: http://www.github.com/uronce-cc
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Confusing-behavior-of-newAPIHadoopFile-tp10764.html
>
> Sent from the Apache Spark User List mailing list archive at Nabble.com.