Spark streaming over a REST API
Hello, I would like to use Spark Streaming over a REST API to get information over time, with different parameters in the REST query. I was thinking of using Apache Kafka, but I don't have any experience with it and would like some advice about this. Thanks. Best regards, Juan
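One option that fits this use case without introducing Kafka is a custom Spark Streaming receiver that polls the REST endpoint on an interval. A minimal sketch, assuming the endpoint returns a plain-text body; the URL and polling interval are placeholders, and error handling and restart logic are omitted:

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    class RestReceiver(url: String, intervalMs: Long)
        extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

      def onStart(): Unit = {
        // Poll on a background thread so onStart() returns immediately.
        new Thread("REST poller") {
          override def run(): Unit = {
            while (!isStopped()) {
              // Hand the HTTP response body to Spark as one record.
              store(scala.io.Source.fromURL(url).mkString)
              Thread.sleep(intervalMs)
            }
          }
        }.start()
      }

      def onStop(): Unit = {} // the polling thread exits once isStopped() is true
    }

    // usage: ssc.receiverStream(new RestReceiver("http://host/api?param=x", 10000L))

Varying the query parameters over time (the "different parameters" part of the question) can be done by computing the URL inside the polling loop.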
Re: Spark Streaming and reducing latency
we = Sigmoid. back-pressuring mechanism = stopping the receiver from receiving more messages when it is about to exhaust the worker memory. Here's a similar kind of proposal, if you haven't seen it already: https://issues.apache.org/jira/browse/SPARK-7398 Thanks Best Regards
Re: Forbidden : Error Code: 403
Tried almost all the options, but it did not work. So I ended up creating a new IAM user, and the keys of this user are working fine. I am not getting the Forbidden (403) exception now, but my program seems to be running infinitely. It's not throwing any exception, but continues to run continuously with the following trace:

...
15/05/18 17:35:44 INFO HttpServer: Starting HTTP Server
15/05/18 17:35:44 INFO Server: jetty-8.y.z-SNAPSHOT
15/05/18 17:35:44 INFO AbstractConnector: Started SocketConnector@0.0.0.0:60316
15/05/18 17:35:44 INFO Utils: Successfully started service 'HTTP file server' on port 60316.
15/05/18 17:35:44 INFO SparkEnv: Registering OutputCommitCoordinator
15/05/18 17:35:44 INFO Server: jetty-8.y.z-SNAPSHOT
15/05/18 17:35:44 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/05/18 17:35:44 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/05/18 17:35:44 INFO SparkUI: Started SparkUI at http://172.28.210.74:4040
15/05/18 17:35:44 INFO Executor: Starting executor ID driver on host localhost
15/05/18 17:35:44 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@172.28.210.74:60315/user/HeartbeatReceiver
15/05/18 17:35:44 INFO NettyBlockTransferService: Server created on 60317
15/05/18 17:35:44 INFO BlockManagerMaster: Trying to register BlockManager
15/05/18 17:35:44 INFO BlockManagerMasterActor: Registering block manager localhost:60317 with 66.9 MB RAM, BlockManagerId(driver, localhost, 60317)
15/05/18 17:35:44 INFO BlockManagerMaster: Registered BlockManager
15/05/18 17:35:45 WARN AmazonHttpClient: Detected a possible problem with the current JVM version (1.6.0_65). If you experience XML parsing problems using the SDK, try upgrading to a more recent JVM update.
15/05/18 17:35:47 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:47 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:47 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:48 INFO S3AFileSystem: Opening 's3a://bucket-name/avro_data/episodes.avro' for reading
15/05/18 17:35:48 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:48 INFO S3AFileSystem: Actually opening file avro_data/episodes.avro at pos 0
15/05/18 17:35:48 INFO S3AFileSystem: Reopening avro_data/episodes.avro to seek to new offset -4
15/05/18 17:35:48 INFO S3AFileSystem: Actually opening file avro_data/episodes.avro at pos 0
15/05/18 17:35:50 INFO MemoryStore: ensureFreeSpace(230868) called with curMem=0, maxMem=70177259
15/05/18 17:35:50 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 225.5 KB, free 66.7 MB)
15/05/18 17:35:50 INFO MemoryStore: ensureFreeSpace(31491) called with curMem=230868, maxMem=70177259
15/05/18 17:35:50 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 30.8 KB, free 66.7 MB)
15/05/18 17:35:50 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:60317 (size: 30.8 KB, free: 66.9 MB)
15/05/18 17:35:50 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/05/18 17:35:50 INFO SparkContext: Created broadcast 0 from hadoopFile at AvroRelation.scala:82
15/05/18 17:35:50 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:50 INFO FileInputFormat: Total input paths to process : 1
15/05/18 17:35:50 INFO SparkContext: Starting job: runJob at SparkPlan.scala:122
15/05/18 17:35:50 INFO DAGScheduler: Got job 0 (runJob at SparkPlan.scala:122) with 1 output partitions (allowLocal=false)
15/05/18 17:35:50 INFO DAGScheduler: Final stage: Stage 0(runJob at SparkPlan.scala:122)
15/05/18 17:35:50 INFO DAGScheduler: Parents of final stage: List()
15/05/18 17:35:50 INFO DAGScheduler: Missing parents: List()
15/05/18 17:35:50 INFO DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[2] at map at SparkPlan.scala:97), which has no missing parents
15/05/18 17:35:50 INFO MemoryStore: ensureFreeSpace(3448) called with curMem=262359, maxMem=70177259
15/05/18 17:35:50 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.4 KB, free 66.7 MB)
15/05/18 17:35:50 INFO MemoryStore: ensureFreeSpace(2386) called with curMem=265807, maxMem=70177259
15/05/18 17:35:50 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.3 KB, free 66.7 MB)
15/05/18 17:35:50 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:60317 (size: 2.3 KB, free: 66.9 MB)
15/05/18 17:35:50 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/05/18 17:35:50 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:839
15/05/18 17:35:50 INFO DAGScheduler: Submitting 1 missing tasks
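For anyone hitting the original 403: the S3A credentials are normally supplied through the Hadoop configuration. A minimal sketch (the key values are placeholders, and this assumes the hadoop-aws s3a classes are on the classpath as in the trace above):

    sc.hadoopConfiguration.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")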
Re: Processing multiple columns in parallel
My first thought would be creating 10 RDDs and running your word count on each of them. I think the Spark scheduler is going to resolve the dependencies in parallel and launch 10 jobs. Best, Ayan

On 18 May 2015 23:41, Laeeq Ahmed laeeqsp...@yahoo.com.invalid wrote:

Hi, Consider I have a tab-delimited text file with 10 columns. Each column is a set of text. I would like to do a word count for each column. In Scala, I would do the following RDD transformation and action:

    val data = sc.textFile("hdfs://namenode/data.txt")
    for (i <- 0 until 10) {
      data.map(_.split("\t", -1)(i)).map((_, 1)).reduceByKey(_ + _).saveAsTextFile(i.toString)
    }

Within the for loop it's a parallel process, but each column is processed sequentially from 0 to 9. Is there any way I can process multiple columns in parallel in Spark? I saw a posting about using Akka, but RDD itself is already using Akka. Any pointers would be appreciated. Regards, Laeeq
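Both replies in this thread amount to submitting the per-column jobs concurrently rather than one after another. A minimal sketch of that idea with a parallel range, reusing the data RDD from the question (the Spark scheduler accepts jobs submitted from multiple threads within one SparkContext; the output path is a placeholder):

    // .par runs the loop bodies on a thread pool, so the ten word-count
    // jobs are handed to the Spark scheduler concurrently
    (0 until 10).par.foreach { i =>
      data.map(_.split("\t", -1)(i))
          .map((_, 1))
          .reduceByKey(_ + _)
          .saveAsTextFile(s"column_$i")
    }

Whether the jobs actually run simultaneously still depends on how many cores the cluster has free.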
pass configuration parameters to PySpark job
Hi, I am looking for a way to pass configuration parameters to a Spark job. In general I have a quite simple PySpark job:

    def process_model(k, vc):
        # do something

    sc = SparkContext(appName="TAD")
    lines = sc.textFile(input_job_files)
    result = lines.map(doSplit).groupByKey().map(lambda (k, vc): process_model(k, vc))

Question: in case I need to pass additional metadata, parameters, etc. to the process_model function, I tried to do something like

    param = 'param1'
    result = lines.map(doSplit).groupByKey().map(lambda (param, k, vc): process_model(param1, k, vc))

but the job stops working, and it also does not look like an elegant solution. Is there a way to have access to the SparkContext from my custom functions? I found that there are methods setLocalProperty/getLocalProperty, but I didn't find an example of how to use them for my requirements (from my function). It would be great to have a short example of how to pass parameters. Thanks, Oleg.
parsedData option
Hi Team, My dataset has the following format:

CELLPHONE,KL_1,KL_2,KL_3,KL_4,KL_5
1120100114,-5.3244062521117e-003,-4.10825709805041e-003,-1.7816995027779e-002,-4.21462029980323e-003,-1.6200555039e-002

i.e., an identifier in the first column and the data separated by commas. To load this data I'm using:

    val data = sc.textFile("/data/disk1/cluster/fraude5.csv")

But now the problem: what do I need to do to choose KL_4 for clustering in the following line?

    val parsedData = data.map(s => Vectors.dense(s.split(',').map(_.toDouble))).cache()

Thanks, Ricardo.
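A minimal sketch of one way to do this, assuming the file keeps the header row shown above (KL_4 is the field at index 4 once the line is split on commas):

    import org.apache.spark.mllib.linalg.Vectors

    val header = data.first()
    val parsedData = data
      .filter(_ != header)                               // drop the CELLPHONE,KL_1,... header line
      .map(s => Vectors.dense(s.split(',')(4).toDouble)) // keep only KL_4
      .cache()

Vectors.dense accepts a single Double, producing one-element vectors, which is what the clustering call would then receive.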
RE: Spark Streaming and reducing latency
Who are “we” and what is the mysterious “back-pressuring mechanism”, and is it part of the Spark distribution (are you talking about an implementation of the custom feedback loop mentioned in my previous emails below)? I am asking these because I can assure you that, at least as of Spark Streaming 1.2.0, and as Evo says, Spark Streaming DOES crash in an “unceremonious way” when the free RAM available for in-memory cached RDDs gets exhausted.
Working with slides. How do I know how many times an RDD has been processed?
Hi, I have two streaming RDDs, RDD1 and RDD2, and I want to cogroup them. The data don't arrive at the same time and can sometimes come with some delay. Once I have all the data, I want to insert it into MongoDB. For example, imagine that I get:

RDD1 -- T 0
RDD2 -- T 0.5

I cogroup them, but I can't store the result in Mongo yet, because more data could come in the next window/slide:

RDD2' -- T 1.5

Another RDD2' arrives, and I only want to save to Mongo once, so I should save only when I have all the data. What I do know is the maximum time I should wait. Ideally, I would like to save to MongoDB in the last slide for each RDD, when I know it is no longer possible to get more RDD2 records to join with RDD1. Is this possible? How? Maybe there is another way to solve this problem; any ideas?
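For the cogroup-over-a-window part of the question, a minimal sketch, assuming both streams are already keyed pair DStreams and that 30 seconds is the maximum expected delay (stream names and the duration are placeholders):

    import org.apache.spark.streaming.Seconds

    // widen both keyed streams to cover the maximum expected delay,
    // then cogroup matching keys within that window
    val joined = stream1.window(Seconds(30)).cogroup(stream2.window(Seconds(30)))

This does not by itself solve the save-only-once requirement; it only bounds how long matching records are kept around.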
Re: Spark and Flink
Hi, I would really recommend you put your Flink and Spark dependencies into different Maven modules. Having them both in the same project will be very hard, if not impossible: both projects depend on similar projects with slightly different versions. I would suggest a Maven module structure like this:

yourproject-parent (a pom module)
-- yourproject-common
-- yourproject-flink
-- yourproject-spark

On Mon, May 18, 2015 at 10:00 AM, Pa Rö paul.roewer1...@googlemail.com wrote:

hi, if I add your dependency I get over 100 errors, so now I changed the version number:

    <dependencies>
      <dependency>
        <groupId>com.fasterxml.jackson.module</groupId>
        <artifactId>jackson-module-scala_2.10</artifactId>
        <version>2.4.4</version>
        <exclusions>
          <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
          </exclusion>
        </exclusions>
      </dependency>
    </dependencies>

now the pom is fine, but I get the same error when I run Spark:

WARN component.AbstractLifeCycle: FAILED org.eclipse.jetty.servlet.DefaultServlet-608411067: java.lang.NoSuchMethodError: org.eclipse.jetty.server.ResourceCache.init(Lorg/eclipse/jetty/http/MimeTypes;)V
java.lang.NoSuchMethodError: org.eclipse.jetty.server.ResourceCache.init(Lorg/eclipse/jetty/http/MimeTypes;)V
    at org.eclipse.jetty.servlet.NIOResourceCache.init(NIOResourceCache.java:41)
    at org.eclipse.jetty.servlet.DefaultServlet.init(DefaultServlet.java:223)
    at javax.servlet.GenericServlet.init(GenericServlet.java:244)
    at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:442)
    at org.eclipse.jetty.servlet.ServletHolder.doStart(ServletHolder.java:270)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
    at org.eclipse.jetty.servlet.ServletHandler.initialize(ServletHandler.java:721)
    at org.eclipse.jetty.servlet.ServletContextHandler.startContext(ServletContextHandler.java:279)
    at org.eclipse.jetty.server.handler.ContextHandler.doStart(ContextHandler.java:717)
    at org.eclipse.jetty.servlet.ServletContextHandler.doStart(ServletContextHandler.java:155)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
    at org.eclipse.jetty.server.handler.HandlerCollection.doStart(HandlerCollection.java:229)
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.doStart(ContextHandlerCollection.java:172)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
    at org.eclipse.jetty.server.handler.HandlerWrapper.doStart(HandlerWrapper.java:95)
    at org.eclipse.jetty.server.Server.doStart(Server.java:282)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
    at org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:199)
    at org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:209)
    at org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:209)
    at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1454)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1450)
    at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:209)
    at org.apache.spark.ui.WebUI.bind(WebUI.scala:102)
    at org.apache.spark.SparkContext.init(SparkContext.scala:224)
    at org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:53)
    at mgm.tp.bigdata.tempGeoKmeans.Spark.SparkMain.main(SparkMain.java:37)
    ...

what am I doing wrong?
best regards, paul

2015-05-13 15:43 GMT+02:00 Ted Yu yuzhih...@gmail.com:

You can use an exclusion to remove the undesired Jetty version. Here is the syntax:

    <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-scala_2.10</artifactId>
      <version>${fasterxml.jackson.version}</version>
      <exclusions>
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

On Wed, May 13, 2015 at 6:41 AM, Paul Röwer paul.roewer1...@googlemail.com wrote:

Okay. And how do I get it clean in my Maven project?

On 13 May 2015 15:15:34 MESZ, Ted Yu yuzhih...@gmail.com wrote:

You can run the following command:

    mvn dependency:tree

and see what Jetty versions are brought in. Cheers

On May 13, 2015, at 6:07 AM, Pa Rö paul.roewer1...@googlemail.com wrote:

hi, I use Spark and Flink in the same Maven project, and now I get an exception when working with Spark; Flink works well. The problem is transitive dependencies. Maybe somebody knows a solution, or versions which work together. best regards, paul

PS: a Cloudera Maven repo for Flink would be desirable

my pom: <project>
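A sketch of what the parent pom for the suggested module split might look like (groupId, version, and module names are placeholders following the suggestion at the top of this thread):

    <project>
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.example</groupId>
      <artifactId>yourproject-parent</artifactId>
      <version>1.0-SNAPSHOT</version>
      <packaging>pom</packaging>
      <modules>
        <module>yourproject-common</module>
        <module>yourproject-flink</module>
        <module>yourproject-spark</module>
      </modules>
    </project>

yourproject-flink and yourproject-spark then each declare only their own framework dependencies (plus yourproject-common), so the conflicting Jetty versions never meet on one classpath.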
Re: Spark Streaming and reducing latency
We fix the rate at which the receivers should consume at any given point of time. Also, we have a back-pressuring mechanism attached to the receivers, so it won't simply crash in the unceremonious way Evo described. Mesos has some sort of auto-scaling (read it somewhere); maybe you can look into that as well. Thanks Best Regards

On Mon, May 18, 2015 at 5:20 PM, Evo Eftimov evo.efti...@isecc.com wrote:

And if you want to genuinely “reduce the latency” (still within the boundaries of the micro-batch) THEN you need to design and finely tune the Parallel Programming / Execution Model of your application. The objective/metric here is:

a) Consume all data within your selected micro-batch window WITHOUT any artificial message rate limits.
b) The above will result in a certain size of DStream RDD per micro-batch.
c) The objective now is to process that RDD WITHIN the time of the micro-batch (and also account for temporary message rate spikes etc., which may further increase the size of the RDD) – this will avoid any clogging up of the app and will process your messages at the lowest latency possible in a micro-batch architecture.
d) You achieve the objective stated in c) by designing, varying and experimenting with various aspects of the Spark Streaming Parallel Programming and Execution Model – e.g. number of receivers, number of threads per receiver, number of executors, number of cores, RAM allocated to executors, number of RDD partitions (which correspond to the number of parallel threads operating on the RDD), etc.

Re the “unceremonious removal of DStream RDDs” from RAM by Spark Streaming when the available RAM is exhausted due to a high message rate, and which crashes your (hence clogged-up) application, the name of the condition is:

Loss was due to java.lang.Exception
java.lang.Exception: Could not compute split, block input-4-1410542878200 not found

From: Evo Eftimov [mailto:evo.efti...@isecc.com]
Sent: Monday, May 18, 2015 12:13 PM
To: 'Dmitry Goldenberg'; 'Akhil Das'
Cc: 'user@spark.apache.org'
Subject: RE: Spark Streaming and reducing latency

You can use spark.streaming.receiver.maxRate (default: not set), the maximum rate (number of records per second) at which each receiver will receive data. Effectively, each stream will consume at most this number of records per second. Setting this configuration to 0 or a negative number will put no limit on the rate. See the deployment guide (https://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications) in the Spark Streaming programming guide for more details.

Another way is to implement a feedback loop in your receivers, monitoring the performance metrics of your application/job and, based on that, automatically adjusting the receiving rate – BUT all of these have nothing to do with “reducing the latency” – they simply prevent your application/job from clogging up. The nastier effect of clogging is when Spark Streaming starts removing in-memory RDDs from RAM before they are processed by the job – that works fine in Spark batch (i.e. removing RDDs from RAM based on LRU), but in Spark Streaming, when done in this “unceremonious way”, it simply crashes the application.

From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
Sent: Monday, May 18, 2015 11:46 AM
To: Akhil Das
Cc: user@spark.apache.org
Subject: Re: Spark Streaming and reducing latency

Thanks, Akhil. So what do folks typically do to expand/contract the capacity? Do you plug in some cluster auto-scaling solution to make this elastic? Does Spark have any hooks for instrumenting auto-scaling? In other words, how do you avoid overwhelming the receivers in a scenario where your system's input can be unpredictable, based on users' activity?

On May 17, 2015, at 11:04 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

With receiver-based streaming, you can actually specify spark.streaming.blockInterval, which is the interval at which the receiver will fetch data from the source. The default value is 200ms, and hence if your batch duration is 1 second, it will produce 5 blocks of data. And yes, with Spark Streaming, when your processing time goes beyond your batch duration and you have a higher data consumption, then you will overwhelm the receiver's memory and hence will throw up "block not found" exceptions. Thanks Best Regards

On Sun, May 17, 2015 at 7:21 PM, dgoldenberg dgoldenberg...@gmail.com wrote:

I keep hearing the argument that the way Discretized Streams work with Spark Streaming is a lot more of a batch processing algorithm than true streaming. For streaming, one would expect a new item, e.g. in a Kafka topic, to be available to the streaming consumer immediately. With the discretized streams, streaming is done with batch intervals, i.e. the consumer has to wait the interval to be able to get at the new items. If
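For reference, a minimal sketch of setting the rate limit Evo quotes (the app name and the 1000 records/second value are placeholders):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("MyStreamingApp")
      // cap each receiver at 1000 records per second; 0 or a negative value means no limit
      .set("spark.streaming.receiver.maxRate", "1000")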
RE: Spark Streaming and reducing latency
Ooow – that is essentially the custom feedback loop mentioned in my previous emails, in generic architecture terms, and what you have done is only one of the possible implementations, moreover one based on ZooKeeper – there are other possible designs not using things like ZooKeeper at all and hence achieving much lower latency and responsiveness. Can I also give you a friendly piece of advice: there is a long way FROM “we = Sigmoid and our custom Sigmoid solution” TO your earlier statement that Spark Streaming does “NOT” crash UNCEREMONIOUSLY – please maintain responsible and objective communication and facts.
Re: How to run multiple jobs in one sparkcontext from separate threads in pyspark?
Hi Akhil, The python wrapper for Spark Job Server did not help me. I actually need a pyspark code sample which shows how I can call a function from 2 threads and execute them simultaneously. Thanks & Regards, Meethu M

On Thursday, 14 May 2015 12:38 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

Did you happen to have a look at the Spark Job Server (https://github.com/ooyala/spark-jobserver)? Someone wrote a python wrapper (https://github.com/wangqiang8511/spark_job_manager) around it; give it a try. Thanks Best Regards

On Thu, May 14, 2015 at 11:10 AM, MEETHU MATHEW meethu2...@yahoo.co.in wrote:

Hi all, Quote: "Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads." How do I run multiple jobs in one SparkContext using separate threads in pyspark? I found some examples in Scala and Java, but couldn't find Python code. Can anyone help me with a pyspark example? Thanks & Regards, Meethu M
Re: How to run multiple jobs in one sparkcontext from separate threads in pyspark?
Hi, So to be clear, do you want to run one operation in multiple threads within a function, or do you want to run multiple jobs using multiple threads? I am wondering why the Python thread module can't be used. Or have you already given it a try?
How to debug Spark in IntelliJ IDEA
Hi all, I wrote some code to access a Spark master deployed in standalone mode, and I want to set a breakpoint in the Spark master, which runs in a different process. I am wondering whether I need to attach to the process in IntelliJ, so that when the AppClient sends a message to the remote actor (the Spark master), the breakpoint is hit. I don't know how to debug this in IntelliJ IDEA. I need help. Thanks. Regards, Yi
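The usual approach for this (a sketch, not IntelliJ-specific documentation) is the standard JVM remote-debug agent: start the master JVM with the JDWP options, e.g. in conf/spark-env.sh:

    export SPARK_MASTER_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"

Then create a Run > Edit Configurations > Remote configuration in IntelliJ pointing at the master host and port 5005 and attach; breakpoints set in the master code are then hit. The port number 5005 is an arbitrary choice.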
py-files (and others?) not properly set up in cluster-mode Spark Yarn job?
I'm having issues with submitting a Spark YARN job in cluster mode when the cluster filesystem is file:///. It seems that additional resources (--py-files) are simply being skipped and not added to the PYTHONPATH. The same issue may also exist for --jars, --files, etc. We use a simple NFS mount on all our nodes instead of HDFS. The problem is that when I submit a job that has files (via --py-files), these don't get copied across to the application's staging directory, nor do they get added to the PYTHONPATH. On startup, I can clearly see the message "Source and destination file systems are the same. Not copying", which is a result of the check here: https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L221 The compareFs function simply looks at whether the scheme, host and port are the same, and if so (my case), skips the copy. While that in itself isn't a problem, the PYTHONPATH isn't updated either.
Re: number of executors
Oh BTW, it's Spark 1.3.1 on Hadoop 2.4, AMI 3.6. Sorry for leaving out this information. I'd appreciate any help! Ed

2015-05-18 12:53 GMT-04:00 edward cui edwardcu...@gmail.com:

I actually have the same problem, but I am not sure whether it is a Spark problem or a YARN problem. I set up a five-node cluster on AWS EMR, started the YARN daemon on the master (the node manager will not be started by default on the master; I don't want to waste any resources since I have to pay), and submitted the Spark task in yarn-cluster mode. The command is:

    ./spark/bin/spark-submit --master yarn-cluster --num-executors 5 --executor-cores 4 --properties-file spark-application.conf myapp.py

But the YARN resource manager only created 4 containers on 4 nodes, and one node was completely idle. More details about the setup:

EMR node: m3.xlarge: 16g RAM, 4 cores, 40g SSD (HDFS on EBS?)

yarn-site.xml:
yarn.scheduler.maximum-allocation-mb=11520
yarn.nodemanager.resource.memory-mb=11520

Spark conf:
spark.executor.memory 10g
spark.storage.memoryFraction 0.2
spark.python.worker.memory 1500m
spark.akka.frameSize 200
spark.shuffle.memoryFraction 0.1
spark.driver.memory 10g

Hadoop behavior observed: YARN created 4 containers on four nodes, including the EMR master, but one EMR slave stayed idle (memory consumption around 2g and 0% CPU occupation). Spark used one container for the driver on an EMR slave node (which makes sense since I requested that much memory) and the other three nodes for computing the tasks. If YARN can't use all the nodes and I have to pay for the nodes, it's just a big waste :p Any thoughts on this? Great thanks, Ed

2015-05-18 12:07 GMT-04:00 Sandy Ryza sandy.r...@cloudera.com:

*All

On Mon, May 18, 2015 at 9:07 AM, Sandy Ryza sandy.r...@cloudera.com wrote:

Hi Xiaohe, The all Spark options must go before the jar or they won't take effect. -Sandy

On Sun, May 17, 2015 at 8:59 AM, xiaohe lan zombiexco...@gmail.com wrote:

Sorry, they are both actually assigned tasks.

Aggregated Metrics by Executor:
Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Input Size / Records | Shuffle Write Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk)
1 | host1:6184 | 1.7 min | 5 | 0 | 5 | 640.0 MB / 12318400 | 382.3 MB / 12100770 | 1630.4 MB | 295.4 MB
2 | host2:62072 | 1.7 min | 5 | 0 | 5 | 640.0 MB / 12014510 | 386.0 MB / 10926912 | 1646.6 MB | 304.8 MB

On Sun, May 17, 2015 at 11:50 PM, xiaohe lan zombiexco...@gmail.com wrote:

bash-4.1$ ps aux | grep SparkSubmit
xilan 1704 13.2 1.2 5275520 380244 pts/0 Sl+ 08:39 0:13 /scratch/xilan/jdk1.8.0_45/bin/java -cp /scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp --num-executors 5 --executor-cores 4
xilan 1949 0.0 0.0 103292 800 pts/1 S+ 08:40 0:00 grep --color SparkSubmit

When I look at the Spark UI, I see the following:

Aggregated Metrics by Executor:
Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read Size / Records
1 | host1:30483 | 6 s | 1 | 0 | 1 | 127.1 MB / 2808978
2 | host2:4997 | 0 ms | 0 | 0 | 0 | 63.4 MB / 1810945

So executor 2 is not even assigned a task? Maybe I have some problems in my settings, but I don't know what the possible settings are that I set wrong or have not set.
Thanks, Xiaohe

On Sun, May 17, 2015 at 11:16 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

Did you try the --executor-cores param? While you submit the job, do a ps aux | grep spark-submit and see the exact command parameters. Thanks Best Regards

On Sat, May 16, 2015 at 12:31 PM, xiaohe lan zombiexco...@gmail.com wrote:

Hi, I have a 5-node YARN cluster, and I used spark-submit to submit a simple app:

    spark-submit --master yarn target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp --num-executors 5

I have set the number of executors to 5, but from the Spark UI I could see only two executors, and it ran very slowly. What did I miss? Thanks, Xiaohe
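Putting Sandy's point into a concrete command, the corrected invocation for the original post would look like this, with all spark-submit options (including --class) placed before the application jar:

    spark-submit --master yarn --num-executors 5 --executor-cores 4 \
      --class scala.SimpleApp \
      target/scala-2.10/simple-project_2.10-1.0.jar

In the original form, everything after the jar path is passed to the application as program arguments rather than consumed by spark-submit, which is why --num-executors appeared to be ignored.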
Re: Reading Real Time Data only from Kafka
I have played a bit with the directStream Kafka API. Good work, Cody. These are my findings; also, can you clarify a few things for me (see below)?

- When auto.offset.reset = smallest and you have 60GB of messages in Kafka, it takes forever, as it reads the whole 60GB at once; largest will only read the latest messages.
- To avoid this, you can limit the rate with spark.streaming.kafka.maxRatePerPartition, which is pretty stable (it always reads the same amount of data).
- Number of partitions per batch = number of Kafka partitions.
- In the case of driver failures, offset reset set to smallest will replay all the messages, and largest will only read those messages which are pushed after the streaming job has started. What happens to those messages which arrive in between?

A few things which are unclear:

- If we have a Kafka topic with 9 partitions and a Spark cluster with 3 slaves, how does it decide which slave should read from which partition? And what happens if a single slave fails while reading the data?
- By default it doesn't push the offsets of read messages anywhere, so how does it replay the messages in case of failures?

Thanks Best Regards

On Wed, May 13, 2015 at 8:32 PM, Cody Koeninger c...@koeninger.org wrote:

You linked to a google mail tab, not a public archive, so I don't know exactly which conversation you're referring to. As far as I know, streaming only runs a single job at a time in the order they were defined, unless you turn on an experimental option for more parallelism (TD or someone more knowledgeable can chime in on this). If you're talking about the possibility of the next job starting before the prior one has fully finished, because your processing is lagging behind... I'm not 100% sure this is possible, because I've never observed it. The thing is, it's a moot point, because if you're saving offsets yourself transactionally, you already need to be verifying that offsets are correct (increasing without gaps) in order to handle restarts correctly. If you're super concerned about how batches get generated, the direct api gives you access to KafkaUtils.createRDD... just schedule your own rdds in the order you want. Again, flexible.

On Wed, May 13, 2015 at 9:36 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote:

Thanks, Cody, for your email. I think my concern was not about the ordering of messages within a partition, which, as you said, is possible if one knows how Spark works. The issue is how Spark schedules jobs on every batch, which is not in the same order they were generated. So if that is not guaranteed, it does not matter whether you manage order within your partition; depending on per-partition ordering to commit offsets may lead to offsets being committed in the wrong order. In this thread you have discussed this as well, with some workarounds: https://mail.google.com/mail/u/1/?tab=wm#search/rdd+order+guarantees/14b9f1eaf0b8bd15 So again, one needs to understand every detail of a consumer to decide whether it solves their use case. Regards, Dibyendu

On Wed, May 13, 2015 at 7:35 PM, Cody Koeninger c...@koeninger.org wrote:

As far as I can tell, Dibyendu's cons boil down to: 1. Spark checkpoints can't be recovered if you upgrade code. 2. Some Spark transformations involve a shuffle, which can repartition data. It's not accurate to imply that either one of those things is inherently a con of the direct stream api. Regarding checkpoints, nothing about the direct stream requires you to use checkpoints.
You can save offsets in a checkpoint, your own database, or not save offsets at all (as James wants). One might even say that the direct stream api is . . . flexible . . . in that regard. Regarding partitions, the direct stream api gives you the same ordering guarantee as Kafka, namely that within a given partition messages will be in increasing offset order. Clearly, if you do a transformation that repartitions the stream, that no longer holds. Thing is, that doesn't matter if you're saving offsets and results for each rdd in the driver. The offset ranges for the original rdd don't change as a result of the transformation you executed; they're immutable. Sure, you can get into trouble if you're trying to save offsets / results per partition on the executors after a shuffle of some kind. You can avoid this pretty easily by just using normal Scala code to do your transformation on the iterator inside a foreachPartition. Again, this isn't a con of the direct stream api; this is just a need to understand how Spark works.

On Tue, May 12, 2015 at 10:30 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote:

The low-level consumer which Akhil mentioned has been running at Pearson for the last 4-5 months without any downtime. I think this one is the reliable receiver-based Kafka consumer as of today for Spark, if you say it that way. Prior to Spark 1.3, other
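A minimal sketch of the pattern Cody describes (reading the offset ranges in the driver alongside the results), assuming ssc is an existing StreamingContext and the broker and topic names are placeholders:

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

    val kafkaParams = Map(
      "metadata.broker.list" -> "broker1:9092",
      "auto.offset.reset" -> "smallest")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("mytopic"))

    stream.foreachRDD { rdd =>
      // The offset ranges are fixed when the RDD is defined and are not
      // affected by later transformations, so they can be read here in the driver.
      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... process rdd, then persist results and offsets in one transaction ...
      offsets.foreach(o => println(s"${o.topic} ${o.partition}: ${o.fromOffset} -> ${o.untilOffset}"))
    }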
org.apache.spark.shuffle.FetchFailedException :: Migration from Spark 1.2 to 1.3
Hi, I'm getting this exception after shifting my code from Spark 1.2 to Spark 1.3:

15/05/18 18:22:39 WARN TaskSetManager: Lost task 0.0 in stage 1.6 (TID 84, cloud8-server): FetchFailed(BlockManagerId(1, cloud4-server, 7337), shuffleId=0, mapId=9, reduceId=1, message=
org.apache.spark.shuffle.FetchFailedException: java.lang.RuntimeException: Failed to open file: /tmp/spark-fff63849-a318-4e48-bdea-2f563076ad5d/spark-40ba3a41-0f4d-446e-b806-e788e210d394/spark-a3d61f7a-22e9-4b3b-9346-ff3b70d0e43d/blockmgr-0e3b2b5d-f677-4e91-b98b-ed913adbd15f/39/shuffle_0_9_0.index
    at org.apache.spark.network.shuffle.ExternalShuffleBlockManager.getSortBasedShuffleBlockData(ExternalShuffleBlockManager.java:202)
    at org.apache.spark.network.shuffle.ExternalShuffleBlockManager.getBlockData(ExternalShuffleBlockManager.java:112)
    at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:74)
    at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: /tmp/spark-fff63849-a318-4e48-bdea-2f563076ad5d/spark-40ba3a41-0f4d-446e-b806-e788e210d394/spark-a3d61f7a-22e9-4b3b-9346-ff3b70d0e43d/blockmgr-0e3b2b5d-f677-4e91-b98b-ed913adbd15f/39/shuffle_0_9_0.index (Permission denied)
    at java.io.FileInputStream.open(Native Method)
    at java.io.FileInputStream.init(FileInputStream.java:146)
    at org.apache.spark.network.shuffle.ExternalShuffleBlockManager.getSortBasedShuffleBlockData(ExternalShuffleBlockManager.java:191)
    ... 23 more
Spark groupByKey, does it always create at least 1 partition per key?
I am currently using Spark Streaming. During my batch processing I must use groupByKey; afterwards I call foreachRDD / foreachPartition and write to an external datastore. My only concern is whether this is future-proof. I know groupByKey uses the HashPartitioner by default. I have printed out the internals of the partitions, loaded large text files into memory, and run groupByKey just to make sure. I have two questions:

#1 Will my implementation ever break in the future? Will groupByKey's partitioning work differently?
#2 Is it possible for a (key, values) pair to exist on more than one partition after using groupByKey?

Notes: I'm aware groupByKey is not very efficient. However, I am not working with large amounts of data and can process batches very quickly. Below, I could have used aggregateByKey because I printed the sum; however, my real implementation is much different, and I do need each value for each key, so I cannot reduce the data.

1 million line test log file:
Partition HashCode: 965943941 Key: lol Size: 2346
Partition HashCode: 1605678983 Key: ee Size: 4692
Partition HashCode: 1605678983 Key: aa Size: 32844
Partition HashCode: 1605678983 Key: gg Size: 4692
Partition HashCode: 1605678983 Key: dd Size: 11730
Partition HashCode: 1605678983 Key: hh Size: 4692
Partition HashCode: 1605678983 Key: kk Size: 2346
Partition HashCode: 1605678983 Key: tt Size: 4692
Partition HashCode: 1605678983 Key: ff Size: 2346
Partition HashCode: 1605678983 Key: bb Size: 18768
Partition HashCode: 1605678983 Key: cc Size: 14076
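On question #2, a small sketch of why each key lands in exactly one partition under the default partitioner: the partition index is a pure function of the key's hashCode and the partition count, so every record with the same key is routed to the same place.

    import org.apache.spark.HashPartitioner

    val p = new HashPartitioner(8)
    // Deterministic for a given key and partition count, on every run:
    println(p.getPartition("lol"))
    println(p.getPartition("aa"))

This describes the current contract rather than a promise about future Spark versions, but a (key, values) pair produced by groupByKey lives in a single partition.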
Re: py-files (and others?) not properly set up in cluster-mode Spark Yarn job?
Hi Shay, Yeah, that seems to be a bug; it doesn't seem to be related to the default FS nor to compareFs either. I can reproduce this with HDFS when copying files from the local fs too. In yarn-client mode things seem to work. Could you file a bug to track this? If you don't have a JIRA account, I can do that for you. -- Marcelo
RE: Spark Streaming and reducing latency
Thanks for the heads up mate.

On 18 May 2015 19:08, Evo Eftimov evo.efti...@isecc.com wrote: Ooow – that is essentially the custom feedback loop mentioned in my previous emails in generic architecture terms, and what you have done is only one of the possible implementations, moreover one based on ZooKeeper – there are other possible designs not using things like ZooKeeper at all, and hence achieving much lower latency and responsiveness. Can I also give you a friendly piece of advice – there is a long way FROM "we = Sigmoid and our custom Sigmoid solution" TO your earlier statement that Spark Streaming does "NOT" crash unceremoniously – please maintain responsible and objective communication and facts.

From: Evo Eftimov [mailto:evo.efti...@isecc.com] Sent: Monday, May 18, 2015 12:13 PM To: 'Dmitry Goldenberg'; 'Akhil Das' Cc: 'user@spark.apache.org' Subject: RE: Spark Streaming and reducing latency

Another way is to implement a feedback loop in your receivers, monitoring the performance metrics of your application/job and, based on that, automatically adjusting the receiving rate – BUT all of these have nothing to do with "reducing the latency" – they simply prevent your application/job from clogging up, the nastier effect of which is when Spark Streaming starts removing in-memory cached RDDs.
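For concreteness, a minimal sketch of the static receiver cap this thread contrasts with such feedback loops; spark.streaming.receiver.maxRate is the documented setting, while the application name and the 10,000 records/sec figure are arbitrary examples:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("MyStreamingApp") // placeholder name
      // Cap each receiver at 10,000 records/sec;
      // 0 or a negative value means no limit.
      .set("spark.streaming.receiver.maxRate", "10000")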
RE: Processing multiple columns in parallel
How about making the range in the for loop parallelised? The driver will then kick off the word counts independently.

Regards,
Guy Needham | Data Discovery
Virgin Media | Technology and Transformation | Data
Bartley Wood Business Park, Hook, Hampshire RG27 9UP
D 01256 75 3362
I welcome VSRE emails. Learn more at http://vsre.info/

From: ayan guha [mailto:guha.a...@gmail.com] Sent: 18 May 2015 15:46 To: Laeeq Ahmed Cc: user@spark.apache.org Subject: Re: Processing multiple columns in parallel

My first thought would be creating 10 RDDs and running your word count on each of them. I think the Spark scheduler is going to resolve the dependencies in parallel and launch 10 jobs. Best Ayan

On 18 May 2015 23:41, Laeeq Ahmed laeeqsp...@yahoo.com.invalid wrote: Hi, Consider I have a tab-delimited text file with 10 columns. Each column is a set of text. I would like to do a word count for each column. In Scala, I would do the following RDD transformation and action:

    val data = sc.textFile("hdfs://namenode/data.txt")
    for (i <- 0 until 10) {
      data.map(_.split("\t", -1)(i)).map((_, 1)).reduceByKey(_ + _).saveAsTextFile(i.toString)
    }

Within the for loop it's a parallel process, but each column is processed sequentially from 0 to 9. Is there any way I can process multiple columns in parallel in Spark? I saw a posting about using Akka, but Spark itself already uses Akka. Any pointers would be appreciated. Regards, Laeeq
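A minimal sketch of Guy's suggestion, assuming the 10-column file from the question; a Scala parallel range makes the driver kick off the ten per-column jobs concurrently (SparkContext is safe to use from multiple threads), and the output paths are arbitrary examples:

    val data = sc.textFile("hdfs://namenode/data.txt")
    (0 until 10).par.foreach { i =>
      data.map(_.split("\t", -1)(i))
        .map((_, 1))
        .reduceByKey(_ + _)
        .saveAsTextFile("column-" + i) // example output path per column
    }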
Re: number of executors
*All

On Mon, May 18, 2015 at 9:07 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Xiaohe, The all Spark options must go before the jar or they won't take effect. -Sandy

On Sun, May 17, 2015 at 8:59 AM, xiaohe lan zombiexco...@gmail.com wrote: Sorry, they are actually both assigned tasks.

Aggregated Metrics by Executor
Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Input Size / Records | Shuffle Write Size / Records | Shuffle Spill (Memory) | Shuffle Spill (Disk)
1 | host1:6184 | 1.7 min | 5 | 0 | 5 | 640.0 MB / 12318400 | 382.3 MB / 12100770 | 1630.4 MB | 295.4 MB
2 | host2:62072 | 1.7 min | 5 | 0 | 5 | 640.0 MB / 12014510 | 386.0 MB / 10926912 | 1646.6 MB | 304.8 MB

On Sun, May 17, 2015 at 11:50 PM, xiaohe lan zombiexco...@gmail.com wrote:

    bash-4.1$ ps aux | grep SparkSubmit
    xilan 1704 13.2 1.2 5275520 380244 pts/0 Sl+ 08:39 0:13 /scratch/xilan/jdk1.8.0_45/bin/java -cp /scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp --num-executors 5 --executor-cores 4
    xilan 1949 0.0 0.0 103292 800 pts/1 S+ 08:40 0:00 grep --color SparkSubmit

When looking at the Spark UI, I see the following:

Aggregated Metrics by Executor
Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read Size / Records
1 | host1:3048 | 36 s | 1 | 0 | 1 | 127.1 MB / 2808978
2 | host2:4997 | 0 ms | 0 | 0 | 0 | 63.4 MB / 1810945

So executor 2 is not even assigned a task? Maybe I have some problems in my settings, but I don't know what I might have set wrong or not set. Thanks, Xiaohe

On Sun, May 17, 2015 at 11:16 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Did you try the --executor-cores param? While you submit the job, do a ps aux | grep spark-submit and see the exact command parameters. Thanks Best Regards

On Sat, May 16, 2015 at 12:31 PM, xiaohe lan zombiexco...@gmail.com wrote: Hi, I have a 5-node YARN cluster, and I used spark-submit to submit a simple app:

    spark-submit --master yarn target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp --num-executors 5

I have set the number of executors to 5, but from the Spark UI I could see only two executors and it ran very slow. What did I miss? Thanks, Xiaohe
Re: number of executors
Hi Xiaohe, The all Spark options must go before the jar or they won't take effect. -Sandy
Re: number of executors
I actually have the same problem, but I am not sure whether it is a Spark problem or a YARN problem. I set up a five-node cluster on AWS EMR and started the YARN daemon on the master (the node manager will not be started by default on the master, and I don't want to waste any resources since I have to pay), and I submit the Spark task through yarn-cluster mode. The command is:

    ./spark/bin/spark-submit --master yarn-cluster --num-executors 5 --executor-cores 4 --properties-file spark-application.conf myapp.py

But the YARN resource manager only created 4 containers on 4 nodes, and one node was completely idle.

More details about the setup:

EMR node: m3.xlarge: 16g RAM, 4 cores, 40g SSD (HDFS on EBS?)

yarn-site.xml:
yarn.scheduler.maximum-allocation-mb = 11520
yarn.nodemanager.resource.memory-mb = 11520

Spark conf:
spark.executor.memory 10g
spark.storage.memoryFraction 0.2
spark.python.worker.memory 1500m
spark.akka.frameSize 200
spark.shuffle.memoryFraction 0.1
spark.driver.memory 10g

Hadoop behavior observed:
- 4 containers created on four nodes, including the EMR master, but one EMR slave idle (memory consumption around 2g and 0% CPU occupation)
- Spark uses one container for the driver on an EMR slave node (makes sense since I required that much memory)
- The other three nodes are used for computing the tasks.

If YARN can't use all the nodes and I have to pay for them, it's just a big waste :p

Any thoughts on this? Great thanks, Ed

2015-05-18 12:07 GMT-04:00 Sandy Ryza sandy.r...@cloudera.com: *All
Re: Spark streaming over a rest API
Why not use Spark Streaming to do the computation and dump the result somewhere, in a DB perhaps, and take it from there?

Thanks
Best Regards
RE: Spark Streaming and reducing latency
My pleasure young man, i will even go beyond the so-called heads up and send you a solution design for a feedback loop preventing Spark Streaming app clogging and resource depletion, featuring machine-learning-based self-tuning, AND which is not ZooKeeper based and hence offers lower latency. PS: ultimately, though, remember that none of this stuff is part of Spark Streaming as of yet.

Sent from Samsung Mobile

-------- Original message --------
From: Akhil Das ak...@sigmoidanalytics.com
Date: 2015/05/18 16:56 (GMT+00:00)
To: Evo Eftimov evo.efti...@isecc.com
Cc: user@spark.apache.org
Subject: RE: Spark Streaming and reducing latency

Thanks for the heads up mate.
Re: [SparkSQL 1.4.0] groupBy columns are always nullable?
PR is opened: https://github.com/apache/spark/pull/6237

On Fri, May 15, 2015 at 5:55 PM, Olivier Girardot ssab...@gmail.com wrote: yes, please do and send me the link. @rxin I have trouble building master, but the code is done...

On Fri, May 15, 2015 at 1:27 AM, Haopu Wang hw...@qilinsoft.com wrote: Thank you, should I open a JIRA for this issue?

From: Olivier Girardot [mailto:ssab...@gmail.com] Sent: Tuesday, May 12, 2015 5:12 AM To: Reynold Xin Cc: Haopu Wang; user Subject: Re: [SparkSQL 1.4.0] groupBy columns are always nullable?

I'll look into it - not sure yet what I can get out of exprs :p

On Mon, May 11, 2015 at 10:35 PM, Reynold Xin r...@databricks.com wrote: Thanks for catching this. I didn't read carefully enough. It'd make sense to have the UDAF result be non-nullable, if the exprs are indeed non-nullable.

On Mon, May 11, 2015 at 1:32 PM, Olivier Girardot ssab...@gmail.com wrote: Hi Haopu, actually here `key` is nullable because this is your input's schema:

    scala> result.printSchema
    root
     |-- key: string (nullable = true)
     |-- SUM(value): long (nullable = true)

    scala> df.printSchema
    root
     |-- key: string (nullable = true)
     |-- value: long (nullable = false)

I tried it with a schema where the key is not flagged as nullable, and the schema is actually respected. What you can argue however is that SUM(value) should also be not nullable since value is not nullable. @rxin do you think it would be reasonable to flag the Sum aggregation function as nullable (or not) depending on the input expression's schema? Regards, Olivier.

On Mon, May 11, 2015 at 10:07 PM, Reynold Xin r...@databricks.com wrote: Not by design. Would you be interested in submitting a pull request?

On Mon, May 11, 2015 at 1:48 AM, Haopu Wang hw...@qilinsoft.com wrote: I try to get the result schema of aggregate functions using the DataFrame API. However, I find that the result field of groupBy columns is always nullable even when the source field is not nullable. I want to know if this is by design, thank you! Below is the simple code to show the issue.

    import sqlContext.implicits._
    import org.apache.spark.sql.functions._

    case class Test(key: String, value: Long)
    val df = sc.makeRDD(Seq(Test("k1", 2), Test("k1", 1))).toDF

    val result = df.groupBy("key").agg($"key", sum("value"))
    // From the output, you can see the key column is nullable, why??
    result.printSchema
    // root
    //  |-- key: string (nullable = true)
    //  |-- SUM(value): long (nullable = true)
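For anyone wanting to reproduce the behavior Olivier describes (the input schema's nullability being respected when it is set explicitly), a minimal sketch that builds the same data with key flagged non-nullable; the names mirror the example above:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._

    val schema = StructType(Seq(
      StructField("key", StringType, nullable = false),
      StructField("value", LongType, nullable = false)))
    val rowRDD = sc.parallelize(Seq(Row("k1", 2L), Row("k1", 1L)))
    val df2 = sqlContext.createDataFrame(rowRDD, schema)
    df2.printSchema() // key: string (nullable = false)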
Re: Error communicating with MapOutputTracker
On Fri, May 15, 2015 at 5:09 PM, Thomas Gerber thomas.ger...@radius.com wrote: Now, we noticed that we get java heap OOM exceptions on the output tracker when we have too many tasks. I wonder: 1. where does the map output tracker live? The driver? The master (when those are not the same)? 2. how can we increase the heap for it? Especially when using spark-submit?

It does not live on the master -- that is only in a standalone cluster, and it does very little work. (Though there are *Master and *Worker variants of the class, it's really running on the driver and the executors.) If you are getting OOMs in the MapOutputTrackerMaster (which lives on the driver), then you can increase the memory for the driver via the normal args for controlling driver memory, with --driver-memory 10G or whatever.

Just to be clear, if you hit an OOM from somewhere in the MapOutputTracker code, it just means that code is what pushed things over the top. Of course you could have 99% of your memory used by something else, perhaps your own data structures, which perhaps could be trimmed down. You could get a heap dump on the driver to see where the memory is really getting used.

Do you mind sharing the details of how you hit these OOMs? How much memory, how many partitions on each side of the shuffle? Sort-based shuffle, I assume?

thanks, Imran
Re: applications are still in progress?
Most likely, you never call sc.stop(). Note that in 1.4, this will happen for you automatically in a shutdown hook, taken care of by https://issues.apache.org/jira/browse/SPARK-3090

On Wed, May 13, 2015 at 8:04 AM, Yifan LI iamyifa...@gmail.com wrote: Hi, I have some applications that finished (but actually failed before finishing), which the WebUI shows as "Application myApp is still in progress". And in the eventlog folder there are several log files like this: app-20150512***.inprogress So, I am wondering what "inprogress" means… Thanks! :) Best, Yifan LI
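The advice above in code form, a minimal sketch with the application logic left as a placeholder:

    val sc = new SparkContext(conf)
    try {
      // ... application logic ...
    } finally {
      // Finalizes the event log; without this the .inprogress suffix
      // remains and the WebUI keeps showing the app as in progress
      // (prior to the automatic shutdown hook added in 1.4).
      sc.stop()
    }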
Re: How to run multiple jobs in one sparkcontext from separate threads in pyspark?
SparkContext can be used from multiple threads (Spark Streaming works with multiple threads), for example:

    import threading
    import time

    def show(x):
        time.sleep(1)
        print x

    def job():
        sc.parallelize(range(100)).foreach(show)

    threading.Thread(target=job).start()

On Mon, May 18, 2015 at 12:34 AM, ayan guha guha.a...@gmail.com wrote: Hi, so to be clear, do you want to run one operation in multiple threads within a function, or do you want to run multiple jobs using multiple threads? I am wondering why the Python threading module can't be used. Or have you already given it a try?

On 18 May 2015 16:39, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi Akhil, The Python wrapper for Spark Job Server did not help me. I actually need a PySpark code sample which shows how I can call a function from 2 threads and execute it simultaneously. Thanks Regards, Meethu M

On Thursday, 14 May 2015 12:38 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Did you happen to have a look at the Spark Job Server? Someone wrote a Python wrapper around it; give it a try. Thanks Best Regards

On Thu, May 14, 2015 at 11:10 AM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi all, Quote: "Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads." How to run multiple jobs in one SparkContext using separate threads in PySpark? I found some examples in Scala and Java, but couldn't find Python code. Can anyone help me with a PySpark example? Thanks Regards, Meethu M
Re: parallelism on binary file
You can use sc.hadoopFile (or any of the variants) to do what you want. They even let you reuse your existing Hadoop InputFormats. You should be able to mimic your old use with MR just fine. sc.textFile is just a convenience method which sits on top. Imran

On Fri, May 8, 2015 at 12:03 PM, tog guillaume.all...@gmail.com wrote: Hi, I have an application that currently runs using MR. It starts by extracting information from a proprietary binary file that is copied to HDFS. The application starts by creating business objects from information extracted from the binary files. Later those objects are used for further processing, again using MR jobs. I am planning to move towards Spark and I clearly see that I could use a JavaRDD<BusinessObject> for parallel processing; however it is not yet obvious what the process could be to generate this RDD from my binary file in parallel. Today I use parallelism based on the splits assigned to each of the map tasks of a job. Can I mimic such a thing using Spark? All examples I have seen so far use text files, for which I guess the partitions are based on a given number of contiguous lines. Any help or pointer would be appreciated. Cheers, Guillaume -- PGP KeyID: 2048R/EA31CFC9 subkeys.pgp.net
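A minimal sketch of what Imran describes, reusing an existing (old-API) Hadoop InputFormat; MyBinaryInputFormat and parseRecord are placeholders for your own input format and record parser, and the path is an arbitrary example:

    import org.apache.hadoop.io.{LongWritable, BytesWritable}

    // One RDD partition per InputSplit, mirroring the old MR map-side
    // parallelism; MyBinaryInputFormat must extend the old-API
    // org.apache.hadoop.mapred.InputFormat.
    val raw = sc.hadoopFile[LongWritable, BytesWritable, MyBinaryInputFormat](
      "hdfs:///data/binary/")
    val businessObjects = raw.map { case (_, bytes) => parseRecord(bytes.copyBytes()) }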
Re: Implementing custom metrics under MLPipeline's BinaryClassificationEvaluator
Hi Justin, It sounds like you're on the right track. The best way to write a custom Evaluator will probably be to modify an existing Evaluator as you described. It's best if you don't remove the other code, which handles parameter set/get and schema validation. Joseph

On Sun, May 17, 2015 at 10:35 PM, Justin Yip yipjus...@prediction.io wrote: Hello, I would like to use other metrics in BinaryClassificationEvaluator; I am thinking about simple ones (i.e. PrecisionByThreshold). From the API site, I can't tell much about how to implement it. From the code, it seems like I will have to override this function, using most of the existing code for checking the column schema, then replace the line which computes the actual score: https://github.com/apache/spark/blob/1b8625f4258d6d1a049d0ba60e39e9757f5a568b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala#L72 Is my understanding correct? Or is there a more convenient way of implementing a metric in order to be used by an ML pipeline? Thanks. Justin

-- View this message in context: Implementing custom metrics under MLPipeline's BinaryClassificationEvaluator http://apache-spark-user-list.1001560.n3.nabble.com/Implementing-custom-metrics-under-MLPipeline-s-BinaryClassificationEvaluator-tp22930.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
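A minimal sketch of the metric swap Justin describes, computed with MLlib's BinaryClassificationMetrics rather than shown inside a full Evaluator; scoreAndLabels stands for the RDD[(score, label)] pairs the existing evaluator already extracts from the DataFrame:

    import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
    import org.apache.spark.rdd.RDD

    def precisionAtThreshold(scoreAndLabels: RDD[(Double, Double)],
                             threshold: Double): Double = {
      val metrics = new BinaryClassificationMetrics(scoreAndLabels)
      // precisionByThreshold() yields (threshold, precision) pairs; the
      // thresholds are derived from the scores themselves, so in practice
      // you may need a tolerance rather than exact equality.
      metrics.precisionByThreshold()
        .filter { case (t, _) => t == threshold }
        .values
        .first()
    }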
TwitterUtils on Windows
I am trying to print a basic Twitter stream and receiving the following error:

    15/05/18 22:03:14 INFO Executor: Fetching http://192.168.56.1:49752/jars/twitter4j-media-support-3.0.3.jar with timestamp 1432000973058
    15/05/18 22:03:14 INFO Utils: Fetching http://192.168.56.1:49752/jars/twitter4j-media-support-3.0.3.jar to C:\Users\Justin\AppData\Local\Temp\spark-4a37d3e9-34a2-40d4-b09b-6399931f527d\userFiles-65ee748e-4721-4e16-9fe6-65933651fec1\fetchFileTemp8970201232303518432.tmp
    15/05/18 22:03:14 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
    java.lang.NullPointerException
        at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:482)
        at org.apache.hadoop.util.Shell.run(Shell.java:455)
        at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
        at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:873)
        at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853)
        at org.apache.spark.util.Utils$.fetchFile(Utils.scala:443)
        at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:374)
        at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:366)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:366)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:184)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:744)

Code is:

    spark-shell --jars \Spark\lib\spark-streaming-twitter_2.10-1.3.1.jar,\Spark\lib\twitter4j-async-3.0.3.jar,\Spark\lib\twitter4j-core-3.0.3.jar,\Spark\lib\twitter4j-media-support-3.0.3.jar,\Spark\lib\twitter4j-stream-3.0.3.jar

    import org.apache.spark.streaming.twitter._
    import org.apache.spark.streaming._
    System.setProperty("twitter4j.oauth.consumerKey", "*")
    System.setProperty("twitter4j.oauth.consumerSecret", "*")
    System.setProperty("twitter4j.oauth.accessToken", "*")
    System.setProperty("twitter4j.oauth.accessTokenSecret", "*")
    val ssc = new StreamingContext(sc, Seconds(10))
    val stream = TwitterUtils.createStream(ssc, None)
    stream.print
    ssc.start

This seems to be happening at FileUtil.chmod(targetFile.getAbsolutePath, "a+x"), but I'm not sure why...

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/TwitterUtils-on-Windows-tp22939.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: number of executors
Yeah, I read that page before, but it does not mention that the options should come before the application jar. Actually, if I put the --class option before the application jar, I will get a ClassNotFoundException. Anyway, thanks again Sandy.

On Tue, May 19, 2015 at 11:06 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Awesome! It's documented here: https://spark.apache.org/docs/latest/submitting-applications.html -Sandy
Re: TwitterUtils on Windows
I think I found the answer - http://apache-spark-user-list.1001560.n3.nabble.com/Error-while-running-example-scala-application-using-spark-submit-td10056.html Do I have no way of running this on Windows locally?

On Mon, May 18, 2015 at 10:44 PM, Justin Pihony justin.pih...@gmail.com wrote: I'm not 100% sure that is causing a problem, though. The stream still starts, but is giving blank output. I checked the environment variables in the UI and it is running local[*], so there should be no bottleneck there.
RE: Spark sql error while writing Parquet file- Trying to write more fields than contained in row
Hi, Thanks for the response, but I could not see a fillna function in the DataFrame class. Is it available in some specific version of Spark SQL? This is what I have in my pom.xml:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>1.3.1</version>
    </dependency>

Regards, Anand.C

From: ayan guha [mailto:guha.a...@gmail.com] Sent: Monday, May 18, 2015 5:19 PM To: Chandra Mohan, Ananda Vel Murugan; user Subject: Re: Spark sql error while writing Parquet file- Trying to write more fields than contained in row

Hi, Give a try with the dataFrame.fillna function to fill up the missing columns. Best Ayan

On Mon, May 18, 2015 at 8:29 PM, Chandra Mohan, Ananda Vel Murugan ananda.muru...@honeywell.com wrote: Hi, I am using spark-sql to read a CSV file and write it as a Parquet file. I am building the schema using the following code:

    String schemaString = "a b c";
    List<StructField> fields = new ArrayList<StructField>();
    MetadataBuilder mb = new MetadataBuilder();
    mb.putBoolean("nullable", true);
    Metadata m = mb.build();
    for (String fieldName : schemaString.split(" ")) {
        fields.add(new StructField(fieldName, DataTypes.DoubleType, true, m));
    }
    StructType schema = DataTypes.createStructType(fields);

Some of the rows in my input CSV do not contain three columns. After building my JavaRDD<Row>, I create the data frame as shown below using the RDD and schema:

    DataFrame darDataFrame = sqlContext.createDataFrame(rowRDD, schema);

Finally I try to save it as a Parquet file:

    darDataFrame.saveAsParquetFile("/home/anand/output.parquet");

I get this error when saving it as a Parquet file:

    java.lang.IndexOutOfBoundsException: Trying to write more fields than contained in row (3 > 2)

I understand the reason behind this error: some of the rows in my Row RDD do not contain three elements, as some rows in my input CSV do not contain three columns. But while building the schema, I am specifying every field as nullable, so I believe it should not throw this error. Can anyone help me fix this error? Thank you. Regards, Anand.C

-- Best Regards, Ayan Guha
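On the missing fillna: in the Scala/Java API the same functionality lives on DataFrameNaFunctions, reached through df.na (present in 1.3.1). A minimal sketch using the thread's column names, assuming df is the DataFrame built above:

    // Replace nulls in all numeric columns, or only in selected ones.
    val filledAll = df.na.fill(0.0)
    val filledSome = df.na.fill(0.0, Seq("a", "b", "c"))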
Re: TwitterUtils on Windows
I'm not 100% sure that is causing a problem, though. The stream still starts, but is giving blank output. I checked the environment variables in the UI and it is running local[*], so there should be no bottleneck there.
Re: number of executors
Hi Sandy, Thanks for your information. Yes,

    spark-submit --master yarn --num-executors 5 --executor-cores 4 target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp

is working awesomely. Is there any documentation pointing to this? Thanks, Xiaohe
Re: number of executors
Awesome! It's documented here: https://spark.apache.org/docs/latest/submitting-applications.html -Sandy
Spark Streaming graceful shutdown in Spark 1.4
Hi, I just figured out that if I want to perform a graceful shutdown of Spark Streaming 1.4 (from master), Runtime.getRuntime().addShutdownHook no longer works. In Spark 1.4 there is Utils.addShutdownHook defined for Spark Core, which gets called anyway, and that leads to the graceful shutdown from Spark Streaming failing with errors like "SparkContext already closed". To solve this, I need to explicitly add Utils.addShutdownHook in my driver with a higher priority (say 150) than Spark's shutdown priority of 50, and there I call the StreamingContext stop method with the (false, true) parameters.

Just curious to know if this is how we need to handle the shutdown hook going forward. Can't we make the streaming shutdown default to a graceful shutdown? Also, the Java API for adding a shutdown hook in Utils looks very dirty, with methods like this:

    Utils.addShutdownHook(150, new Function0<BoxedUnit>() {
        @Override
        public BoxedUnit apply() { return null; }
        @Override
        public byte apply$mcB$sp() { return 0; }
        @Override
        public char apply$mcC$sp() { return 0; }
        @Override
        public double apply$mcD$sp() { return 0; }
        @Override
        public float apply$mcF$sp() { return 0; }
        @Override
        public int apply$mcI$sp() { return 0; }
        @Override
        public long apply$mcJ$sp() { return 0; }
        @Override
        public short apply$mcS$sp() { return 0; }
        @Override
        public void apply$mcV$sp() { jsc.stop(false, true); }
        @Override
        public boolean apply$mcZ$sp() { return false; }
    });
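For comparison, from Scala the same hook is much shorter; a sketch assuming Spark 1.4's Utils.addShutdownHook(priority)(hook) signature (and note Utils is a Spark-internal API, so this relies on non-public code):

    Utils.addShutdownHook(150) { () =>
      ssc.stop(stopSparkContext = false, stopGracefully = true)
    }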
Re: spark log field clarification
Depends what you mean by output data. Do you mean:
* the data that is sent back to the driver? That is Result Size.
* the shuffle output? That is in Shuffle Write Metrics.
* the data written to a Hadoop output format? That is in Output Metrics.

On Thu, May 14, 2015 at 2:22 PM, yanwei echo@gmail.com wrote: I am trying to extract the *output data size* information for *each task*. What *field(s)* should I look for, given the JSON-format log? Also, what does Result Size stand for? Thanks a lot in advance! -Yanwei

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-log-field-clarification-tp22892.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: LogisticRegressionWithLBFGS with large feature set
I'm not super familiar with this part of the code, but from taking a quick look:

a) the code creates a MultivariateOnlineSummarizer, which stores 7 doubles per feature (mean, max, min, etc.)
b) the limit is on the result size from *all* tasks, not from one task. You start with 3072 tasks
c) treeAggregate should first merge things down to about 8 partitions before bringing results back to the driver, which is how you end up with 54 tasks at your failure. This means you should have about 30 MB per measure per task * 54 tasks * 7 measures, which comes to about 11 GB, or in the ballpark of what you found.

In principle, you could get this working by adding more levels to the treeAggregate (the depth parameter), but it looks like that isn't exposed. You could also try coalescing your data down to a smaller set of partitions first, but that comes with other downsides. Perhaps an MLlib expert could chime in on an alternate approach. My feeling (from a very quick look) is that there is room for some optimization in the internals. Imran

On Thu, May 14, 2015 at 5:44 PM, Pala M Muthaia mchett...@rocketfuelinc.com wrote: Hi, I am trying to validate our modeling data pipeline by running LogisticRegressionWithLBFGS on a dataset with ~3.7 million features, basically to compute AUC. This is on Spark 1.3.0. I am using 128 executors with 4 GB each + a driver with 8 GB. The number of data partitions is 3072. The execution fails with the following message:

    Total size of serialized results of 54 tasks (10.4 GB) is bigger than spark.driver.maxResultSize (3.0 GB)

The associated stage in the job is treeAggregate at StandardScaler.scala:52. The call stack looks as below:

    org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:996)
    org.apache.spark.mllib.feature.StandardScaler.fit(StandardScaler.scala:52)
    org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:233)
    org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:190)

I am trying both to understand why such a large amount of data needs to be passed back to the driver and to figure out a way around this. I also want to understand how much memory is required, as a function of dataset size, feature set size, and number of iterations performed, for future experiments. From looking at the MLlib code, the largest data structure seems to be a dense vector of the same size as the feature set. I am not familiar with the algorithm or its implementation, but I would guess 3.7 million features would lead to a constant multiple of ~3.7e6 * 8 bytes ≈ 30 MB. So how does the dataset size become so large?

I looked into treeAggregate and it looks like hierarchical aggregation. If the data being sent to the driver is basically the aggregated coefficients (i.e. dense vectors) for the final aggregation, can't the dense vectors from executors be pulled in one at a time and merged in memory, rather than pulling all of them in together? (This is a totally uneducated guess, so I may be completely off here.) Is there a way to get this running? Thanks, pala
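A minimal sketch of the coalesce workaround Imran mentions; 256 partitions is an arbitrary example, and trainingData stands for the RDD[LabeledPoint] being fit:

    import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS

    // Fewer partitions means fewer per-partition summaries shipped toward
    // the driver during treeAggregate, at the cost of per-stage parallelism.
    val model = new LogisticRegressionWithLBFGS().run(trainingData.coalesce(256))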
Re: pass configuration parameters to PySpark job
In PySpark, the functions/closures are serialized together with the global values they use. For example:

    global_param = 111

    def my_map(x):
        return x + global_param

    rdd.map(my_map)

- Davies

On Mon, May 18, 2015 at 7:26 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi, I am looking for a way to pass configuration parameters to a Spark job. In general I have a quite simple PySpark job:

    def process_model(k, vc):
        # do something
        ...

    sc = SparkContext(appName="TAD")
    lines = sc.textFile(input_job_files)
    result = lines.map(doSplit).groupByKey().map(lambda (k, vc): process_model(k, vc))

Question: what if I need to pass additional metadata, parameters, etc. to the process_model function? I tried to do something like

    param = 'param1'
    result = lines.map(doSplit).groupByKey().map(lambda (param, k, vc): process_model(param1, k, vc))

but the job stops working, and it doesn't look like an elegant solution anyway. Is there a way to have access to the SparkContext from my custom functions? I found that there are methods setLocalProperty/getLocalProperty, but I didn't find an example of how to use them for my requirements (from my function). It would be great to have a short example of how to pass parameters. Thanks, Oleg.
Re: com.esotericsoftware.kryo.KryoException: java.io.IOException: Stream is corrupted
Looks like this exception comes after many more failures have already occurred. It is already on attempt 6 for stage 7 -- I'd try to find out why attempt 0 failed. This particular exception is probably a result of corruption that can happen when stages are retried, which I'm working on addressing in https://issues.apache.org/jira/browse/SPARK-7308. But your real problem is figuring out why the stage failed in the first place.

On Wed, May 13, 2015 at 6:01 AM, Yifan LI iamyifa...@gmail.com wrote: Hi, I was running our GraphX application (which worked fine on Spark 1.2.0) but it failed on Spark 1.3.1 with the exception below. Does anyone have an idea about this issue? I guess it was caused by using the LZ4 codec?

    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 54 in stage 7.6 failed 128 times, most recent failure: Lost task 54.127 in stage 7.6 (TID 5311, small15-tap1.common.lip6.fr): com.esotericsoftware.kryo.KryoException: java.io.IOException: Stream is corrupted
        at com.esotericsoftware.kryo.io.Input.fill(Input.java:142)
        at com.esotericsoftware.kryo.io.Input.require(Input.java:155)
        at com.esotericsoftware.kryo.io.Input.readInt(Input.java:337)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
        at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
        at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$.apply(ShippableVertexPartition.scala:60)
        at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:300)
        at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:297)
        at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
        at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.io.IOException: Stream is corrupted
        at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:152)
        at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:116)
        at com.esotericsoftware.kryo.io.Input.fill(Input.java:140)
        ... 35 more

    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
        at
Re: Spark on Yarn : Map outputs lifetime ?
Neither of those two. Instead, the shuffle data is cleaned up when the stage it came from gets GC'ed by the JVM -- that is, when you are no longer holding any references to anything which points to the old stages, and there is an appropriate GC event. The data is not cleaned up right after the stage completes, because it might get used again by a later stage (e.g., if the stage is retried). On Tue, May 12, 2015 at 6:50 PM, Ashwin Shankar ashwinshanka...@gmail.com wrote: Hi, In Spark on YARN, when running spark_shuffle as an auxiliary service on the node manager, do the map spills of a stage get cleaned up once the next stage completes, OR are they preserved till the app completes (i.e., till all the stages complete)? -- Thanks, Ashwin
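[Editor's note] To make the lifecycle concrete, a minimal sketch of what "no longer holding references" means in practice -- assuming an existing SparkContext `sc`; the input path and names are illustrative:

  var counts = sc.textFile("hdfs:///some/input")   // placeholder path
    .flatMap(_.split(" "))
    .map(w => (w, 1))
    .reduceByKey(_ + _)                            // this stage writes shuffle files
  counts.count()                                   // materialize; shuffle files now exist on disk

  counts = null                                    // drop the last reference to that lineage
  // On a later GC event, Spark's ContextCleaner notices the unreferenced shuffle and
  // asynchronously deletes its files; System.gc() only makes that more likely to happen soon.
  System.gc()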
Re: Restricting the number of iterations in Mllib Kmeans
Hi Suman, For maxIterations, are you using the DenseKMeans.scala example code? (I'm guessing yes since you mention the command line.) If so, then you should be able to cap the iterations via an extra parameter like --numIterations 50 (note the example uses numIterations in the current master instead of maxIterations, which is sort of a bug in the example). If that does not cap the max iterations, then please report it as a bug. To specify the initial centroids, you will need to modify the DenseKMeans example code. Please see the KMeans API docs for details. Good luck, Joseph On Mon, May 18, 2015 at 3:22 AM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi, I think you can't supply an initial set of centroids to kmeans. Thanks & Regards, Meethu M On Friday, 15 May 2015 12:37 AM, Suman Somasundar suman.somasun...@oracle.com wrote: Hi, I want to run a definite number of iterations in Kmeans. There is a command line argument to set maxIterations, but even if I set it to a number, Kmeans runs until the centroids converge. Is there a specific way to specify it on the command line? Also, I wanted to know if we can supply the initial set of centroids to the program instead of it choosing the centroids at random? Thanks, Suman.
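[Editor's note] For reference, a minimal Scala sketch of capping the iterations programmatically. Treat setInitialModel's availability as an assumption for your Spark version (it exists in later releases; on older ones you would modify the example code as Joseph suggests). `data` is an assumed existing RDD[Vector]; k and the centers are illustrative:

  import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
  import org.apache.spark.mllib.linalg.Vectors

  val kmeans = new KMeans()
    .setK(2)               // illustrative
    .setMaxIterations(50)  // hard cap on the number of iterations

  // Assumption: available in your release; otherwise omit and modify the example code.
  // kmeans.setInitialModel(new KMeansModel(Array(Vectors.dense(0.0, 0.0), Vectors.dense(5.0, 5.0))))

  val model = kmeans.run(data)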
Re: Broadcast variables can be rebroadcast?
Rather than updating the broadcast variable, can't you simply create a new one? When the old one can be GC'ed in your program, it will also get GC'ed from Spark's cache (and all executors). I think this will make your code *slightly* more complicated, as you need to add another layer of indirection for which broadcast variable to use, but it's not too bad. E.g., instead of

  var myBroadcast = sc.broadcast(...)
  (0 to 20).foreach { iteration =>
    // ... some rdd operations that involve myBroadcast ...
    myBroadcast.update(...) // wrong! don't update a broadcast variable
  }

do something like:

  def oneIteration(myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit = { ... }

  var myBroadcast = sc.broadcast(...)
  (0 to 20).foreach { iteration =>
    oneIteration(myRDD, myBroadcast)
    myBroadcast = sc.broadcast(...) // create a NEW broadcast here, with whatever you need to update it
  }

(Note: reassign the existing var rather than declaring a new one inside the loop, or the outer reference will never change.) On Sat, May 16, 2015 at 2:01 AM, N B nb.nos...@gmail.com wrote: Thanks Ayan. Can we rebroadcast after updating in the driver? Thanks, NB. On Fri, May 15, 2015 at 6:40 PM, ayan guha guha.a...@gmail.com wrote: Hi, broadcast variables are shipped the first time they are accessed in a transformation, to the executors used by that transformation. A broadcast variable will NOT be updated subsequently, even if its value has changed. However, a new value will be shipped to any new executor that comes into play after the value has changed. Because of this, changing the value of a broadcast variable is not a good idea, as it can create inconsistency within the cluster. From the documentation: In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable. On Sat, May 16, 2015 at 10:39 AM, N B nb.nos...@gmail.com wrote: Thanks Ilya. Does one have to call broadcast again once the underlying data is updated in order to get the changes visible on all nodes? Thanks, NB On Fri, May 15, 2015 at 5:29 PM, Ilya Ganelin ilgan...@gmail.com wrote: The broadcast variable is like a pointer. If the underlying data changes then the changes will be visible throughout the cluster. On Fri, May 15, 2015 at 5:18 PM, NB nb.nos...@gmail.com wrote: Hello, Once a broadcast variable is created using sparkContext.broadcast(), can it ever be updated again? The use case is for something like the underlying lookup data changing over time. Thanks, NB -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-variables-can-be-rebroadcast-tp22908.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Best Regards, Ayan Guha
Partition number of Spark Streaming Kafka receiver-based approach
Hi all, I am reading the docs of the receiver-based Kafka consumer. The last parameter of KafkaUtils.createStream is the per-topic number of Kafka partitions to consume. My question is: does the number for each topic in this parameter need to match the number of partitions in Kafka? For example, I have two topics, topic1 with 3 partitions and topic2 with 4 partitions. If I specify 2 for topic1 and 3 for topic2 and feed them to the createStream function, will there be data loss? Or will it just be inefficient? Thanks! Bill
Re: Partition number of Spark Streaming Kafka receiver-based approach
Hi Bill, You don't need to match the number of threads to the number of partitions in a specific topic. For example, if you have 3 partitions in topic1 but only set 2 threads, ideally one thread will receive 2 partitions and the other thread the remaining partition -- it depends on the scheduling of Kafka itself; basically, the data will not be lost. But you shouldn't set a thread number larger than the partition number: since each partition can only be consumed by one consumer, the extra threads will be wasted. 2015-05-19 7:46 GMT+08:00 Bill Jay bill.jaypeter...@gmail.com: Hi all, I am reading the docs of the receiver-based Kafka consumer. The last parameter of KafkaUtils.createStream is the per-topic number of Kafka partitions to consume. My question is: does the number for each topic in this parameter need to match the number of partitions in Kafka? For example, I have two topics, topic1 with 3 partitions and topic2 with 4 partitions. If I specify 2 for topic1 and 3 for topic2 and feed them to the createStream function, will there be data loss? Or will it just be inefficient? Thanks! Bill
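[Editor's note] For concreteness, a minimal sketch of the call being discussed -- the ZooKeeper host and group name are placeholders, and `ssc` is an assumed existing StreamingContext:

  import org.apache.spark.streaming.kafka.KafkaUtils

  // The values are the number of consumer *threads* per topic, not Kafka partitions.
  // Fewer threads than partitions is safe (no data loss), just less parallel on the receive side.
  val topicMap = Map("topic1" -> 2, "topic2" -> 3)
  val stream = KafkaUtils.createStream(ssc, "zk-host:2181", "my-consumer-group", topicMap)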
Re: FetchFailedException and MetadataFetchFailedException
Hi, can you take a look at the logs and see what the first error you are getting is? It's possible that the file doesn't exist when that error is produced, but it shows up later -- I've seen similar things happen, but only after there have already been some errors. But if you see that in the very first error, then I'm not sure what the cause is. It would be helpful for you to send the logs. Imran On Fri, May 15, 2015 at 10:07 AM, rok rokros...@gmail.com wrote: I am trying to sort a collection of key,value pairs (between several hundred million and a few billion) and have recently been getting lots of FetchFailedException errors that seem to originate when one of the executors doesn't find a temporary shuffle file on disk. E.g.: org.apache.spark.shuffle.FetchFailedException: /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index (No such file or directory) This file actually exists: ls -l /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index -rw-r--r-- 1 hadoop hadoop 11936 May 15 16:52 /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index This error repeats on several executors and is followed by a number of: org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0 This results in most tasks being lost and executors dying. There is plenty of space on all of the appropriate filesystems, so none of the executors are running out of disk space. I am running this via YARN on approximately 100 nodes with 2 cores per node. Any thoughts on what might be causing these errors? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/FetchFailedException-and-MetadataFetchFailedException-tp22901.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: MLLib SVMWithSGD is failing for large dataset
Reducing the number of instances won't help in this case. We use the driver to collect partial gradients, and even with tree aggregation, 20M features still puts a heavy workload on the driver. Please try to reduce the number of partitions before training. We are working on a more scalable implementation of logistic regression now, which should be able to solve this problem efficiently. -Xiangrui On Tue, Apr 28, 2015 at 3:43 PM, sarathkrishn...@gmail.com wrote: Hi, I'm just calling the standard SVMWithSGD implementation of Spark's MLlib. I'm not using any method like collect. Thanks, Sarath On Tue, Apr 28, 2015 at 4:35 PM, ai he heai0...@gmail.com wrote: Hi Sarath, It might be questionable to set num-executors to 64 if you only have 8 nodes. Do you use any action like collect which would overwhelm the driver, given that you have a large dataset? Thanks On Tue, Apr 28, 2015 at 10:50 AM, sarath sarathkrishn...@gmail.com wrote: I am trying to train on a large dataset consisting of 8 million data points and 20 million features using SVMWithSGD, but it is failing after running for some time. I tried increasing num-partitions, driver-memory, executor-memory, driver-max-resultSize. I also tried reducing the size of the dataset from 8 million to 25K (keeping the number of features the same, 20M), but after using the entire 64GB of driver memory for 20 to 30 min it failed. I'm using a cluster of 8 nodes (each with 8 cores and 64G RAM). executor-memory - 60G, driver-memory - 60G, num-executors - 64, and other default settings. This is the error log:

15/04/20 11:51:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/04/20 11:51:29 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
15/04/20 11:51:29 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
15/04/20 11:56:11 WARN TransportChannelHandler: Exception in connection from xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029
java.io.IOException: Connection reset by peer
  at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
  at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
  ...
15/04/20 11:56:11 ERROR TransportResponseHandler: Still have 7 requests outstanding when connection from xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029 is closed
15/04/20 11:56:11 ERROR OneForOneBlockFetcher: Failed while starting block fetches
java.io.IOException: Connection reset by peer
  at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
  at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
  ...
15/04/20 11:56:11 ERROR OneForOneBlockFetcher: Failed while starting block fetches
java.io.IOException: Connection reset by peer
  at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
  at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
  ...
15/04/20 11:56:12 ERROR RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks
java.io.IOException: Failed to connect to xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029
  at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
  at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
  at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
  at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
  at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
  at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:290)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
  at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:91)
  at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
  at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
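[Editor's note] A minimal sketch of Xiangrui's suggestion -- the partition count and iteration count are illustrative, and `data` is an assumed existing RDD[LabeledPoint]:

  import org.apache.spark.mllib.classification.SVMWithSGD
  import org.apache.spark.mllib.regression.LabeledPoint

  // Fewer partitions means fewer partial gradients for the driver to aggregate per iteration.
  val training = data.coalesce(64).cache()
  val model = SVMWithSGD.train(training, 100)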
Re: StandardScaler failing with OOM errors in PySpark
AFAIK, there are two places where you can specify the driver memory. One is via spark-submit --driver-memory, and the other is via spark.driver.memory in spark-defaults.conf. Please try these approaches and see whether they work. You can find detailed instructions at http://spark.apache.org/docs/latest/configuration.html and http://spark.apache.org/docs/latest/submitting-applications.html. -Xiangrui On Tue, Apr 28, 2015 at 4:06 AM, Rok Roskar rokros...@gmail.com wrote: That's exactly what I'm saying -- I specify the memory options using Spark options, but this is not reflected in how the JVM is created. No matter which memory settings I specify, the JVM for the driver is always created with 512Mb of memory. So I'm not sure if this is a feature or a bug? rok On Mon, Apr 27, 2015 at 6:54 PM, Xiangrui Meng men...@gmail.com wrote: You might need to specify driver memory in spark-submit instead of passing JVM options. spark-submit is designed to handle different deployments correctly. -Xiangrui On Thu, Apr 23, 2015 at 4:58 AM, Rok Roskar rokros...@gmail.com wrote: OK yes, I think I have narrowed it down to a problem with the driver memory settings. It looks like the application master/driver is not being launched with the settings specified: for the driver process on the main node I see -XX:MaxPermSize=128m -Xms512m -Xmx512m as the options used to start the JVM, even though I specified 'spark.yarn.am.memory', '5g' and 'spark.yarn.am.memoryOverhead', '2000'. The info shows that these options were read: 15/04/23 13:47:47 INFO yarn.Client: Will allocate AM container, with 7120 MB memory including 2000 MB overhead Is there some reason why these options are being ignored and the driver is instead started with just 512Mb of heap? On Thu, Apr 23, 2015 at 8:06 AM, Rok Roskar rokros...@gmail.com wrote: the feature dimension is 800k. yes, I believe the driver memory is likely the problem, since it doesn't crash until the very last part of the tree aggregation. I'm running it via pyspark through YARN -- I have to run in client mode, so I can't set spark.driver.memory -- I've tried setting spark.yarn.am.memory and the overhead parameters but they don't seem to have an effect. Thanks, Rok On Apr 23, 2015, at 7:47 AM, Xiangrui Meng men...@gmail.com wrote: What is the feature dimension? Did you set the driver memory? -Xiangrui On Tue, Apr 21, 2015 at 6:59 AM, rok rokros...@gmail.com wrote: I'm trying to use the StandardScaler in pyspark on a relatively small (a few hundred Mb) dataset of sparse vectors with 800k features. The fit method of StandardScaler crashes with Java heap space or Direct buffer memory errors. There should be plenty of memory around -- 10 executors with 2 cores each and 8 Gb per core. I'm giving the executors 9g of memory and have also tried lots of overhead (3g), thinking it might be the array creation in the aggregators that's causing issues. The bizarre thing is that this isn't always reproducible -- sometimes it actually works without problems. Should I be setting up the executors differently? Thanks, Rok -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/StandardScaler-failing-with-OOM-errors-in-PySpark-tp22593.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
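[Editor's note] Concretely, the two places Xiangrui mentions look like this -- values, class name and jar name are illustrative:

  # on the command line:
  spark-submit --driver-memory 8g --class com.example.MyApp my-app.jar

  # or in conf/spark-defaults.conf:
  spark.driver.memory  8g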
Re: bug: numClasses is not a valid argument of LogisticRegressionWithSGD
LogisticRegressionWithSGD doesn't support multi-class classification. Please use LogisticRegressionWithLBFGS instead. -Xiangrui On Mon, Apr 27, 2015 at 12:37 PM, Pagliari, Roberto rpagli...@appcomsci.com wrote: With the Python APIs, the available arguments I got (using the inspect module) are the following: ['cls', 'data', 'iterations', 'step', 'miniBatchFraction', 'initialWeights', 'regParam', 'regType', 'intercept'] numClasses is not available. Can someone comment on this? Thanks, - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
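[Editor's note] In Scala the multi-class setting looks like the sketch below; `training` is an assumed existing RDD[LabeledPoint] and the class count is illustrative. (Releases that include it also expose the same thing in Python as a numClasses argument to LogisticRegressionWithLBFGS.train -- treat that as an assumption for your version.)

  import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS

  val model = new LogisticRegressionWithLBFGS()
    .setNumClasses(3)   // values > 2 enable multinomial classification
    .run(training)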
Spark sql error while writing Parquet file- Trying to write more fields than contained in row
Hi, I am using spark-sql to read a CSV file and write it as a parquet file. I am building the schema using the following code:

  String schemaString = "a b c";
  List<StructField> fields = new ArrayList<StructField>();
  MetadataBuilder mb = new MetadataBuilder();
  mb.putBoolean("nullable", true);
  Metadata m = mb.build();
  for (String fieldName : schemaString.split(" ")) {
      fields.add(new StructField(fieldName, DataTypes.DoubleType, true, m));
  }
  StructType schema = DataTypes.createStructType(fields);

Some of the rows in my input csv do not contain three columns. After building my JavaRDD<Row>, I create the data frame as shown below using the RDD and schema:

  DataFrame darDataFrame = sqlContext.createDataFrame(rowRDD, schema);

Finally I try to save it as a Parquet file:

  darDataFrame.saveAsParquetFile("/home/anand/output.parquet");

I get this error when saving it as a Parquet file:

  java.lang.IndexOutOfBoundsException: Trying to write more fields than contained in row (3 > 2)

I understand the reason behind this error: some of the rows in my Row RDD do not contain three elements, because some rows in my input csv do not contain three columns. But while building the schema, I am specifying every field as nullable, so I believe it should not throw this error. Can anyone help me fix this error? Thank you. Regards, Anand.C
Re: Spark Streaming and reducing latency
Thanks, Akhil. So what do folks typically do to increase/contract the capacity? Do you plug in some cluster auto-scaling solution to make this elastic? Does Spark have any hooks for instrumenting auto-scaling? In other words, how do you avoid overwhelming the receivers in a scenario when your system's input can be unpredictable, based on users' activity? On May 17, 2015, at 11:04 AM, Akhil Das ak...@sigmoidanalytics.com wrote: With receiver-based streaming, you can actually specify spark.streaming.blockInterval, which is the interval at which received data is chunked into blocks. The default value is 200ms, and hence if your batch duration is 1 second, it will produce 5 blocks of data. And yes, with Spark Streaming, when your processing time goes beyond your batch duration and you have a higher data consumption, you will overwhelm the receiver's memory and it will throw up block-not-found exceptions. Thanks Best Regards On Sun, May 17, 2015 at 7:21 PM, dgoldenberg dgoldenberg...@gmail.com wrote: I keep hearing the argument that the way Discretized Streams work with Spark Streaming is a lot more of a batch-processing algorithm than true streaming. For streaming, one would expect a new item, e.g. in a Kafka topic, to be available to the streaming consumer immediately. With discretized streams, streaming is done with batch intervals, i.e. the consumer has to wait out the interval to be able to get at the new items. If one wants to reduce latency, it seems the only way to do this would be by reducing the batch interval window. However, that may lead to a great deal of churn, with many requests going into Kafka out of the consumers, potentially with no results whatsoever as there's nothing new in the topic at the moment. Is there a counter-argument to this reasoning? What are some of the general approaches to reduce latency folks might recommend? Or, perhaps there are ways of dealing with this at the streaming API level? If latency is of great concern, is it better to look into streaming from something like Flume, where data is pushed to consumers rather than pulled by them? Are there techniques, in that case, to ensure the consumers don't get overwhelmed with new data? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-and-reducing-latency-tp22922.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark's Guava pieces cause exceptions in non-trivial deployments
On 16 May 2015, at 04:39, Anton Brazhnyk anton.brazh...@genesys.com wrote: For me it wouldn't help, I guess, because those newer classes would still be loaded by a different classloader. What did work for me with 1.3.1 was removing those classes from Spark's jar completely, so they get loaded from the external Guava (the version I prefer) and by the classloader I expect. Note that Hadoop <= 2.6.0 won't work with Guava >= 17; see HADOOP-11032. FWIW Guava is a version nightmare across the Hadoop stack; almost as bad as protobuf.jar. With Hadoop 2.7+, Hadoop will run on later versions, it'll just continue to ship an older one to avoid breaking apps that expect it.
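[Editor's note] A related workaround, not from this thread but commonly used for this class of conflict: shade your application's own Guava so it can never collide with the copy Spark/Hadoop ship. A hedged sketch assuming sbt-assembly 0.14+; the shaded package name is arbitrary:

  // build.sbt
  assemblyShadeRules in assembly := Seq(
    ShadeRule.rename("com.google.common.**" -> "myapp.shaded.guava.@1").inAll
  )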
NullPointerException when accessing broadcast variable in DStream
Hi, I'm trying to use broadcast variables in my Spark Streaming program:

  val conf = new SparkConf().setMaster(SPARK_MASTER).setAppName(APPLICATION_NAME)
  val ssc = new StreamingContext(conf, Seconds(1))
  val LIMIT = ssc.sparkContext.broadcast(5L)
  println(LIMIT.value) // this prints 5
  val lines = ssc.socketTextStream("localhost", ...) // port elided in the original
  val words = lines.flatMap(_.split(" ")).filter(_.size > LIMIT.value)
  words.print()
  ssc.start()
  ssc.awaitTermination()

It throws java.lang.NullPointerException at the line filter(_.size > LIMIT.value), so I'm guessing LIMIT is not accessible within the stream. I'm running Spark 1.3.1 in standalone mode with a 2-node cluster. I tried it with spark-shell and it works fine. Please help! Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-when-accessing-broadcast-variable-in-DStream-tp22934.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
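[Editor's note] This question gets no reply in this digest, but a hedged sketch of a pattern that often avoids this class of NPE, mirroring the lazily-instantiated-singleton idiom from the Spark Streaming docs: resolve the broadcast inside the stream instead of capturing it at setup time. All names are illustrative, and `lines` is the DStream from the snippet above:

  import org.apache.spark.SparkContext
  import org.apache.spark.broadcast.Broadcast

  object Limit {
    @volatile private var instance: Broadcast[Long] = _
    def get(sc: SparkContext): Broadcast[Long] = {
      if (instance == null) synchronized {
        if (instance == null) instance = sc.broadcast(5L)
      }
      instance
    }
  }

  val words = lines.flatMap(_.split(" ")).transform { rdd =>
    val limit = Limit.get(rdd.sparkContext)  // re-resolved per batch, never a stale reference
    rdd.filter(_.size > limit.value)
  }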
Re: Restricting the number of iterations in Mllib Kmeans
Hi, I think you can't supply an initial set of centroids to kmeans. Thanks & Regards, Meethu M On Friday, 15 May 2015 12:37 AM, Suman Somasundar suman.somasun...@oracle.com wrote: Hi, I want to run a definite number of iterations in Kmeans. There is a command line argument to set maxIterations, but even if I set it to a number, Kmeans runs until the centroids converge. Is there a specific way to specify it on the command line? Also, I wanted to know if we can supply the initial set of centroids to the program instead of it choosing the centroids at random? Thanks, Suman.
RE: Spark Streaming and reducing latency
You can use spark.streaming.receiver.maxRate (default: not set) – the maximum rate (number of records per second) at which each receiver will receive data. Effectively, each stream will consume at most this number of records per second. Setting this configuration to 0 or a negative number will put no limit on the rate. See the deployment guide https://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications in the Spark Streaming programming guide for more details. Another way is to implement a feedback loop in your receivers, monitoring the performance metrics of your application/job and based on that automatically adjusting the receiving rate – BUT all of these have nothing to do with “reducing the latency” – they simply prevent your application/job from clogging up. The nastier effect of that is when Spark Streaming starts removing in-memory RDDs from RAM before they are processed by the job – that works fine in Spark batch (i.e. removing RDDs from RAM based on LRU), but in Spark Streaming, when done in this “unceremonious way”, it simply crashes the application. From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] Sent: Monday, May 18, 2015 11:46 AM To: Akhil Das Cc: user@spark.apache.org Subject: Re: Spark Streaming and reducing latency Thanks, Akhil. So what do folks typically do to increase/contract the capacity? Do you plug in some cluster auto-scaling solution to make this elastic? Does Spark have any hooks for instrumenting auto-scaling? In other words, how do you avoid overwhelming the receivers in a scenario when your system's input can be unpredictable, based on users' activity? On May 17, 2015, at 11:04 AM, Akhil Das ak...@sigmoidanalytics.com wrote: With receiver-based streaming, you can actually specify spark.streaming.blockInterval, which is the interval at which received data is chunked into blocks. The default value is 200ms, and hence if your batch duration is 1 second, it will produce 5 blocks of data. And yes, with Spark Streaming, when your processing time goes beyond your batch duration and you have a higher data consumption, you will overwhelm the receiver's memory and it will throw up block-not-found exceptions. Thanks Best Regards On Sun, May 17, 2015 at 7:21 PM, dgoldenberg dgoldenberg...@gmail.com wrote: I keep hearing the argument that the way Discretized Streams work with Spark Streaming is a lot more of a batch-processing algorithm than true streaming. For streaming, one would expect a new item, e.g. in a Kafka topic, to be available to the streaming consumer immediately. With discretized streams, streaming is done with batch intervals, i.e. the consumer has to wait out the interval to be able to get at the new items. If one wants to reduce latency, it seems the only way to do this would be by reducing the batch interval window. However, that may lead to a great deal of churn, with many requests going into Kafka out of the consumers, potentially with no results whatsoever as there's nothing new in the topic at the moment. Is there a counter-argument to this reasoning? What are some of the general approaches to reduce latency folks might recommend? Or, perhaps there are ways of dealing with this at the streaming API level? If latency is of great concern, is it better to look into streaming from something like Flume, where data is pushed to consumers rather than pulled by them? Are there techniques, in that case, to ensure the consumers don't get overwhelmed with new data?
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-and-reducing-latency-tp22922.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
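[Editor's note] Putting the two settings Evo and Akhil mention into code -- a minimal sketch, values illustrative:

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .set("spark.streaming.receiver.maxRate", "10000") // records/sec per receiver; unset = unlimited
    .set("spark.streaming.blockInterval", "200ms")    // older releases expect a plain "200" (milliseconds)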
Re: Spark sql error while writing Parquet file- Trying to write more fields than contained in row
Hi, Give dataFrame.fillna a try to fill up the missing columns. Best, Ayan On Mon, May 18, 2015 at 8:29 PM, Chandra Mohan, Ananda Vel Murugan ananda.muru...@honeywell.com wrote: Hi, I am using spark-sql to read a CSV file and write it as a parquet file. I am building the schema using the following code:

  String schemaString = "a b c";
  List<StructField> fields = new ArrayList<StructField>();
  MetadataBuilder mb = new MetadataBuilder();
  mb.putBoolean("nullable", true);
  Metadata m = mb.build();
  for (String fieldName : schemaString.split(" ")) {
      fields.add(new StructField(fieldName, DataTypes.DoubleType, true, m));
  }
  StructType schema = DataTypes.createStructType(fields);

Some of the rows in my input csv do not contain three columns. After building my JavaRDD<Row>, I create the data frame as shown below using the RDD and schema:

  DataFrame darDataFrame = sqlContext.createDataFrame(rowRDD, schema);

Finally I try to save it as a Parquet file:

  darDataFrame.saveAsParquetFile("/home/anand/output.parquet");

I get this error when saving it as a Parquet file:

  java.lang.IndexOutOfBoundsException: Trying to write more fields than contained in row (3 > 2)

I understand the reason behind this error: some of the rows in my Row RDD do not contain three elements, because some rows in my input csv do not contain three columns. But while building the schema, I am specifying every field as nullable, so I believe it should not throw this error. Can anyone help me fix this error? Thank you. Regards, Anand.C -- Best Regards, Ayan Guha
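[Editor's note] fillna replaces nulls in columns that already have values; since the failing rows here actually carry fewer values than the schema, it may also be necessary to pad the rows when the RDD is built. A hedged Scala sketch (the Java original would do the equivalent; `lines` and the comma delimiter are assumptions about the input):

  import org.apache.spark.sql.Row

  val numFields = 3  // matches the "a b c" schema
  val rowRDD = lines.map { line =>
    val parts = line.split(",")
    // Parse what is present; pad with nulls so every Row has exactly numFields values,
    // which is what the nullable DoubleType schema can then absorb.
    val doubles: Seq[Any] = parts.map(_.toDouble).toSeq
    Row.fromSeq(doubles ++ Seq.fill(numFields - parts.length)(null))
  }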
RE: Spark Streaming and reducing latency
And if you want to genuinely “reduce the latency” (still within the boundaries of the micro-batch), THEN you need to design and finely tune the parallel programming / execution model of your application. The objective/metric here is: a) Consume all data within your selected micro-batch window WITHOUT any artificial message rate limits. b) The above will result in a certain size of DStream RDD per micro-batch. c) The objective now is to process that RDD WITHIN the time of the micro-batch (and also account for temporary message rate spikes etc. which may further increase the size of the RDD) – this will avoid any clogging up of the app and will process your messages at the lowest latency possible in a micro-batch architecture. d) You achieve the objective stated in c) by designing, varying and experimenting with various aspects of the Spark Streaming parallel programming and execution model – e.g. number of receivers, number of threads per receiver, number of executors, number of cores, RAM allocated to executors, number of RDD partitions (which correspond to the number of parallel threads operating on the RDD), etc. (see the sketch at the end of this message). Re the “unceremonious removal of DStream RDDs” from RAM by Spark Streaming when the available RAM is exhausted due to a high message rate – the condition that crashes your (by then clogged-up) application – its name is: Loss was due to java.lang.Exception java.lang.Exception: Could not compute split, block input-4-1410542878200 not found From: Evo Eftimov [mailto:evo.efti...@isecc.com] Sent: Monday, May 18, 2015 12:13 PM To: 'Dmitry Goldenberg'; 'Akhil Das' Cc: 'user@spark.apache.org' Subject: RE: Spark Streaming and reducing latency You can use spark.streaming.receiver.maxRate (default: not set) – the maximum rate (number of records per second) at which each receiver will receive data. Effectively, each stream will consume at most this number of records per second. Setting this configuration to 0 or a negative number will put no limit on the rate. See the deployment guide https://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications in the Spark Streaming programming guide for more details. Another way is to implement a feedback loop in your receivers, monitoring the performance metrics of your application/job and based on that automatically adjusting the receiving rate – BUT all of these have nothing to do with “reducing the latency” – they simply prevent your application/job from clogging up. The nastier effect of that is when Spark Streaming starts removing in-memory RDDs from RAM before they are processed by the job – that works fine in Spark batch (i.e. removing RDDs from RAM based on LRU), but in Spark Streaming, when done in this “unceremonious way”, it simply crashes the application. From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] Sent: Monday, May 18, 2015 11:46 AM To: Akhil Das Cc: user@spark.apache.org Subject: Re: Spark Streaming and reducing latency Thanks, Akhil. So what do folks typically do to increase/contract the capacity? Do you plug in some cluster auto-scaling solution to make this elastic? Does Spark have any hooks for instrumenting auto-scaling? In other words, how do you avoid overwhelming the receivers in a scenario when your system's input can be unpredictable, based on users' activity? On May 17, 2015, at 11:04 AM, Akhil Das ak...@sigmoidanalytics.com wrote: With receiver-based streaming, you can actually specify spark.streaming.blockInterval, which is the interval at which received data is chunked into blocks. The default value is 200ms, and hence if your batch duration is 1 second, it will produce 5 blocks of data. And yes, with Spark Streaming, when your processing time goes beyond your batch duration and you have a higher data consumption, you will overwhelm the receiver's memory and it will throw up block-not-found exceptions. Thanks Best Regards On Sun, May 17, 2015 at 7:21 PM, dgoldenberg dgoldenberg...@gmail.com wrote: I keep hearing the argument that the way Discretized Streams work with Spark Streaming is a lot more of a batch-processing algorithm than true streaming. For streaming, one would expect a new item, e.g. in a Kafka topic, to be available to the streaming consumer immediately. With discretized streams, streaming is done with batch intervals, i.e. the consumer has to wait out the interval to be able to get at the new items. If one wants to reduce latency, it seems the only way to do this would be by reducing the batch interval window. However, that may lead to a great deal of churn, with many requests going into Kafka out of the consumers, potentially with no results whatsoever as there's nothing new in the topic at the moment. Is there a counter-argument to this reasoning? What are some of the general approaches to reduce latency folks might recommend? Or, perhaps there are ways of dealing with
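[Editor's note] To illustrate Evo's point d), a minimal sketch of one of those knobs -- several receivers unioned, then repartitioned so processing parallelism is independent of the receiver count. The counts and the socket source are illustrative placeholders (a Kafka receiver works the same way), and `ssc` is an assumed existing StreamingContext:

  val numReceivers = 4
  val streams = (1 to numReceivers).map { _ =>
    ssc.socketTextStream("stream-host", 9999)  // placeholder source
  }
  val unified = ssc.union(streams)
  // Spread the per-batch work across the cluster regardless of how many receivers there are.
  val repartitioned = unified.repartition(32)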