PySpark Structured Streaming - using previous iteration computed results in current iteration
We would like to maintain arbitrary state between invocations of Structured Streaming iterations in Python. How can we maintain a static DataFrame that acts as state between the iterations? Several options that may be relevant:

1. in Spark memory (distributed across the workers)
2. an external memory solution (e.g. Elasticsearch / Redis)
3. any other state-maintenance mechanism that works with PySpark

Specifically: in iteration N we get a streaming DataFrame from Kafka and apply a computation that produces a label column over the window of samples from the last hour. We want to keep the labels and the sample ids around for the next iteration (N+1), where we join against the new sample window so that samples which existed in the previous iteration (N) inherit their labels.

--
Regards,
Ofer Eliassaf
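One way to sketch the carry-over step (an assumption, not an established PySpark pattern: function and column names below are hypothetical): keep the previous window's (sample id -> label) mapping as state and left-join each new window against it. The core inheritance logic is shown with plain Python dicts so the example is self-contained; in PySpark the equivalent would be a left join of the new window against the kept state, persisted in Spark memory or re-read from Redis/Elasticsearch between triggers.

```python
def inherit_labels(new_ids, prev_labels):
    """Carry labels from window N into window N+1.

    prev_labels: dict of sample id -> label computed in iteration N
    new_ids:     sample ids present in iteration N+1
    Returns id -> label; ids not seen in iteration N map to None and
    must be (re)computed in the current iteration.
    """
    return {i: prev_labels.get(i) for i in new_ids}

# In PySpark this step would be a left join (names hypothetical):
#   new_window.join(prev_state, on="sample_id", how="left")
# with prev_state either kept persisted in Spark memory or re-read
# from an external store (Redis / Elasticsearch) between triggers.

state_n = {"a": 1, "b": 0}        # labels from iteration N
window_n1 = ["b", "c"]            # sample ids in iteration N+1
print(inherit_labels(window_n1, state_n))  # -> {'b': 0, 'c': None}
```

The external-store variant (option 2 above) trades join speed for durability: the state survives driver restarts, at the cost of a round-trip per trigger.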
Re: pyspark cluster mode on standalone deployment
anyone? please? Is this getting any priority?

On Tue, Sep 27, 2016 at 3:38 PM, Ofer Eliassaf <ofer.elias...@gmail.com> wrote:
> Is there any plan to support Python Spark running in "cluster mode" on a
> standalone deployment?
>
> There is this famous survey mentioning that more than 50% of the users
> are using the standalone configuration. Making pyspark work in cluster
> mode with standalone would help a lot for high availability in Python
> Spark.
>
> Currently only the YARN deployment supports it. Bringing in the huge
> YARN installation just for this feature is not fun at all.
>
> Does someone have a time estimate for this?
>
> --
> Regards,
> Ofer Eliassaf

--
Regards,
Ofer Eliassaf
Re: PySpark TaskContext
Thank you so much for this! Great to see that you listen to the community.

On Thu, Nov 24, 2016 at 12:10 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
> https://issues.apache.org/jira/browse/SPARK-18576
>
> On Thu, Nov 24, 2016 at 2:05 AM, Holden Karau <hol...@pigscanfly.ca> wrote:
>> Cool - thanks. I'll circle back with the JIRA number once I've got it
>> created - it will probably take a while before it lands in a Spark
>> release (since 2.1 has already branched), but better debugging
>> information for Python users is certainly important/useful.
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau

--
Regards,
Ofer Eliassaf
Re: PySpark TaskContext
Since we can't work with log4j in pyspark executors, we built our own logging infrastructure (based on logstash/elastic/kibana). It would help to have the TID in the logs, so we can drill down accordingly.

On Thu, Nov 24, 2016 at 11:48 AM, Holden Karau <hol...@pigscanfly.ca> wrote:
> Hi,
>
> The TaskContext isn't currently exposed in PySpark but I've been meaning
> to look at exposing at least some of TaskContext for parity in PySpark.
> Is there a particular use case which you want this for? Would help with
> crafting the JIRA :)
>
> Cheers,
>
> Holden :)
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau

--
Regards,
Ofer Eliassaf
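For the logging use case in this thread, a minimal sketch of what tagging log lines with the task id could look like. The helper below is hypothetical; the commented executor-side calls assume the API that SPARK-18576 eventually exposed in PySpark (Spark 2.2+).

```python
def tag_with_task_id(task_id, message):
    """Prefix a log line with its Spark task id so it can be filtered
    downstream (e.g. in Kibana)."""
    return "[TID {}] {}".format(task_id, message)

# Executor-side usage once TaskContext reached PySpark (SPARK-18576,
# Spark 2.2+) would look roughly like:
#   from pyspark import TaskContext
#   def process(iterator):
#       tid = TaskContext.get().taskAttemptId()
#       for row in iterator:
#           log.info(tag_with_task_id(tid, "processing {}".format(row)))
#           yield row
#   rdd.mapPartitions(process)

print(tag_with_task_id(7, "started partition scan"))  # -> [TID 7] started partition scan
```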
PySpark TaskContext
Hi,

Is there a way in PySpark to get something like TaskContext from code running on an executor, like in Scala Spark? If not, how can I know my task id from inside the executors?

Thanks!

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-TaskContext-tp28125.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Dynamic Resource Allocation in a standalone
Hi,

I have a question/problem regarding dynamic resource allocation. I am using Spark 1.6.2 with the standalone cluster manager. I have one worker with 2 cores. I set the following arguments in the spark-defaults.conf file on all my nodes:

spark.dynamicAllocation.enabled true
spark.shuffle.service.enabled true
spark.deploy.defaultCores 1

I run a sample application with many tasks. I open port 4040 on the driver and I can verify that the above configuration exists. My problem is that no matter what I do, my application only gets 1 core even though the other cores are available. Is this normal, or do I have a problem in my configuration?

The behaviour I want is this: I have many users working with the same Spark cluster. I want each application to get a fixed number of cores unless the rest of the cluster is idle, in which case I want the running applications to get the total amount of cores until a new application arrives...

--
Regards,
Ofer Eliassaf
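For what it's worth, a plausible culprit given the settings above (my reading of the standalone docs, worth verifying): `spark.deploy.defaultCores` caps any application that does not set `spark.cores.max` itself, so with it at 1 every such application is held to a single core. A sketch of spark-defaults.conf with illustrative values:

```properties
spark.dynamicAllocation.enabled   true
spark.shuffle.service.enabled     true
# Caps applications that do not set spark.cores.max themselves;
# leaving this at 1 limits every such application to one core.
spark.deploy.defaultCores         2
# Optional per-application ceiling (can also be set at submit time
# with --conf spark.cores.max=2).
spark.cores.max                   2
```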
Re: spark standalone with multiple workers gives a warning
The slaves should connect to the master using the scripts in sbin... You can read about it here: http://spark.apache.org/docs/latest/spark-standalone.html On Thu, Oct 6, 2016 at 6:46 PM, Mendelson, Assaf <assaf.mendel...@rsa.com> wrote: > Hi, > > I have a spark standalone cluster. On it, I am using 3 workers per node. > > So I added SPARK_WORKER_INSTANCES set to 3 in spark-env.sh > > The problem is, that when I run spark-shell I get the following warning: > > WARN SparkConf: > > SPARK_WORKER_INSTANCES was detected (set to '3'). > > This is deprecated in Spark 1.0+. > > > > Please instead use: > > - ./spark-submit with --num-executors to specify the number of executors > > - Or set SPARK_EXECUTOR_INSTANCES > > - spark.executor.instances to configure the number of instances in the > spark config. > > > > So how would I start a cluster of 3? SPARK_WORKER_INSTANCES is the only > way I see to start the standalone cluster and the only way I see to define > it is in spark-env.sh. The spark submit option, SPARK_EXECUTOR_INSTANCES > and spark.executor.instances are all related to submitting the job. > > > > Any ideas? > > Thanks > > Assaf > -- Regards, Ofer Eliassaf
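For reference, the standalone worker count is configured per node in conf/spark-env.sh and picked up by the sbin scripts; a sketch with illustrative values (as I understand it, the deprecation warning concerns the executor-side settings used at job submission, while this controls how many worker daemons are launched per node):

```shell
# conf/spark-env.sh on each worker node (standalone mode).
export SPARK_WORKER_INSTANCES=3
export SPARK_WORKER_CORES=2     # cores each worker daemon may offer
export SPARK_WORKER_MEMORY=4g   # memory each worker daemon may offer
```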
Re: Pyspark not working on yarn-cluster mode
I advise you to use Livy for this purpose. Livy works well with YARN and it will decouple Spark from your web app.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Pyspark-not-working-on-yarn-cluster-mode-tp23755p27799.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
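For anyone landing here: submitting a PySpark job through Livy is a plain HTTP call from the web app. A sketch (host, port, and file path are placeholders; endpoint and payload per Livy's REST API as I recall it, so verify against your Livy version's docs):

```shell
# Submit a Python batch job to a Livy server (default port 8998).
curl -X POST \
  -H "Content-Type: application/json" \
  -d '{"file": "/path/to/your_job.py"}' \
  http://livy-host:8998/batches
```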
pyspark cluster mode on standalone deployment
Is there any plan to support Python Spark running in "cluster mode" on a standalone deployment?

There is this famous survey mentioning that more than 50% of the users are using the standalone configuration. Making pyspark work in cluster mode with standalone would help a lot for high availability in Python Spark.

Currently only the YARN deployment supports it. Bringing in the huge YARN installation just for this feature is not fun at all.

Does someone have a time estimate for this?

--
Regards,
Ofer Eliassaf
Re: Strange behavior with PySpark when using Join() and zip()
Thanks Sean,

Sorting definitely solves it, but I was hoping it could be avoided :) In the documentation for classification in MLlib, for example, zip() is used to create labelsAndPredictions:

from pyspark.mllib.tree import RandomForest
from pyspark.mllib.util import MLUtils

# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
# Empty categoricalFeaturesInfo indicates all features are continuous.
# Note: Use larger numTrees in practice.
# Setting featureSubsetStrategy="auto" lets the algorithm choose.
model = RandomForest.trainClassifier(trainingData, numClasses=2,
                                     categoricalFeaturesInfo={},
                                     numTrees=3, featureSubsetStrategy="auto",
                                     impurity='gini', maxDepth=4, maxBins=32)

# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification forest model:')
print(model.toDebugString())

The reason the zip() works here is because the RDD is loaded from a file. If it was generated with something that includes a join() it won't work, due to this same issue. Maybe worth mentioning in the docs then?

Ofer

On Mar 23, 2015, at 11:40 AM, Sean Owen <so...@cloudera.com> wrote:

> I think the explanation is that the join does not guarantee any order,
> since it causes a shuffle in general, and it is computed twice in the
> first example, resulting in a difference for d1 and d2.
>
> You can persist() the result of the join, and in practice I believe
> you'd find it behaves as expected, although even that is not 100%
> guaranteed, since a block could be lost and recomputed (differently).
> If order matters, and it does for zip(), then the reliable way to
> guarantee a well-defined ordering for zipping is to sort the RDDs.
>
> On Mon, Mar 23, 2015 at 6:27 PM, Ofer Mendelevitch
> <omendelevi...@hortonworks.com> wrote:
>> Hi,
>>
>> I am running into a strange issue when doing a JOIN of two RDDs followed
>> by ZIP from PySpark. It's part of a more complex application, but I was
>> able to narrow it down to a simplified example that's easy to replicate
>> and causes the same problem to appear:
>>
>> raw = sc.parallelize([('k'+str(x),'v'+str(x)) for x in range(100)])
>> data = raw.join(raw).mapValues(lambda x: [x[0]]+[x[1]]).map(lambda pair: ','.join([x for x in pair[1]]))
>> d1 = data.map(lambda s: s.split(',')[0])
>> d2 = data.map(lambda s: s.split(',')[1])
>> x = d1.zip(d2)
>> print x.take(10)
>>
>> The output is:
>>
>> [('v44', 'v80'), ('v79', 'v44'), ('v80', 'v79'), ('v45', 'v78'), ('v81', 'v81'), ('v78', 'v45'), ('v99', 'v99'), ('v82', 'v82'), ('v46', 'v46'), ('v83', 'v83')]
>>
>> As you can see, the ordering of items is no longer preserved in all
>> cases (e.g., 'v81' is preserved, and 'v45' is not). Isn't it supposed
>> to be preserved?
>>
>> If I do the same thing without the JOIN:
>>
>> data = sc.parallelize('v'+str(x)+',v'+str(x) for x in range(100))
>> d1 = data.map(lambda s: s.split(',')[0])
>> d2 = data.map(lambda s: s.split(',')[1])
>> x = d1.zip(d2)
>> print x.take(10)
>>
>> The output is:
>>
>> [('v0', 'v0'), ('v1', 'v1'), ('v2', 'v2'), ('v3', 'v3'), ('v4', 'v4'), ('v5', 'v5'), ('v6', 'v6'), ('v7', 'v7'), ('v8', 'v8'), ('v9', 'v9')]
>>
>> As expected.
>>
>> Anyone run into this or a similar issue?
>>
>> Ofer
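A pattern that sidesteps the zip entirely (a sketch; plain-Python lists stand in for RDDs to keep it self-contained): derive both columns in a single map so they can never desynchronize, instead of building two datasets from the same shuffled parent and relying on zip() to realign them. In PySpark, the list comprehension below would be `data.map(split_pair)`.

```python
records = ["v{0},v{0}".format(i) for i in range(5)]

def split_pair(s):
    """Emit both columns of one record together, instead of deriving two
    separate datasets and relying on zip() to realign them later."""
    first, second = s.split(",")[:2]
    return (first, second)

# Alignment now holds regardless of how the parent records are ordered,
# because the pair never leaves a single record.
pairs = [split_pair(s) for s in records]
print(pairs[:3])  # -> [('v0', 'v0'), ('v1', 'v1'), ('v2', 'v2')]
```

Sorting (as Sean suggests) remains the right tool when the two sides genuinely come from different computations, as in the MLlib labels-and-predictions example above.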