Re: [jira] [Commented] (SPARK-34648) Reading Parquet Files in Spark Extremely Slow for Large Number of Files?
Hi,

Could someone please respond on this? Thanks.

Pankaj Bhootra

On Sun, 7 Mar 2021, 01:22 Pankaj Bhootra wrote:
> Hello Team
>
> I am new to Spark and this question may be a possible duplicate of the issue highlighted here: https://issues.apache.org/jira/browse/SPARK-9347
>
> We have a large dataset partitioned by calendar date, and within each date partition, we are storing the data as *parquet* files in 128 parts.
>
> We are trying to run aggregation on this dataset for 366 dates at a time with Spark SQL on spark version 2.3.0, hence our Spark job is reading 366*128=46848 partitions, all of which are parquet files. There is currently no *_metadata* or *_common_metadata* file available for this dataset.
>
> The problem we are facing is that when we try to run *spark.read.parquet* on the above 46848 partitions, our data reads are extremely slow. It takes a long time to run even a simple map task (no shuffling) without any aggregation or group by.
>
> I read through the above issue and I think I perhaps generally understand the ideas around the *_common_metadata* file. But that issue was raised for Spark 1.3.1, and for Spark 2.3.0 I have not found any documentation related to this metadata file so far.
>
> I would like to clarify:
> 1. What's the latest best practice for reading a large number of parquet files efficiently?
> 2. Does this involve using any additional options with spark.read.parquet? How would that work?
> 3. Are there other possible reasons for slow data reads apart from reading metadata for every part? We are basically trying to migrate our existing spark pipeline from using csv files to parquet, but from my hands-on experience so far, it seems that parquet's read time is slower than csv. This seems to contradict the popular opinion that parquet performs better in terms of both computation and storage.
>
> Thanks
> Pankaj Bhootra
>
>
> -- Forwarded message -
> From: Takeshi Yamamuro (Jira)
> Date: Sat, 6 Mar 2021, 20:02
> Subject: [jira] [Commented] (SPARK-34648) Reading Parquet Files in Spark Extremely Slow for Large Number of Files?
>
> [ https://issues.apache.org/jira/browse/SPARK-34648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17296528#comment-17296528 ]
>
> Takeshi Yamamuro commented on SPARK-34648:
> --
>
> Please use the mailing list (user@spark.apache.org) instead. This is not the right place to ask questions.
Fwd: [jira] [Commented] (SPARK-34648) Reading Parquet Files in Spark Extremely Slow for Large Number of Files?
Hello Team

I am new to Spark and this question may be a possible duplicate of the issue highlighted here: https://issues.apache.org/jira/browse/SPARK-9347

We have a large dataset partitioned by calendar date, and within each date partition, we are storing the data as *parquet* files in 128 parts.

We are trying to run aggregation on this dataset for 366 dates at a time with Spark SQL on spark version 2.3.0, hence our Spark job is reading 366*128=46848 partitions, all of which are parquet files. There is currently no *_metadata* or *_common_metadata* file available for this dataset.

The problem we are facing is that when we try to run *spark.read.parquet* on the above 46848 partitions, our data reads are extremely slow. It takes a long time to run even a simple map task (no shuffling) without any aggregation or group by.

I read through the above issue and I think I perhaps generally understand the ideas around the *_common_metadata* file. But the above issue was raised for Spark 1.3.1 and for Spark 2.3.0, I have not found any documentation related to this metadata file so far.

I would like to clarify:
1. What's the latest best practice for reading a large number of parquet files efficiently?
2. Does this involve using any additional options with spark.read.parquet? How would that work?
3. Are there other possible reasons for slow data reads apart from reading metadata for every part? We are basically trying to migrate our existing spark pipeline from using csv files to parquet, but from my hands-on experience so far, it seems that parquet's read time is slower than csv. This seems contradictory to popular opinion that parquet performs better in terms of both computation and storage.

Thanks
Pankaj Bhootra


-- Forwarded message -
From: Takeshi Yamamuro (Jira)
Date: Sat, 6 Mar 2021, 20:02
Subject: [jira] [Commented] (SPARK-34648) Reading Parquet Files in Spark Extremely Slow for Large Number of Files?

[ https://issues.apache.org/jira/browse/SPARK-34648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17296528#comment-17296528 ]

Takeshi Yamamuro commented on SPARK-34648:
--

Please use the mailing list (user@spark.apache.org) instead. This is not the right place to ask questions.

> Reading Parquet Files in Spark Extremely Slow for Large Number of Files?
> Key: SPARK-34648
> URL: https://issues.apache.org/jira/browse/SPARK-34648
> Project: Spark
> Issue Type: Question
> Components: SQL
> Affects Versions: 2.3.0
> Reporter: Pankaj Bhootra
> Priority: Major

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
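A few options that commonly help with many-file parquet scans on Spark 2.x; this is a minimal sketch rather than a definitive fix, and the schema and paths below are placeholders. The main idea is that supplying the schema up front spares Spark from opening parquet footers just to infer it, which is often the dominant cost when a read touches tens of thousands of files.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("parquet-scan").getOrCreate()

// Schema merging forces a footer read per file; keep it off (the default)
// when all parts share one schema.
spark.conf.set("spark.sql.parquet.mergeSchema", "false")

// Hypothetical schema standing in for the real dataset's columns.
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("value", DoubleType)))

val df = spark.read
  .schema(schema)                       // skip schema inference entirely
  .option("basePath", "/data/events")   // keep partition columns; path is a placeholder
  .parquet("/data/events/date=2020-*")

When a single read lists ~47k files, the driver-side file listing itself can also be a bottleneck; spark.sql.sources.parallelPartitionDiscovery.threshold controls when Spark parallelizes that listing across the cluster.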
Structured Streaming to Kafka Topic
Hi,

I am using structured streaming for ETL.

val data_stream = spark
  .readStream // constantly expanding dataframe
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "sms_history")
  .option("startingOffsets", "earliest") // begin from start of topic
  .option("failOnDataLoss", "false")
  .load()

I transform this into a Dataset with the following schema.

root
 |-- accountId: long (nullable = true)
 |-- countryId: long (nullable = true)
 |-- credits: double (nullable = true)
 |-- deliveryStatus: string (nullable = true)
 |-- senderId: string (nullable = true)
 |-- sentStatus: string (nullable = true)
 |-- source: integer (nullable = true)
 |-- createdOn: timestamp (nullable = true)
 |-- send_success_credits: double (nullable = true)
 |-- send_error_credits: double (nullable = true)
 |-- delivered_credits: double (nullable = true)
 |-- invalid_sd_credits: double (nullable = true)
 |-- undelivered_credits: double (nullable = true)
 |-- unknown_credits: double (nullable = true)

Now I want to write this transformed stream to another Kafka topic. I have temporarily used a UDF that accepts all these columns as parameters and creates a json string to add a "value" column for writing to Kafka. Is there an easier and cleaner way to do the same?

Thanks,
Pankaj
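For what it's worth, the built-in to_json and struct functions (available since Spark 2.1) can replace a hand-rolled UDF here. A minimal sketch, assuming the transformed stream is a DataFrame named transformed; the output topic and checkpoint path are placeholders:

import org.apache.spark.sql.functions.{col, struct, to_json}

val query = transformed
  // Pack every column into one JSON string column named "value",
  // which is the column the Kafka sink expects.
  .select(to_json(struct(transformed.columns.map(col): _*)).alias("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "sms_history_out")          // hypothetical output topic
  .option("checkpointLocation", "/tmp/ckpt")   // required by the Kafka sink
  .start()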
Re: [E] How to do stop streaming before the application got killed
You can add a shutdown hook to your JVM and request the spark streaming context to stop gracefully.

/**
 * Shutdown hook to shut down the JVM gracefully
 * @param ssCtx the streaming context to stop
 */
def addShutdownHook(ssCtx: StreamingContext) = {
  Runtime.getRuntime.addShutdownHook(new Thread() {
    override def run() = {
      println("In shutdown hook")
      // stop gracefully: also stop the SparkContext, and let queued batches finish
      ssCtx.stop(true, true)
    }
  })
}

Pankaj

On Fri, Dec 22, 2017 at 9:56 AM, Toy wrote:
> I'm trying to write a deployment job for a Spark application. Basically the job will send yarn application --kill app_id to the cluster, but after the application receives the signal it dies without finishing whatever it is processing or stopping the stream.
>
> I'm using Spark Streaming. What's the best way to stop a Spark application so we won't lose any data?
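Recent Spark versions (1.4 onwards) can also do this without a hand-written hook; a one-line alternative, assuming you control the application's SparkConf:

// Spark Streaming registers its own hook and stops the context gracefully on SIGTERM
sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")

Either way, this only helps if the process receives a SIGTERM and enough grace time before YARN escalates to SIGKILL, so the NodeManager's kill grace period is worth checking as well.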
Re: [E] Re: Spark Job is stuck at SUBMITTED when set Driver Memory > Executor Memory
Please make sure that you have enough memory available on the driver node. If there is not enough free memory on the driver node, your application won't start.

Pankaj

From: vaquar khan <vaquar.k...@gmail.com>
Date: Saturday, June 10, 2017 at 5:02 PM
To: Abdulfattah Safa <fattah.s...@gmail.com>
Cc: User <user@spark.apache.org>
Subject: [E] Re: Spark Job is stuck at SUBMITTED when set Driver Memory > Executor Memory

You can add memory in your command; make sure the given memory is available on your executor.

./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000

https://spark.apache.org/docs/1.1.0/submitting-applications.html

Also try to avoid functions that need memory on the driver, like collect.

Regards,
Vaquar khan

On Jun 4, 2017 5:46 AM, "Abdulfattah Safa" <fattah.s...@gmail.com> wrote:
I'm working on Spark in Standalone Cluster mode. I need to increase the Driver Memory as I got an OOM in the driver thread. I found that when setting the Driver Memory > Executor Memory, the submitted job is stuck at SUBMITTED and the application never starts.
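Since the driver's memory is fixed at submit time, the flag to look at is --driver-memory. A minimal sketch (class, host, and sizes are placeholders):

./bin/spark-submit \
  --class com.example.MyApp \
  --master spark://master-host:7077 \
  --deploy-mode cluster \
  --driver-memory 4G \
  --executor-memory 2G \
  /path/to/app.jar

In standalone cluster mode the driver itself runs on one of the worker nodes, so some worker must have at least the requested driver memory free; otherwise the job sits in SUBMITTED waiting for a slot, which matches the behavior described above.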
SPARK-19547
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

I see that there is a Spark ticket opened with the same issue (https://issues.apache.org/jira/browse/SPARK-19547), but it has been marked as INVALID. Can someone explain why this ticket was marked INVALID?

Thanks,
Pankaj
Re: how to add column to dataframe
You may want to try using df2.na.fill(…)

From: lk_spark
Date: Tuesday, 6 December 2016 at 3:05 PM
To: "user.spark"
Subject: how to add column to dataframe

hi, all:
My spark version is 2.0. I have a parquet file with one column named url of type string. I want to get a substring from the url and add it to the dataframe:

val df = spark.read.parquet("/parquetdata/weixin/page/month=201607")
val df2 = df.withColumn("pa_bid", when($"url".isNull, col("url").substr(3, 5)))
df2.select("pa_bid", "url").show

+------+----------------+
|pa_bid|             url|
+------+----------------+
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|
|  null|http://mp.weixin|

Why is what I got null?

2016-12-06
lk_spark
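The nulls here most likely come from the when condition itself rather than from missing data: when($"url".isNull, ...) computes the substring only for rows where url is null and, with no otherwise clause, yields null for every non-null row. A minimal sketch of the probable intent, assuming the goal is a substring of non-null urls:

import org.apache.spark.sql.functions.{col, when}

// Compute the substring for non-null urls; rows with a null url stay null,
// which when(...) without otherwise(...) does by default.
val df2 = df.withColumn("pa_bid", when(col("url").isNotNull, col("url").substr(3, 5)))

df2.na.fill(...) then only matters if you want to replace the remaining nulls with a default value.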
Execution error during ALS execution in spark
Hi,

While building a recommendation engine using Spark MLlib (ALS) we are facing some issues during execution. Details are below.

We are trying to train our model on 1.4 million sparse rating records (100,000 customers x 50,000 items). The execution DAG cycle is taking a long time and is crashing after several hours when executing the model.recommendProductsForUsers() step. The causes of the exceptions are non-uniform and vary from run to run. The common exceptions faced during the last 10 runs are a) Akka timeout, b) out-of-memory exceptions, c) executor disassociation.

We have tried increasing the execution timeouts to 1200 seconds; that doesn't seem to make an impact:

sparkConf.set("spark.network.timeout", "1200s");
sparkConf.set("spark.rpc.askTimeout", "1200s");
sparkConf.set("spark.rpc.lookupTimeout", "1200s");
sparkConf.set("spark.akka.timeout", "1200s");

Our command line parameters are as follows:

--num-executors 5 --executor-memory 2G --conf spark.yarn.executor.memoryOverhead=600 --conf spark.default.parallelism=500 --master yarn

Configuration:
1. 3-node cluster, 16 GB RAM, Intel i7 processor.
2. Spark 1.5.2

The algorithm works perfectly for a smaller number of records. We would appreciate any help in this regard and would like to know the following:
1. How can we handle execution over large record sets in Spark without failures, as the rating records will increase with time?
2. Are we missing any command line parameters that are necessary for this type of heavy execution?
3. Are the above cluster size and configuration adequate for processing this many records? A large amount of time taken during execution is fine, but the process should not fail.
4. What exactly does the Akka timeout error mean during ALS job execution?

Regards,
Pankaj Rawat
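One setting not mentioned above that often helps long ALS runs is checkpointing, which truncates the growing RDD lineage that otherwise inflates memory use and recovery cost across iterations. A minimal Scala sketch against the MLlib 1.5 API; the path and hyperparameters are placeholders:

import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.rdd.RDD

sc.setCheckpointDir("hdfs:///tmp/als-checkpoints") // hypothetical HDFS path

def train(ratings: RDD[Rating]) = new ALS()
  .setRank(20)               // placeholder hyperparameters
  .setIterations(10)
  .setCheckpointInterval(5)  // checkpoint every 5 iterations to cut lineage
  .run(ratings)

Note that recommendProductsForUsers is inherently heavy regardless of these settings: it scores the full 100,000 x 50,000 user-item cross product.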
Re: Spark Streaming: java.lang.NoClassDefFoundError: org/apache/kafka/common/message/KafkaLZ4BlockOutputStream
Next thing you may want to check is whether the jar has been provided to all the executors in your cluster. Most of the class-not-found errors got resolved for me after making the required jars available in the SparkContext.

Thanks.

From: Ted Yu <yuzhih...@gmail.com>
Date: Saturday, 12 March 2016 at 7:17 AM
To: Siva <sbhavan...@gmail.com>
Cc: spark users <user@spark.apache.org>
Subject: Re: Spark Streaming: java.lang.NoClassDefFoundError: org/apache/kafka/common/message/KafkaLZ4BlockOutputStream

KafkaLZ4BlockOutputStream is in the kafka-clients jar:

$ jar tvf kafka-clients-0.8.2.0.jar | grep KafkaLZ4BlockOutputStream
 1609 Wed Jan 28 22:30:36 PST 2015 org/apache/kafka/common/message/KafkaLZ4BlockOutputStream$BD.class
 2918 Wed Jan 28 22:30:36 PST 2015 org/apache/kafka/common/message/KafkaLZ4BlockOutputStream$FLG.class
 4578 Wed Jan 28 22:30:36 PST 2015 org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.class

Can you check whether the kafka-clients jar was in the classpath of the container?

Thanks

On Fri, Mar 11, 2016 at 5:00 PM, Siva <sbhavan...@gmail.com> wrote:

Hi Everyone,

All of a sudden we are encountering the below error from one of the spark consumers. It used to work before without any issues. When I restart the consumer with the latest offsets, it works fine for some time (it executes a few batches) and then fails again; this issue is intermittent. Did anyone come across this issue?

16/03/11 19:44:50 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 3, ip-172-31-32-183.us-west-2.compute.internal): java.lang.NoClassDefFoundError: org/apache/kafka/common/message/KafkaLZ4BlockOutputStream
at kafka.message.ByteBufferMessageSet$.decompress(ByteBufferMessageSet.scala:65)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:179)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:192)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:146)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
at scala.collection.Iterator$$anon$1.hasNext(Iterator.scala:847)
at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:615)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:160)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.message.KafkaLZ4BlockOutputStream
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 23 more

Container id: container_1456361466298_0236_01_02
Exit code: 50
Stack trace: ExitCodeException exitCode=50:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Container exited with a non-zero exit code 50
16/03/11 19:44:55 INFO yarn.YarnAllocator: Completed container container_145636
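One way to make the jar visible to every executor container is to ship it at submit time. A minimal sketch (the class name and jar paths are placeholders; packaging kafka-clients into an uber jar is an equivalent alternative):

./bin/spark-submit \
  --class com.example.StreamingConsumer \
  --master yarn \
  --jars /path/to/kafka-clients-0.8.2.0.jar \
  /path/to/my-streaming-app.jar

--jars distributes the listed jars to the driver and all executors and adds them to their classpaths, which is exactly what a container-side NoClassDefFoundError calls for.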
serializable error in apache spark job
I am encountering the below error. Can somebody guide me? Something similar is on this link: https://github.com/elastic/elasticsearch-hadoop/issues/298

actor.MentionCrawlActor
java.io.NotSerializableException: actor.MentionCrawlActor
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) ~[na:1.7.0_79]
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) ~[na:1.7.0_79]
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) ~[na:1.7.0_79]
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) ~[na:1.7.0_79]
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) ~[na:1.7.0_79]
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) ~[na:1.7.0_79]
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) ~[na:1.7.0_79]
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) ~[na:1.7.0_79]
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) ~[na:1.7.0_79]
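This usually means the actor reference is captured by a closure that Spark ships to executors. Two standard ways out: mark the field @transient (and recreate it lazily on the worker), or build the non-serializable object inside a partition-level operation so it never leaves the executor. A minimal sketch of the second pattern; MentionCrawler here is a hypothetical stand-in for whatever the actor actually does:

rdd.foreachPartition { records =>
  // Constructed on the executor, never serialized from the driver,
  // so it does not need to be Serializable.
  val crawler = new MentionCrawler() // hypothetical local helper
  records.foreach(record => crawler.process(record))
}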
Re: Question on take function - Spark Java API
Thanks Sonal. I shall try doing that.

> On 26-Aug-2015, at 1:05 pm, Sonal Goyal wrote:
>
> You can try using wholeTextFiles, which will give you a pair RDD of (fileName, content). flatMap through this and manipulate the content.
>
> Best Regards,
> Sonal
> Founder, Nube Technologies <http://www.nubetech.co/>
> Check out Reifier at Spark Summit 2015 <https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/>
> <http://in.linkedin.com/in/sonalgoyal>
>
> On Wed, Aug 26, 2015 at 8:25 AM, Pankaj Wahane <pankaj.wah...@qiotec.com> wrote:
>> Hi community members,
>>
>> Apache Spark is fantastic and very easy to learn. Awesome work!!!
>>
>> Question:
>>
>> I have multiple files in a folder, and the first line in each file is the name of the asset that the file belongs to. The second line is the csv header row, and data starts from the third row.
>>
>> Ex: File 1
>>
>> TestAsset01
>> Time,dp_1,dp_2,dp_3
>> 11-01-2015 15:00:00,123,456,789
>> 11-01-2015 15:00:01,123,456,789
>> . . .
>>
>> Ex: File 2
>>
>> TestAsset02
>> Time,dp_1,dp_2,dp_3
>> 11-01-2015 15:00:00,1230,4560,7890
>> 11-01-2015 15:00:01,1230,4560,7890
>> . . .
>>
>> I have got nearly 1000 files in each folder, sizing ~10G.
>>
>> I am using the Apache Spark Java API to read all these files. Following is the code extract that I am using:
>>
>> try (JavaSparkContext sc = new JavaSparkContext(conf)) {
>>     Map readingTypeMap = getReadingTypesMap(sc);
>>     // Read files
>>     JavaRDD<String> data = sc.textFile(resourceBundle.getString(FOLDER_NAME));
>>     // Get asset
>>     String asset = data.take(1).get(0);
>>     // Extract time series data
>>     JavaRDD<String> actualData = data.filter(line -> line.contains(DELIMERTER));
>>     // Strip header
>>     String header = actualData.take(1).get(0);
>>     String[] headers = header.split(DELIMERTER);
>>     // Extract actual data
>>     JavaRDD<String> timeSeriesLines = actualData.filter(line -> !line.equals(header));
>>     // Extract valid records
>>     JavaRDD<String> validated = timeSeriesLines.filter(line -> validate(line));
>>     // Find granularity
>>     Integer granularity = toInt(resourceBundle.getString(GRANULARITY));
>>     // Transform to TSD objects
>>     JavaRDD<TimeSeriesData> tsdFlatMap = transformTotimeSeries(validated, asset, readingTypeMap, headers, granularity);
>>
>>     // Save to Cassandra
>>     javaFunctions(tsdFlatMap).writerBuilder(resourceBundle.getString("cassandra.tsd.keyspace"),
>>         "time_series_data", mapToRow(TimeSeriesData.class)).saveToCassandra();
>>
>>     System.out.println("Total Records: " + timeSeriesLines.count());
>>     System.out.println("Valid Records: " + validated.count());
>> }
>>
>> Within the TimeSeriesData object I need to set the asset name for the reading, so I need the output of data.take(1) to be different for different files.
>>
>> Thank You.
>>
>> Best Regards,
>> Pankaj
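Sonal's suggestion in code form, as a minimal Scala sketch (the Java API offers the same wholeTextFiles method on JavaSparkContext; the folder path and record shape are placeholders):

// wholeTextFiles yields (fileName, fileContent) pairs, so each file's
// asset name (line 1) stays attached to that file's own data rows (line 3+).
val perFile = sc.wholeTextFiles("/path/to/folder")

val records = perFile.flatMap { case (file, content) =>
  val lines = content.split("\n")
  val asset = lines(0).trim               // first line: asset name
  val headers = lines(1).split(",")       // second line: csv header
  lines.drop(2).map(row => (asset, row))  // data starts from the third row
}

One caveat: wholeTextFiles materializes each file's full content in memory, which is fine at roughly 10 MB per file but not for very large individual files.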
Question on take function - Spark Java API
Hi community members,

Apache Spark is fantastic and very easy to learn. Awesome work!!!

Question:

I have multiple files in a folder, and the first line in each file is the name of the asset that the file belongs to. The second line is the csv header row, and data starts from the third row.

Ex: File 1

TestAsset01
Time,dp_1,dp_2,dp_3
11-01-2015 15:00:00,123,456,789
11-01-2015 15:00:01,123,456,789
. . .

Ex: File 2

TestAsset02
Time,dp_1,dp_2,dp_3
11-01-2015 15:00:00,1230,4560,7890
11-01-2015 15:00:01,1230,4560,7890
. . .

I have got nearly 1000 files in each folder, sizing ~10G.

I am using the Apache Spark Java API to read all these files. Following is the code extract that I am using:

try (JavaSparkContext sc = new JavaSparkContext(conf)) {
    Map readingTypeMap = getReadingTypesMap(sc);
    // Read files
    JavaRDD<String> data = sc.textFile(resourceBundle.getString(FOLDER_NAME));
    // Get asset
    String asset = data.take(1).get(0);
    // Extract time series data
    JavaRDD<String> actualData = data.filter(line -> line.contains(DELIMERTER));
    // Strip header
    String header = actualData.take(1).get(0);
    String[] headers = header.split(DELIMERTER);
    // Extract actual data
    JavaRDD<String> timeSeriesLines = actualData.filter(line -> !line.equals(header));
    // Extract valid records
    JavaRDD<String> validated = timeSeriesLines.filter(line -> validate(line));
    // Find granularity
    Integer granularity = toInt(resourceBundle.getString(GRANULARITY));
    // Transform to TSD objects
    JavaRDD<TimeSeriesData> tsdFlatMap = transformTotimeSeries(validated, asset, readingTypeMap, headers, granularity);

    // Save to Cassandra
    javaFunctions(tsdFlatMap).writerBuilder(resourceBundle.getString("cassandra.tsd.keyspace"),
        "time_series_data", mapToRow(TimeSeriesData.class)).saveToCassandra();

    System.out.println("Total Records: " + timeSeriesLines.count());
    System.out.println("Valid Records: " + validated.count());
}

Within the TimeSeriesData object I need to set the asset name for the reading, so I need the output of data.take(1) to be different for different files.

Thank You.

Best Regards,
Pankaj
Spark Streaming Restart at scheduled intervals
Hi All,

I am creating a Spark Twitter streaming connection in my app over a long period of time. When I have some new keywords, I need to add them to the Spark streaming connection, so I need to stop and restart the current Twitter streaming connection in this case. I have tried Akka actor scheduling but could not achieve this. Does anybody have an idea how to do that?

Regards
Pankaj
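One workable pattern, as a sketch of one approach rather than the only one: stop the StreamingContext without stopping the SparkContext, then build a fresh streaming context with the updated keyword list. The batch interval, sink, and variable names below are placeholders:

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

def startWithKeywords(sc: SparkContext, keywords: Seq[String]): StreamingContext = {
  val ssc = new StreamingContext(sc, Seconds(10))
  val tweets = TwitterUtils.createStream(ssc, None, keywords)
  tweets.foreachRDD(rdd => rdd.take(10).foreach(println)) // placeholder sink
  ssc.start()
  ssc
}

// When the keyword list changes:
var current = startWithKeywords(sc, Seq("spark"))
current.stop(stopSparkContext = false, stopGracefully = true) // keep the SparkContext alive
current = startWithKeywords(sc, Seq("spark", "newKeyword"))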
Out of memory with twitter spark streaming
Hi,

I am running one application using activator where I am retrieving tweets and storing them in a MySQL database using the code below. I get an OOM error after 5-6 hours with -Xmx1048M. If I increase the memory, the OOM only gets delayed. Can anybody give me a clue?

Here is the code:

var tweetStream = TwitterUtils.createStream(ssc, None, keywords)
var tweets = tweetStream.map(tweet => {
  var user = tweet.getUser
  var replyStatusId = tweet.getInReplyToStatusId
  var reTweetStatus = tweet.getRetweetedStatus
  var pTweetId = -1L
  var pcreatedAt = 0L
  if (reTweetStatus != null) {
    pTweetId = reTweetStatus.getId
    pcreatedAt = reTweetStatus.getCreatedAt.getTime
  }
  tweet.getCreatedAt.getTime + "|$" + tweet.getId + "|$" + user.getId + "|$" + user.getName + "|$" +
    user.getScreenName + "|$" + user.getDescription + "|$" + tweet.getText.trim + "|$" +
    user.getFollowersCount + "|$" + user.getFriendsCount + "|$" + tweet.getGeoLocation + "|$" +
    user.getLocation + "|$" + user.getBiggerProfileImageURL + "|$" + replyStatusId + "|$" +
    pTweetId + "|$" + pcreatedAt
})

tweets.foreachRDD(tweetsRDD => {
  tweetsRDD.distinct()
  val count = tweetsRDD.count
  println("*" + "%s tweets found on this RDD".format(count))
  if (count > 0) {
    var timeMs = System.currentTimeMillis
    var counter = DBQuery.getProcessedCount()
    var location = "tweets/" + counter + "/"
    tweetsRDD.collect().map(tweet => DBQuery.saveTweets(tweet))
    // tweetsRDD.saveAsTextFile(location + timeMs) + ".txt"
    DBQuery.addTweetRDD(counter)
  }
})

// Checkpoint directory to recover from failures
println("tweets for the last stream are saved which can be processed later")
val checkpointDir = "f:/svn1/checkpoint/"
ssc.checkpoint(checkpointDir)
ssc.start()
ssc.awaitTermination()

regards
Pankaj
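One likely contributor to the OOM is the per-batch tweetsRDD.collect(), which pulls every tweet into the driver before writing to MySQL, so the driver's heap bounds how much a batch can hold. A sketch of a driver-friendly alternative, assuming DBQuery.saveTweets can be called from executor code (its connection handling is not shown above):

tweets.foreachRDD { tweetsRDD =>
  tweetsRDD.foreachPartition { partition =>
    // Runs on the executors: each partition writes its own rows,
    // so nothing accumulates on the driver.
    partition.foreach(tweet => DBQuery.saveTweets(tweet))
  }
}

Also worth noting: tweetsRDD.distinct() returns a new RDD that the code above discards, so the de-duplication never takes effect; assign the result if it is intended.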
Re: AvroFiles
I am not using Kryo. I was using the regular sqlcontext.avrofiles to open it. The file loads properly with the schema; the exception happens when I try to read it. Will try the Kryo serializer and see if that helps.

On May 5, 2015 9:02 PM, "Todd Nist" wrote:
> Are you using Kryo or Java serialization? I found this post useful:
>
> http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist
>
> If using Kryo, you need to register the classes with Kryo, something like this:
>
> sc.registerKryoClasses(Array(
>   classOf[ConfigurationProperty],
>   classOf[Event]
> ))
>
> Or create a registrator, something like this:
>
> class ODSKryoRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo) {
>     kryo.register(classOf[ConfigurationProperty], new AvroSerializer[ConfigurationProperty]())
>     kryo.register(classOf[Event], new AvroSerializer[Event]())
>   }
> }
>
> I encountered a similar error since several of the Avro core classes are not marked Serializable.
>
> HTH.
>
> Todd
>
> On Tue, May 5, 2015 at 7:09 PM, Pankaj Deshpande wrote:
>> Hi, I am using Spark 1.3.1 to read an avro file stored on HDFS. The avro file was created using Avro 1.7.7, similar to the example mentioned in http://www.infoobjects.com/spark-with-avro/. I am getting a NullPointerException on schema read. It could be an avro version mismatch. Has anybody had a similar issue with avro?
>>
>> Thanks
AvroFiles
Hi, I am using Spark 1.3.1 to read an avro file stored on HDFS. The avro file was created using Avro 1.7.7, similar to the example mentioned in http://www.infoobjects.com/spark-with-avro/. I am getting a NullPointerException on schema read. It could be an avro version mismatch. Has anybody had a similar issue with avro?

Thanks
Issue with deploying Driver in cluster mode
Hi,

I have a 3-node Spark cluster: node1, node2 and node3. I am running the below command on node1 to deploy the driver:

/usr/local/spark-1.2.1-bin-hadoop2.4/bin/spark-submit --class com.fst.firststep.aggregator.FirstStepMessageProcessor --master spark://ec2-xx-xx-xx-xx.compute-1.amazonaws.com:7077 --deploy-mode cluster --supervise file:///home/xyz/sparkstreaming-0.0.1-SNAPSHOT.jar /home/xyz/config.properties

The driver gets launched on node2 in the cluster, but I am getting an exception on node2 that it is trying to bind to the node1 IP:

2015-02-26 08:47:32 DEBUG AkkaUtils:63 - In createActorSystem, requireCookie is: off
2015-02-26 08:47:32 INFO Slf4jLogger:80 - Slf4jLogger started
2015-02-26 08:47:33 ERROR NettyTransport:65 - failed to bind to ec2-xx.xx.xx.xx.compute-1.amazonaws.com/xx.xx.xx.xx:0, shutting down Netty transport
2015-02-26 08:47:33 WARN Utils:71 - Service 'Driver' could not bind on port 0. Attempting port 1.
2015-02-26 08:47:33 DEBUG AkkaUtils:63 - In createActorSystem, requireCookie is: off
2015-02-26 08:47:33 ERROR Remoting:65 - Remoting error: [Startup failed] [
akka.remote.RemoteTransportException: Startup failed
at akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:136)
at akka.remote.Remoting.start(Remoting.scala:201)
at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)
at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)
at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)
at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:118)
at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1765)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1756)
at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:33)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: org.jboss.netty.channel.ChannelException: Failed to bind to: ec2-xx-xx-xx.compute-1.amazonaws.com/xx.xx.xx.xx:0
at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:393)
at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:389)
at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
at scala.util.Try$.apply(Try.scala:161)
at scala.util.Success.map(Try.scala:206)

Kindly suggest.

Thanks
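A plausible cause worth ruling out (an assumption about the setup, not a confirmed diagnosis): a hostname or IP pinned on the submitting node leaking into the driver's environment. In cluster mode the driver can land on any worker, so a SPARK_LOCAL_IP or spark.driver.host value that names node1 would make a driver launched on node2 try to bind to node1's address, which matches the error above. Each node's conf/spark-env.sh should point at that node's own address, e.g.:

# in conf/spark-env.sh on each node: bind to this node's own address
export SPARK_LOCAL_IP=$(hostname -i)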
Loading JSON dataset with Spark Mllib
Hi, I am new to spark and planning on writing a machine learning application with Spark mllib. My dataset is in json format. Is it possible to load data into spark without using any external json libraries? I have explored the option of SparkSql but I believe that is only for interactive use or loading data into hive tables. Thanks, Pankaj
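Spark SQL can in fact load JSON without any external library, and the result is still an RDD that MLlib can consume, so it is not limited to interactive use or Hive tables. A minimal sketch against the Spark 1.3+ API; the path and column names are placeholders, and the JSON is expected one object per line:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val df = sqlContext.jsonFile("/data/train.json") // schema is inferred from the data

// A DataFrame is also an RDD of Rows, so it maps straight into MLlib types.
val points = df.select("label", "f1", "f2").map { row =>
  LabeledPoint(row.getDouble(0), Vectors.dense(row.getDouble(1), row.getDouble(2)))
}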
Re: Spark Team - Paco Nathan said that your team can help
http://spark.apache.org/docs/latest/

Follow this. It's easy to get started. Use a prebuilt version of Spark as of now :D

On Thu, Jan 22, 2015 at 5:06 PM, Sudipta Banerjee <asudipta.baner...@gmail.com> wrote:
> Hi Apache-Spark team,
>
> What are the system requirements for installing Hadoop and Apache Spark? I have attached the screen shot of Gparted.
>
> Thanks and regards,
> Sudipta
>
> --
> Sudipta Banerjee
> Consultant, Business Analytics and Cloud Based Architecture
> Call me +919019578099
Re: reading a csv dynamically
Yes. I think you need to first create a map which keeps the number of values in every line. Then you can group all the records with the same number of values, so you know how many types of arrays you will have.

val dataRDD = sc.textFile("file.csv")
val dataLengthRDD = dataRDD.map(line => (line.split(",").length, line))
val groupedData = dataLengthRDD.groupByKey()

Now you can process groupedData, as it will have all arrays of length x in one RDD.

groupByKey([numTasks]): when called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.

I hope this helps.

Regards
Pankaj
Infoshore Software
India
Re: Finding most occurrences in a JSON Nested Array
Send me the current code here; I will fix it and send it back to you.
Re: Finding most occurrences in a JSON Nested Array
I just checked the post. Do you still need help? I think getAs[Seq[String]] should help. If you are still stuck, let me know.
Re: How to compute RDD[(String, Set[String])] that include large Set
Instead of counted.saveAsTextFile("/path/to/save/dir"), what happens if you call counted.collect? If you still face the same issue, please paste the stack trace here.
Re: Spark SQL implementation error
As per our telephone call, here is how we can fetch the count:

val tweetsCount = sql("SELECT COUNT(*) FROM tweets")
println(f"\n\n\nThere are ${tweetsCount.collect.head.getLong(0)} Tweets on this Dataset\n\n")
Re: Finding most occurrences in a JSON Nested Array
That's great. I did not have access to the developer machine, so I sent you the pseudo code only. Happy to see it's working. If you need any more help related to Spark, let me know anytime.
Re: Set EXTRA_JAR environment variable for spark-jobserver
I suggest creating an uber jar instead. Check my thread for the same:
http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-com-typesafe-config-Config-getDuration-with-akka-http-akka-stream-td20926.html

Regards
-Pankaj
Linkedin
https://www.linkedin.com/profile/view?id=171566646
Skype
pankaj.narang
Re: NoSuchMethodError: com.typesafe.config.Config.getDuration with akka-http/akka-stream
Good luck. Let me know if I can assist you further.

Regards
-Pankaj
Linkedin
https://www.linkedin.com/profile/view?id=171566646
Skype
pankaj.narang
Re: Finding most occurrences in a JSON Nested Array
Yes, row(1).collect would be wrong, as it is not a transformation on an RDD. Try getString(1) to fetch the value. I already said this is pseudo code; if it does not help, let me know and I will run the code and send it to you. get/getAs should work for you, for example:

var hashTagsList = popularHashTags.flatMap(x => x.getAs[Seq[String]](0))

If you want, I will even take remote access of your machine to fix it.

Regards
Pankaj
Linkedin
https://www.linkedin.com/profile/view?id=171566646
Skype
pankaj.narang
Re: Finding most occurrences in a JSON Nested Array
If you need more help let me know.

-Pankaj
Linkedin
https://www.linkedin.com/profile/view?id=171566646
Skype
pankaj.narang
Re: Finding most occurrences in a JSON Nested Array
Try as below. Instead of

results.map(row => row(1)).collect

try

var hobbies = results.flatMap(row => row.getAs[Seq[String]](1))

It will collect all the hobbies into one flat RDD. Now

var hbmap = hobbies.map(hobby => (hobby, 1)).reduceByKey((hobcnt1, hobcnt2) => hobcnt1 + hobcnt2)

will aggregate the hobbies as below:

{swimming, 2}, {hiking, 1}

Now

hbmap.map { case (hobby, count) => (count, hobby) }.sortByKey(ascending = false).collect

will give you the hobbies sorted in descending order by their count.

This is pseudo code and should help you.

Regards
Pankaj
Re: saveAsTextFile
If you can paste the code here, I can certainly help. Also confirm the version of Spark you are using.

Regards
Pankaj
Infoshore Software
India
Re: NoSuchMethodError: com.typesafe.config.Config.getDuration with akka-http/akka-stream
> Like before I get a java.lang.NoClassDefFoundError: akka/stream/FlowMaterializer$

This can be solved using the assembly plugin. You need to enable the assembly plugin in your global plugins, C:\Users\infoshore\.sbt\0.13\plugins, by adding a line in plugins.sbt:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.0")

and then add the following lines in build.sbt:

import AssemblyKeys._

// put this at the top of the file
seq(assemblySettings: _*)

Also, at the bottom don't forget to add:

assemblySettings

mergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
  case "log4j.properties" => MergeStrategy.discard
  case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines
  case "reference.conf" => MergeStrategy.concat
  case _ => MergeStrategy.first
}

Now run sbt assembly. That will create a jar which can be run without the --jars option, as it will be an uber jar containing all dependent jars.

Also, NoSuchMethodError is thrown when there is a difference between the compiled and runtime versions. What is the version of Spark you are using? You need to use the same version in build.sbt. Here is your build.sbt:

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.1" //exclude("com.typesafe", "config")
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.1.1"
libraryDependencies += "com.datastax.cassandra" % "cassandra-driver-core" % "2.1.3"
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0" withSources() withJavadoc()
libraryDependencies += "org.apache.cassandra" % "cassandra-thrift" % "2.0.5"
libraryDependencies += "joda-time" % "joda-time" % "2.6"

and your error is:

Exception in thread "main" java.lang.NoSuchMethodError: com.typesafe.config.Config.getDuration(Ljava/lang/String;Ljava/util/concurrent/TimeUnit;)J
at akka.stream.StreamSubscriptionTimeoutSettings$.apply(FlowMaterializer.scala:256)

I think there is a version mismatch in the jars you use at runtime.

If you need more help, add me on skype: pankaj.narang

---Pankaj
Re: Publishing streaming results to web interface
Thomus,

Spark does not provide any web interface directly. There might be third-party apps providing dashboards, but I am not aware of any for this purpose.

*You can use some methods so that this data is saved to the file system instead of being printed on screen. Some of the methods you can use on an RDD are saveAsObjectFile and saveAsTextFile.*

Now you can read these files to show them on a web interface in any language of your choice.

Regards
Pankaj
Re: NoClassDefFoundError when trying to run spark application
Do you assemble the uber jar? You can use sbt assembly to build the jar and then run it. That should fix the issue.
Re: Reading nested JSON data with Spark SQL
Oops, sqlContext.setConf("spark.sql.parquet.binaryAsString", "true") solved the issue. Important for everyone.
Re: Reading nested JSON data with Spark SQL
Also, it looks like that when I store the Strings in parquet and try to fetch them using Spark code I get a ClassCastException. Below is how my arrays of strings are saved: each character's ascii value is present in an array of ints.

res25: Array[Seq[String]] = Array(ArrayBuffer(Array(104, 116, 116, 112, 58, 47, 47, 102, 98, 46, 109, 101, 47, 51, 67, 111, 72, 108, 99, 101, 77, 103)), ArrayBuffer(), ArrayBuffer(), ArrayBuffer(), ArrayBuffer(Array(104, 116, 116, 112, 58, 47, 47, 105, 110, 115, 116, 97, 103, 114, 97, 109, 46, 99, 111, 109, 47, 112, 47, 120, 84, 50, 51, 78, 76, 105, 85, 55, 102, 47)), ArrayBuffer(), ArrayBuffer(Array(104, 116, 116, 112, 58, 47, 47, 105, 110, 115, 116, 97, 103, 114, 97, 109, 46, 99, 111, 109, 47, 112, 47, 120, 84, 50, 53, 72, 52, 111, 90, 95, 114, 47)), ArrayBuffer(Array(104, 116, 116, 112, 58, 47, 47, 101, 122, 101, 101, 99, 108, 97, 115, 115, 105, 102, 105, 101, 100, 97, 100, 115, 46, 99, 111, 109, 47, 47, 100, 101, 115, 99, 47, 106, 97, 105, 112, 117, 114, 47, 49, 48, 51, 54, 50, 50,
Re: Reading nested JSON data with Spark SQL
Hi, I am having a similar problem. I tried your solution with Spark 1.2 built with Hadoop. I am saving objects to parquet files where some fields are of type Array. When I fetch them as below, I get java.lang.ClassCastException: [B cannot be cast to java.lang.CharSequence

def fetchTags(rows: SchemaRDD) = {
  rows.flatMap(x => (x.getAs[Buffer[CharSequence]](0)).map(_.toString()))
}

The values I am fetching were stored as an array of strings. I have tried replacing Buffer[CharSequence] with Array[String], Seq[String], and Seq[Seq[Char]], but still got errors.

Can you provide a clue?

Pankaj
Re: Time based aggregation in Real time Spark Streaming
Hi,

Suppose I keep a batch size of 3 minutes. In one batch there can be incoming records with any timestamp, so it is difficult to keep track of when the 3-minute interval started and ended. I am doing the output operation on the worker nodes in forEachPartition, not in the driver (forEachRdd), so I cannot use any shared variable to store the start/end time, because shared variables like accumulators are write-only in a task. Is there any solution to this?

Thanks
Time based aggregation in Real time Spark Streaming
Hi,

My incoming message has a timestamp as one field, and I have to perform aggregation over 3-minute time slices.

Message sample:

"Item ID"  "Item Type"  "timeStamp"
1          X            1-12-2014:12:01
1          X            1-12-2014:12:02
1          X            1-12-2014:12:03
1          y            1-12-2014:12:04
1          y            1-12-2014:12:05
1          y            1-12-2014:12:06

Aggregation result:

ItemId  ItemType  count  aggregationStartTime  aggrEndTime
1       X         3      1-12-2014:12:01       1-12-2014:12:03
1       y         3      1-12-2014:12:04       1-12-2014:12:06

What is the best way to perform time-based aggregation in Spark? Kindly suggest.

Thanks
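One common approach: derive the time slice from each record's own timestamp instead of from the batch boundaries, and aggregate per (key, slice). A minimal sketch, assuming the records have already been parsed into a case class (all names here are illustrative):

case class Msg(itemId: Long, itemType: String, ts: Long) // ts in epoch millis

val sliceMs = 3 * 60 * 1000L // 3-minute slices

// Event time decides the slice, so it does not matter which batch a record arrives in.
// Key: (itemId, itemType, sliceStart); sliceStart and sliceStart + sliceMs
// give the aggregationStartTime / aggrEndTime columns.
val counts = messages
  .map(m => ((m.itemId, m.itemType, (m.ts / sliceMs) * sliceMs), 1))
  .reduceByKey(_ + _)

Since records for one slice can span several batches, merge the per-batch partial counts across batches with updateStateByKey, or in an external store keyed by the slice.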
Re: Spark streaming job failing after some time.
I have figured out the problem here. It turned out that there was a problem with my SparkConf when I was running my application with YARN in cluster mode. I was setting the master to local[4] inside my application, whereas I was setting it to yarn-cluster with spark-submit. I have now changed my SparkConf so that it does not hardcode the master, and it works.

The application kept running for some time because the YARN ApplicationMaster retries up to maxNumTries, waiting between retries, before it fails completely, and I was getting correct results from my streaming job during this time. What I can't figure out is why it should run successfully during this window if it could not find the SparkContext. I am sure there is a good reason behind this behavior. Does anyone have any idea?

Thanks,
Pankaj Channe
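For anyone hitting the same issue, a minimal sketch of the fix described above (the app name and batch interval are illustrative):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  // Do NOT call setMaster("local[4]") here: a master set in code overrides
  // the --master flag passed to spark-submit, which breaks yarn-cluster mode.
  val conf = new SparkConf().setAppName("MyStreamingApp")
  val ssc = new StreamingContext(conf, Seconds(10))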
Re: Spark streaming job failing after some time.
Thanks Akhil for your input.

I have already tried with 3 executors and it still results in the same problem. So, as Sean mentioned, the problem does not seem to be related to that.

On Sat, Nov 22, 2014 at 11:00 AM, Sean Owen wrote:
> That doesn't seem to be the problem though. It processes but then stops.
> Presumably there are many executors.
> On Nov 22, 2014 9:40 AM, "Akhil Das" wrote:
>
>> For Spark Streaming, you must always set *--executor-cores* to a value
>> which is >= 2, or else it will not do any processing.
>>
>> Thanks
>> Best Regards
Spark streaming job failing after some time.
I have seen similar posts on this issue but could not find a solution. Apologies if this has been discussed here before.

I am running a Spark Streaming job with YARN on a 5-node cluster. I am using the following command to submit my streaming job:

spark-submit --class class_name --master yarn-cluster --num-executors 1 --driver-memory 1g --executor-memory 1g --executor-cores 1 my_app.jar

After running for some time, the job stops. The application log shows the following two errors:

14/11/21 22:05:04 WARN yarn.ApplicationMaster: Unable to retrieve SparkContext in spite of waiting for 10, maxNumTries = 10
Exception in thread "main" java.lang.NullPointerException
at org.apache.spark.deploy.yarn.ApplicationMaster.waitForSparkContextInitialized(ApplicationMaster.scala:218)
at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:107)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:410)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:53)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:52)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1594)
at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:52)
at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:409)
at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)

and later...

Failed to list files for dir: /data2/hadoop/yarn/local/usercache/user_name/appcache/application_1416332002106_0009/spark-local-20141121220325-b529/20
at org.apache.spark.util.Utils$.listFilesSafely(Utils.scala:673)
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:685)
at org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:686)
at org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:685)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:685)
at org.apache.spark.storage.DiskBlockManager$$anonfun$stop$1.apply(DiskBlockManager.scala:181)
at org.apache.spark.storage.DiskBlockManager$$anonfun$stop$1.apply(DiskBlockManager.scala:178)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.storage.DiskBlockManager.stop(DiskBlockManager.scala:178)
at org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply$mcV$sp(DiskBlockManager.scala:171)
at org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:169)
at org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$1.apply(DiskBlockManager.scala:169)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
at org.apache.spark.storage.DiskBlockManager$$anon$1.run(DiskBlockManager.scala:169)

Note: I am building my jar locally with the Spark dependency added in pom.xml and running it on a cluster running Spark.

-Pankaj
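If the input is a receiver-based stream, one thing worth double-checking is that the total executor cores exceed the number of receivers, since each receiver permanently occupies a core; with --num-executors 1 and --executor-cores 1, the lone core runs the receiver and no batches get processed. Akhil raises this in the thread above, and Sean notes it did not turn out to be the cause here, so treat the following variant (values illustrative) as a sanity check rather than the fix:

  spark-submit --class class_name --master yarn-cluster --num-executors 2 \
    --driver-memory 1g --executor-memory 1g --executor-cores 2 my_app.jar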
Re: Use Case of mutable RDD - any ideas around will help.
I think I should elaborate on the use case a little more. We have a UI dashboard whose response time is quite fast because all the data is cached. Users query data based on a time range, and new data is also always coming into the system at a predefined frequency, say every hour.

As you said, I can uncache tables, but that will drop all the data from memory. I cannot afford to lose my cache even for a short interval, as all queries from the UI will become slow until the cache loads again. The UI response time needs to be predictable, and fast enough that users do not get irritated. Also, I cannot keep two copies of the data in memory (until the new RDD materializes), as that would exceed the total memory available in the system.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Re-Use-Case-of-mutable-RDD-any-ideas-around-will-help-tp14095p14112.html
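One pattern that may fit these constraints (a sketch under stated assumptions, not advice from the original thread; it assumes a SparkContext sc as in the shell, and the HDFS paths are illustrative): instead of rebuilding the whole cache when new data lands, cache and materialize only the new hourly batch, then serve queries from the union of the cached pieces. The existing cache is never dropped, and only the small new batch is briefly held in addition.

  import org.apache.spark.rdd.RDD

  // Cache the initial dataset once and force materialization.
  var cached: RDD[String] = sc.textFile("hdfs:///data/initial").cache()
  cached.count()

  // Each hour: cache and materialize only the new slice, then swap in the
  // union. Already-cached partitions are reused, so queries stay fast.
  def appendBatch(path: String): Unit = {
    val batch = sc.textFile(path).cache()
    batch.count()
    cached = cached.union(batch)
  }

Over many hours the union lineage grows, so it may be worth consolidating the cache (for example, re-reading and re-caching the full dataset) during an off-peak window.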