Twitter streaming with Apache Spark streams only a small amount of tweets
Hi,

I've implemented Twitter streaming as in the code at the bottom of this email. It finds some tweets based on the hashtags I'm following, but a large number of tweets seems to be missing. I posted some tweets with hashtags that I'm following, and none of them was received by the application. I also checked some hashtags (e.g. #android) on Twitter using Live, and I could see that almost every second something was posted with that hashtag, yet my application received only 3-4 posts in one minute.

I didn't have this problem in an earlier, non-Spark version of the application, which used twitter4j to access the user stream API. I guess this is some kind of trending stream, but I couldn't find anything that explains which Twitter API is used by Spark Twitter streaming, or how to create a stream that will receive everything posted on Twitter. I hope somebody can explain what the problem is and how to solve it.

Thanks,
Zoran

  def initializeStreaming() {
    val config = getTwitterConfigurationBuilder.build()
    val auth: Option[twitter4j.auth.Authorization] =
      Some(new twitter4j.auth.OAuthAuthorization(config))
    val stream: DStream[Status] = TwitterUtils.createStream(ssc, auth)

    val filtered_statuses = stream.transform(rdd => {
      val filtered = rdd.filter(status => {
        var found = false
        for (tag <- hashTagsList) {
          if (status.getText.toLowerCase.contains(tag)) {
            found = true
          }
        }
        found
      })
      filtered
    })

    filtered_statuses.foreachRDD(rdd => {
      rdd.collect.foreach(t => {
        println(t)
      })
    })

    ssc.start()
  }
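For comparison, a minimal sketch of a Twitter-side filtered stream, assuming the filters overload of TwitterUtils.createStream in spark-streaming-twitter is available and reusing ssc, auth, and hashTagsList from the code above. When no filters are passed, the underlying twitter4j stream appears to use the sampled statuses endpoint (only a small fraction of all tweets), which would explain the low volume; with track filters, Twitter delivers matching statuses directly, so no application-side text filtering is needed. Worth verifying against the spark-streaming-twitter source for the Spark version in use.

  // Hedged sketch: pass the tracked hashtags to Twitter instead of filtering locally.
  // Assumes hashTagsList: Seq[String] as in the code above; not verified against this application.
  import org.apache.spark.streaming.dstream.DStream
  import org.apache.spark.streaming.twitter.TwitterUtils
  import twitter4j.Status

  val filters: Seq[String] = hashTagsList             // e.g. Seq("#android", "#spark")
  val filteredStream: DStream[Status] =
    TwitterUtils.createStream(ssc, auth, filters)     // Twitter-side filtering via the track parameter

  filteredStream.foreachRDD(rdd => rdd.collect().foreach(println))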
Re: Parquet writing gets progressively slower
Thanks for the suggestion Cheng, I will try that today. Are there any implications when reading the parquet data if there are no summary files present?

Michael

On Sat, Jul 25, 2015 at 2:28 AM, Cheng Lian lian.cs@gmail.com wrote:

The time is probably spent by ParquetOutputFormat.commitJob. While committing a successful write job, Parquet writes a pair of summary files containing metadata such as the schema, user-defined key-value metadata, and Parquet row group information. To gather all the necessary information, Parquet scans the footers of all the data files within the base directory and tries to merge them. The more data there is, the longer it takes. One possible workaround is to disable summary files by setting parquet.enable.summary-metadata to false in sc.hadoopConfiguration.

Cheng

On 7/25/15 4:15 AM, Michael Kelly wrote:

Hi,

We are converting some csv log files to parquet, but the job is getting progressively slower the more files we add to the parquet folder. The parquet files are being written to S3, we are using a Spark standalone cluster running on EC2, and the Spark version is 1.4.1.

The parquet files are partitioned on two columns, first the date, then another column. We write the data one day at a time, and the final size of the data for one day when it is written out to parquet is about 150GB. We coalesce the data before it is written out, and in total per day we have 615 partitions/files written out to S3. We use SaveMode.Append since we are always writing to the same directory. This is the command we use to write the data:

  df.coalesce(partitions).write.mode(SaveMode.Append).partitionBy("dt", "outcome").parquet("s3n://root/parquet/dir/")

Writing the parquet file to an empty directory completes almost immediately, whereas after 12 days' worth of data has been written, each parquet write takes up to 20 minutes (and there are 4 writes per day).

Questions:
Is there a more efficient way to write the data? I'm guessing that the update to the parquet metadata is the issue, and that it happens in a serial fashion. Is there a way to write the metadata in the partitioned folders, and would this speed things up? Would this have any implications for reading in the data?
I came across DirectParquetOutputCommitter, but the source for it says it cannot be used with Append mode; would this be useful?
I came across this issue - https://issues.apache.org/jira/browse/SPARK-8125 - and the corresponding pull request - https://github.com/apache/spark/pull/7396 - but it looks like they are more geared towards reading parquet metadata in parallel as opposed to writing it. Is this the case?

Any help would be much appreciated,

Michael
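A minimal sketch of the workaround Cheng describes, assuming a SparkContext named sc and a DataFrame named df as in the thread; the partition count and paths are the placeholders from the original command:

  // Hedged sketch of the workaround from this thread: disable Parquet summary files
  // so commitJob no longer scans and merges every footer under the base directory.
  import org.apache.spark.sql.SaveMode

  sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

  df.coalesce(partitions)
    .write
    .mode(SaveMode.Append)
    .partitionBy("dt", "outcome")
    .parquet("s3n://root/parquet/dir/")

Without summary files, readers generally fall back to reading the footers of the data files themselves (merging schemas if schema merging is enabled), which shifts that cost from write time to read time; worth verifying on the Spark and Parquet versions in use.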
Re: Writing binary files in Spark
It's been added from Spark 1.1.0 I guess: https://issues.apache.org/jira/browse/SPARK-1161

Thanks
Best Regards

On Sat, Jul 25, 2015 at 12:06 AM, Oren Shpigel o...@yowza3d.com wrote:

Sorry, I didn't mention I'm using the Python API, which doesn't have the saveAsObjectFiles method. Is there any alternative from Python? Also, I want to write the raw bytes of my objects into files on disk, not use some serialization format that has to be read back into Spark. Is that possible? Any alternatives for that?

Thanks,
Oren

On Thu, Jul 23, 2015 at 8:04 PM Akhil Das ak...@sigmoidanalytics.com wrote:

You can look into .saveAsObjectFiles

Thanks
Best Regards

On Thu, Jul 23, 2015 at 8:44 PM, Oren Shpigel o...@yowza3d.com wrote:

Hi,

I use Spark to read binary files using SparkContext.binaryFiles(), and then do some calculations, processing, and manipulations to get new objects (also binary). The next thing I want to do is write the results back to binary files on disk. Is there any equivalent of saveAsTextFile just for binary files? Is there any other way to save the results to be used outside Spark?

Thanks,
Oren

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Writing-binary-files-in-Spark-tp23970.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
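For reference, a minimal Scala sketch of the saveAsObjectFile approach Akhil mentions (Scala/Java API only; the path and data are placeholders). Note that it writes Hadoop SequenceFiles of Java-serialized objects, i.e. a Spark/JVM round-trip format rather than the raw bytes Oren is asking about:

  // Hedged sketch: saveAsObjectFile writes SequenceFiles of Java-serialized objects,
  // readable back with sc.objectFile, but not raw bytes for external tools.
  val data: org.apache.spark.rdd.RDD[Array[Byte]] =
    sc.parallelize(Seq(Array[Byte](1, 2, 3)))
  data.saveAsObjectFile("hdfs:///tmp/binary-output")     // placeholder path

  // Round-trip back into Spark:
  val restored = sc.objectFile[Array[Byte]]("hdfs:///tmp/binary-output")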
Re: 1.4.0 classpath issue with spark-submit
The thing is that the class it is complaining about is part of the Spark assembly jar, not in my extra jar. The assembly jar was compiled with -Phive, which is proven by the fact that it works with the same SPARK_HOME when run as a shell.

On 23 July 2015 at 17:33, Akhil Das ak...@sigmoidanalytics.com wrote:

You can try adding that jar in SPARK_CLASSPATH (it's deprecated though) in the spark-env.sh file.

Thanks
Best Regards

On Tue, Jul 21, 2015 at 7:34 PM, Michal Haris michal.ha...@visualdna.com wrote:

I have a Spark program that uses dataframes to query Hive. I run it both as a spark-shell for exploration, and I have a runner class that executes some tasks with spark-submit. I used to run against 1.4.0-SNAPSHOT. Since then 1.4.0 and 1.4.1 were released, so I tried to switch to the official release. Now, when I run the program as a shell everything works, but when I try to run it with spark-submit it complains with this error:

  Exception in thread main java.lang.ClassNotFoundException: java.lang.NoClassDefFoundError:
  org/apache/hadoop/hive/ql/session/SessionState when creating Hive client using classpath:
  file:/home/mharis/dxp-spark.jar
  Please make sure that jars for your version of hive and hadoop are included in the paths
  passed to spark.sql.hive.metastore.jars.

What is suspicious is firstly 'using classpath: ...', where the jar is my program, i.e. the paths that are passed along with the --driver-class-path option are missing. When I switch to an older 1.4.0-SNAPSHOT on the driver, everything works. I observe the issue with 1.4.1. Are there any known obvious changes to how spark-submit handles configuration that I have missed?

--
Michal Haris
Technical Architect
direct line: +44 (0) 207 749 0229
www.visualdna.com | t: +44 (0) 207 734 7033
31 Old Nichol Street
London E2 7HR

--
Michal Haris
Technical Architect
direct line: +44 (0) 207 749 0229
www.visualdna.com | t: +44 (0) 207 734 7033
31 Old Nichol Street
London E2 7HR
Re: Insert data into a table
I don't think INSERT INTO is supported.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Insert-data-into-a-table-tp21898p23990.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Best practice for transforming and storing from Spark to Mongo/HDFS
Hello,

I am a new user of Spark, and need to know what the best practice would be for the following scenario:
- Spark Streaming receives XML messages from Kafka
- Spark transforms each message of the RDD (xml2json + some enrichments)
- Spark stores the transformed/enriched messages inside MongoDB and HDFS (Mongo key as file name)

Basically, I would say that I have to handle the messages one by one inside a foreach loop over the RDD and write each message one by one to MongoDB and HDFS. Do you think that is the best way to do it?

Tks
Nicolas
spark-dataflow + Spark Streaming + Kafka
Hello all

New Spark user here. We've been looking at the Spark ecosystem to build some new parts of our log processing pipeline. The spark-dataflow project looks especially interesting. The windowing and triggers concepts look like a good fit for what we need to do: our log data going into Kafka is only in approximate time order, and some events are sometimes delayed for quite some time.

https://cloud.google.com/dataflow/model/windowing
https://cloud.google.com/dataflow/model/triggers

A few questions:

0. Is this a good forum for questions about spark-dataflow?
1. Is anybody using spark-dataflow for serious projects running outside of Google Cloud? How's it going with 0.2.3? Do windowing and triggers work?
2. Is anybody looking at adding support for Spark Streaming to spark-dataflow? It looks like SparkPipelineRunner and other parts would need to be extended to understand StreamingContext.
3. Are there good alternatives to spark-dataflow that should be considered?
4. Should we be looking at rolling our own windowing + triggers setup directly on top of Spark/Spark Streaming instead of trying to use spark-dataflow?
5. If number 4 sounds like an option, is there any code out there that is doing this already that we can look at for some inspiration?

Any advice appreciated.

Thanks
Albert
Re: Best practice for transforming and storing from Spark to Mongo/HDFS
Use foreachPartition and batch the writes.

On Sat, Jul 25, 2015 at 9:14 AM, nib...@free.fr wrote:

Hello,

I am a new user of Spark, and need to know what the best practice would be for the following scenario:
- Spark Streaming receives XML messages from Kafka
- Spark transforms each message of the RDD (xml2json + some enrichments)
- Spark stores the transformed/enriched messages inside MongoDB and HDFS (Mongo key as file name)

Basically, I would say that I have to handle the messages one by one inside a foreach loop over the RDD and write each message one by one to MongoDB and HDFS. Do you think that is the best way to do it?

Tks
Nicolas
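A minimal sketch of the foreachPartition pattern suggested above, assuming the enriched messages are JSON strings in a DStream named enrichedStream and that the MongoDB Java driver is on the classpath; the host, database, collection, batch size, and the saveToHdfs helper are placeholders, not part of the original thread:

  // Hedged sketch: open one MongoDB connection per partition and write records in batches,
  // instead of one connection and one write per record inside a plain foreach.
  enrichedStream.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      val mongoClient = new com.mongodb.MongoClient("mongo-host")        // placeholder host
      val collection = mongoClient.getDatabase("logs").getCollection("events")
      records.grouped(500).foreach { batch =>                            // batch size is illustrative
        val docs = batch.map(json => org.bson.Document.parse(json)).toList
        collection.insertMany(java.util.Arrays.asList(docs: _*))
        // saveToHdfs(docs)  // hypothetical helper for the HDFS copy
      }
      mongoClient.close()
    }
  }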
Parallelism of Custom receiver in spark
1 - How can I increase the level of parallelism of a custom receiver in Spark Streaming?

2 - Will ssc.receiverStream(/* anything */) delete the data that was stored in Spark memory via the store(s) logic?

--
Thanks Regards,
Anshu Shukla
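On question 1, a minimal sketch of the usual pattern for increasing receiver parallelism: run several instances of the custom receiver and union the resulting DStreams. The stub receiver and the numbers below are placeholders for the real custom receiver and workload:

  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.receiver.Receiver

  // Minimal stand-in so the sketch is self-contained; the real receiver would be
  // the existing custom one.
  class StubReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {
    def onStart(): Unit = { /* start a thread that calls store(...) */ }
    def onStop(): Unit = {}
  }

  // Hedged sketch: each receiver occupies a core on an executor, so running several
  // receivers and unioning their DStreams is the standard way to scale ingestion.
  val numReceivers = 4                                                    // illustrative value
  val streams = (1 to numReceivers).map(_ => ssc.receiverStream(new StubReceiver))
  val unified = ssc.union(streams)                                        // one DStream over all receivers
  val repartitioned = unified.repartition(16)                             // optionally spread downstream work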
[Spark + Hive + EMR + S3] Issue when reading from Hive external table backed by S3 with a large number of small files
Hello Spark community,

I currently have a Spark 1.3.1 batch driver, deployed in YARN-cluster mode on an EMR cluster (AMI 3.7.0), that reads input data through a HiveContext, in particular SELECTing data from an EXTERNAL TABLE backed by S3. The table has dynamic partitions and contains hundreds of small GZip files.

Since it is currently not feasible to collate those files on the source side, I observe that, by default, the SELECT query is mapped by Spark into as many tasks as there are files found under the table root path (+ partitions), e.g. 860 files === 860 tasks to complete the Spark stage of that read operation. This behaviour obviously creates an incredible overhead and often results in failed stages due to OOM exceptions and subsequent crashes of the executors.

Regardless of the input size I can manage to handle, I would really appreciate it if you could suggest how to collate the input partitions somehow while reading, or at least how to reduce the number of tasks spawned by the Hive query.

Looking at http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/emr-hive-differences.html#emr-hive-gzip-splits I tried setting:

  hiveContext.sql("SET hive.hadoop.supports.splittable.combineinputformat=true")

before creating the external table to read from and querying it, but it resulted in NO changes. I also tried setting that in the hive-site.xml on the cluster, but I experienced the same behaviour.

Thanks to whomever will give me any hints.

Best regards,
Roberto
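Not a fix for the scan stage itself, but a minimal sketch of one common mitigation: repartitioning the DataFrame right after the SELECT, so that only the initial read has one task per file and every later stage works on far fewer, larger partitions. The table name, partition count, and temp table name are placeholders:

  // Hedged sketch: the scan itself still runs one task per (non-splittable) GZip file,
  // but repartitioning right after the read keeps the tiny-partition explosion out of
  // every downstream stage.
  val raw = hiveContext.sql("SELECT * FROM external_logs")   // placeholder table name
  val compacted = raw.repartition(64)                        // illustrative partition count;
                                                             // later Spark versions also offer
                                                             // DataFrame.coalesce to avoid the shuffle
  compacted.registerTempTable("logs_compacted")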
Multiple operations on same DStream in Spark Streaming
Hi

I'm working with Spark Streaming using Scala, and trying to figure out the following problem. In my DStream[(Int, Int)], each record is an int pair tuple. For each batch, I would like to filter out all records whose first integer is below the average of the first integers in that batch, and for the records whose first integer is above that average, compute the average of their second integers.

What's the best practice to implement this? I tried it but kept getting the "object not serializable" exception, because it's hard to share variables (such as the average of the first int in the batch) between workers and the driver. Any suggestions?

Thanks!

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-operations-on-same-DStream-in-Spark-Streaming-tp23995.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
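One way to avoid sharing driver-side variables is to compute the per-batch average inside foreachRDD (or transform), where both averages are just local computations on that batch's RDD and only a plain Double is captured by the filter closure. A minimal sketch, assuming the DStream[(Int, Int)] is named pairs:

  // Hedged sketch: per batch, compute the average of the first ints, filter on it,
  // then average the second ints of the surviving records. Nothing non-serializable
  // has to be captured in a closure.
  pairs.foreachRDD { rdd =>
    if (!rdd.isEmpty()) {
      val avgFirst = rdd.map(_._1.toDouble).mean()           // batch average of first ints
      val above = rdd.filter(_._1 > avgFirst)                // keep records above that average
      if (!above.isEmpty()) {
        val avgSecond = above.map(_._2.toDouble).mean()      // average of second ints among them
        println(s"avgFirst=$avgFirst avgSecond=$avgSecond")
      }
    }
  }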
Download Apache Spark on Windows 7 for a Proof of Concept installation
I just wanted an easy step-by-step guide as to exactly what version of whatever to download for a Proof of Concept installation of Apache Spark on Windows 7. I have spent quite some time following a number of different recipes to no avail, and have tried about 10 different permutations to date. I prefer the easiest approach, e.g. download the pre-built version of ... etc.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Download-Apache-Spark-on-Windows-7-for-a-Proof-of-Concept-installation-tp23992.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Question abt serialization
Hi

I have been using Spark for quite some time with either Scala or Python. I wanted to give Groovy a try through scripts for small tests. Unfortunately I get the exception below (using this simple script: https://gist.github.com/galleon/d6540327c418aa8a479f).

Is there anything I am not doing correctly here?

Thanks
tog

Groovy4Spark $ groovy GroovySparkWordcount.groovy
class org.apache.spark.api.java.JavaRDD
true
true
Caught: org.apache.spark.SparkException: Task not serializable
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.filter(RDD.scala:310)
at org.apache.spark.api.java.JavaRDD.filter(JavaRDD.scala:78)
at org.apache.spark.api.java.JavaRDD$filter$0.call(Unknown Source)
at GroovySparkWordcount.run(GroovySparkWordcount.groovy:27)
Caused by: java.io.NotSerializableException: GroovySparkWordcount
Serialization stack:
- object not serializable (class: GroovySparkWordcount, value: GroovySparkWordcount@7eee6c13)
- field (class: GroovySparkWordcount$1, name: this$0, type: class GroovySparkWordcount)
- object (class GroovySparkWordcount$1, GroovySparkWordcount$1@15c16f19)
- field (class: org.apache.spark.api.java.JavaRDD$$anonfun$filter$1, name: f$1, type: interface org.apache.spark.api.java.function.Function)
- object (class org.apache.spark.api.java.JavaRDD$$anonfun$filter$1, function1)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
... 12 more

--
PGP KeyID: 2048R/EA31CFC9 subkeys.pgp.net
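The serialization stack points at the usual cause: the filter function is an anonymous inner class whose this$0 field references the enclosing GroovySparkWordcount script object, which is not serializable. A minimal sketch of the same failure mode and the common remedy, written in Scala for illustration (the Groovy fix may differ, e.g. making the enclosing script or closure owner serializable):

  // Hedged sketch (Scala, not Groovy) of the failure mode named in the serialization stack:
  // the closure captures the enclosing non-serializable object via an outer reference.
  class WordCountJob(sc: org.apache.spark.SparkContext) {    // not Serializable, like the script object
    val keyword = "spark"

    def countBad(lines: org.apache.spark.rdd.RDD[String]): Long =
      lines.filter(l => l.contains(keyword)).count()          // references this.keyword -> captures `this`

    def countGood(lines: org.apache.spark.rdd.RDD[String]): Long = {
      val kw = keyword                                         // copy the field into a local val first
      lines.filter(l => l.contains(kw)).count()                // closure now only captures a String
    }
  }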
ReceiverStream SPARK not able to cope with 20,000 events/sec.
My eventGen is emitting 20,000 events/sec, and I am using store(s1) in the receive() method to push data to the receiverStream. But this logic works fine only up to 4,000 events/sec, and no batches are seen being emitted at higher rates.

CODE: TOPOLOGY -

  JavaDStream<String> sourcestream = ssc.receiverStream(
      new TetcCustomEventReceiver(datafilename, spoutlog, argumentClass.getScalingFactor(), datasetType));

CODE: TetcCustomEventReceiver -

  public void receive(List<String> event) {
      StringBuffer tuple = new StringBuffer();
      msgId++;
      for (String s : event) {
          tuple.append(s).append(",");
      }
      String s1 = MsgIdAddandRemove.addMessageId(tuple.toString(), msgId);
      store(s1);
  }

--
Thanks Regards,
Anshu Shukla
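One thing that may help at higher rates is handing Spark whole batches instead of calling store() once per record: the Receiver API also has store variants that take a buffer or iterator of records. A minimal sketch (in Scala rather than Java, and with a hypothetical receiver, since the original TetcCustomEventReceiver is not shown in full; nextEvent and the batch size are placeholders):

  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.receiver.Receiver
  import scala.collection.mutable.ArrayBuffer

  // Hedged sketch: buffer incoming events and call the bulk store(...) variant,
  // which reduces per-record overhead inside the receiver.
  class BatchedEventReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {
    def onStart(): Unit = {
      new Thread("batched-event-receiver") {
        override def run(): Unit = {
          var buffer = new ArrayBuffer[String]()
          while (!isStopped()) {
            buffer += nextEvent()                  // hypothetical source of one event string
            if (buffer.size >= 1000) {             // illustrative batch size
              store(buffer)                        // bulk store instead of one store() per record
              buffer = new ArrayBuffer[String]()   // start a fresh batch
            }
          }
        }
      }.start()
    }
    def onStop(): Unit = {}

    private def nextEvent(): String = ""           // placeholder for the real event generator
  }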