PySpark Structured Streaming - using previous iteration computed results in current iteration

2018-05-16 Thread Ofer Eliassaf
We would like to maintain arbitrary state between invocations of the
iterations of Structured Streaming in Python.

How can we maintain a static DataFrame that acts as state between the
iterations?

Several options that may be relevant:
1. in Spark memory (distributed across the workers)
2. External Memory solution (e.g. ElasticSearch / Redis)
3. utilizing other state maintenance that can work with PySpark

Specifically: in iteration N we get a streaming DataFrame from Kafka and apply
a computation that produces a label column over the window of samples from the
last hour.
We want to keep the labels and sample ids around for the next iteration (N+1),
where we want to join them with the new sample window so that samples that
already existed in iteration N inherit their labels.
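
Roughly what we have in mind, as a minimal sketch (assuming foreachBatch from
Spark 2.4+, a made-up Kafka broker/topic, and a Parquet "state" table under a
hypothetical path; option 2 above would swap that table for Redis or
ElasticSearch):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("label-state-sketch").getOrCreate()

STATE_BASE = "/tmp/label_state"  # hypothetical location of the state table

def state_path(batch_id):
    # Alternate between two locations so we never overwrite the path we read from.
    return "{}/{}".format(STATE_BASE, batch_id % 2)

stream = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")  # assumed broker
          .option("subscribe", "samples")                     # assumed topic
          .load())

def process_batch(batch_df, batch_id):
    samples = batch_df.selectExpr("CAST(key AS STRING) AS sample_id",
                                  "CAST(value AS STRING) AS value")
    try:
        # Labels computed in iteration N, read back as a plain static DataFrame.
        prev_labels = spark.read.parquet(state_path(batch_id - 1))
        samples = samples.join(prev_labels, on="sample_id", how="left")
    except Exception:
        # First iteration: no previous state yet.
        samples = samples.withColumn("label", F.lit(None).cast("string"))

    # Placeholder for the real labelling computation over the hourly window:
    # keep an inherited label where one exists, otherwise label the sample anew.
    labeled = samples.withColumn(
        "label", F.coalesce(F.col("label"), F.lit("unlabeled")))

    # Persist (sample_id, label) so iteration N+1 can inherit the labels.
    labeled.select("sample_id", "label") \
           .write.mode("overwrite").parquet(state_path(batch_id))

(stream.writeStream
 .foreachBatch(process_batch)
 .option("checkpointLocation", "/tmp/label_state_chk")  # assumed checkpoint path
 .start())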


-- 
Regards,
Ofer Eliassaf


Re: pyspark cluster mode on standalone deployment

2017-03-05 Thread Ofer Eliassaf
Anyone? Please? Is this getting any priority?

On Tue, Sep 27, 2016 at 3:38 PM, Ofer Eliassaf <ofer.elias...@gmail.com>
wrote:

> Is there any plan to support Python Spark running in "cluster mode" on a
> standalone deployment?
>
> There is this famous survey mentioning that more than 50% of the users are
> using the standalone configuration.
> Making PySpark work in cluster mode with standalone will help a lot for
> high availability in Python Spark.
>
> Currently only the YARN deployment supports it. Bringing in the huge YARN
> installation just for this feature is not fun at all.
>
> Does someone have a time estimate for this?
>
>
>
> --
> Regards,
> Ofer Eliassaf
>



-- 
Regards,
Ofer Eliassaf


Re: PySpark TaskContext

2016-11-24 Thread Ofer Eliassaf
Thank you so much for this! Great to see that you listen to the community.
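
For the archive, here is a minimal sketch of what the API that SPARK-18576
eventually added (pyspark.TaskContext, available from Spark 2.2.0 onward) looks
like in use; it tags records with stage/partition/task-attempt IDs from inside
a mapPartitions, which is the kind of drill-down we wanted for our logs:

from pyspark import SparkContext, TaskContext

sc = SparkContext(appName="taskcontext-sketch")  # assumed app name

def tag_with_task_info(iterator):
    # TaskContext.get() returns the running task's context on the executor
    # (and None on the driver), so it must be called inside the task.
    tc = TaskContext.get()
    for record in iterator:
        yield (tc.stageId(), tc.partitionId(), tc.taskAttemptId(), record)

print(sc.parallelize(range(10), 3).mapPartitions(tag_with_task_info).collect())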

On Thu, Nov 24, 2016 at 12:10 PM, Holden Karau <hol...@pigscanfly.ca> wrote:

> https://issues.apache.org/jira/browse/SPARK-18576
>
> On Thu, Nov 24, 2016 at 2:05 AM, Holden Karau <hol...@pigscanfly.ca>
> wrote:
>
>> Cool - thanks. I'll circle back with the JIRA number once I've got it
>> created - will probably take a while before it lands in a Spark release
>> (since 2.1 has already branched) but better debugging information for
>> Python users is certainly important/useful.
>>
>> On Thu, Nov 24, 2016 at 2:03 AM, Ofer Eliassaf <ofer.elias...@gmail.com>
>> wrote:
>>
>>> Since we can't work with log4j in PySpark executors, we built our own
>>> logging infrastructure (based on Logstash/Elastic/Kibana).
>>> It would help to have the task ID (TID) in the logs, so we can drill down
>>> accordingly.
>>>
>>>
>>> On Thu, Nov 24, 2016 at 11:48 AM, Holden Karau <hol...@pigscanfly.ca>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> The TaskContext isn't currently exposed in PySpark but I've been
>>>> meaning to look at exposing at least some of TaskContext for parity in
>>>> PySpark. Is there a particular use case which you want this for? Would help
>>>> with crafting the JIRA :)
>>>>
>>>> Cheers,
>>>>
>>>> Holden :)
>>>>
>>>> On Thu, Nov 24, 2016 at 1:39 AM, ofer <ofer.elias...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> Is there a way to get something like TaskContext in PySpark from code
>>>>> running on an executor, like in Scala Spark?
>>>>>
>>>>> If not, how can I know my task ID from inside the executors?
>>>>>
>>>>> Thanks!
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-TaskContext-tp28125.html
>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>> Nabble.com.
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Cell : 425-233-8271
>>>> Twitter: https://twitter.com/holdenkarau
>>>>
>>>
>>>
>>>
>>> --
>>> Regards,
>>> Ofer Eliassaf
>>>
>>
>>
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>>
>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>



-- 
Regards,
Ofer Eliassaf


Re: PySpark TaskContext

2016-11-24 Thread Ofer Eliassaf
Since we can't work with log4j in PySpark executors, we built our own
logging infrastructure (based on Logstash/Elastic/Kibana).
It would help to have the task ID (TID) in the logs, so we can drill down
accordingly.


On Thu, Nov 24, 2016 at 11:48 AM, Holden Karau <hol...@pigscanfly.ca> wrote:

> Hi,
>
> The TaskContext isn't currently exposed in PySpark but I've been meaning
> to look at exposing at least some of TaskContext for parity in PySpark. Is
> there a particular use case which you want this for? Would help with
> crafting the JIRA :)
>
> Cheers,
>
> Holden :)
>
> On Thu, Nov 24, 2016 at 1:39 AM, ofer <ofer.elias...@gmail.com> wrote:
>
>> Hi,
>> Is there a way to get something like TaskContext in PySpark from code
>> running on an executor, like in Scala Spark?
>>
>> If not, how can I know my task ID from inside the executors?
>>
>> Thanks!
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-TaskContext-tp28125.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>



-- 
Regards,
Ofer Eliassaf


PySpark TaskContext

2016-11-24 Thread ofer
Hi,
Is there a way to get something like TaskContext in PySpark from code
running on an executor, like in Scala Spark?

If not, how can I know my task ID from inside the executors?

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-TaskContext-tp28125.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Dynamic Resource Allocation in a standalone

2016-10-27 Thread Ofer Eliassaf
Hi,

I have a question/problem regarding dynamic resource allocation.
I am using Spark 1.6.2 with the standalone cluster manager.

I have one worker with 2 cores.

I set the following arguments in the spark-defaults.conf file on all
my nodes:

spark.dynamicAllocation.enabled  true
spark.shuffle.service.enabled true
spark.deploy.defaultCores 1

I run a sample application with many tasks.

I open port 4040 on the driver and can verify that the above
configuration is in effect.

My problem is that no matter what I do, my application only gets 1 core even
though the other cores are available.

Is this normal, or do I have a problem in my configuration?


The behaviour I want is this:
I have many users working with the same Spark cluster.
I want each application to get a fixed number of cores unless the rest of the
cluster is idle.
In that case I want the running applications to get the total number of cores
until a new application arrives...
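
For reference, a minimal sketch (with assumed values and master URL) of the
per-application knobs that usually control this on a standalone cluster; note
that spark.deploy.defaultCores is only the default for applications that do
not set spark.cores.max themselves:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("spark://master:7077")              # assumed master URL
        .setAppName("dyn-alloc-sketch")
        .set("spark.dynamicAllocation.enabled", "true")
        .set("spark.shuffle.service.enabled", "true")  # external shuffle service must run on each worker
        .set("spark.executor.cores", "1")              # cores per executor
        .set("spark.dynamicAllocation.minExecutors", "1")
        .set("spark.dynamicAllocation.maxExecutors", "2")
        .set("spark.cores.max", "2"))                  # hard per-application cap

sc = SparkContext(conf=conf)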


-- 
Regards,
Ofer Eliassaf


Re: spark standalone with multiple workers gives a warning

2016-10-06 Thread Ofer Eliassaf
The slaves should connect to the master using the scripts in sbin...
You can read about it here:
http://spark.apache.org/docs/latest/spark-standalone.html

On Thu, Oct 6, 2016 at 6:46 PM, Mendelson, Assaf <assaf.mendel...@rsa.com>
wrote:

> Hi,
>
> I have a spark standalone cluster. On it, I am using 3 workers per node.
>
> So I added SPARK_WORKER_INSTANCES set to 3 in spark-env.sh
>
> The problem is, that when I run spark-shell I get the following warning:
>
> WARN SparkConf:
>
> SPARK_WORKER_INSTANCES was detected (set to '3').
>
> This is deprecated in Spark 1.0+.
>
>
>
> Please instead use:
>
> - ./spark-submit with --num-executors to specify the number of executors
>
> - Or set SPARK_EXECUTOR_INSTANCES
>
> - spark.executor.instances to configure the number of instances in the
> spark config.
>
>
>
> So how would I start a cluster of 3? SPARK_WORKER_INSTANCES is the only
> way I see to start the standalone cluster and the only way I see to define
> it is in spark-env.sh. The spark submit option, SPARK_EXECUTOR_INSTANCES
> and spark.executor.instances are all related to submitting the job.
>
>
>
> Any ideas?
>
> Thanks
>
> Assaf
>



-- 
Regards,
Ofer Eliassaf


Re: Pyspark not working on yarn-cluster mode

2016-09-27 Thread ofer
I advise you to use Livy for this purpose.
Livy works well with YARN and will decouple Spark from your web app.
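
For example, a rough sketch (with an assumed Livy host, script path, and
arguments) of submitting a PySpark batch job through Livy's REST API from the
web app, instead of shelling out to spark-submit:

import json
import requests

LIVY_URL = "http://livy-host:8998"        # assumed Livy server
payload = {
    "file": "hdfs:///apps/my_job.py",     # assumed PySpark script on HDFS
    "args": ["--input", "/data/events"],  # assumed application arguments
}

# Submit the batch; Livy talks to YARN on the web app's behalf.
resp = requests.post(LIVY_URL + "/batches",
                     data=json.dumps(payload),
                     headers={"Content-Type": "application/json"})
batch = resp.json()

# Poll the batch state (starting/running/success/dead) until it finishes.
state = requests.get("{}/batches/{}".format(LIVY_URL, batch["id"])).json()["state"]
print(batch["id"], state)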



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Pyspark-not-working-on-yarn-cluster-mode-tp23755p27799.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




pyspark cluster mode on standalone deployment

2016-09-27 Thread Ofer Eliassaf
Is there any plan to support Python Spark running in "cluster mode" on a
standalone deployment?

There is this famous survey mentioning that more than 50% of the users are
using the standalone configuration.
Making PySpark work in cluster mode with standalone will help a lot for
high availability in Python Spark.

Currently only the YARN deployment supports it. Bringing in the huge YARN
installation just for this feature is not fun at all.

Does someone have a time estimate for this?



-- 
Regards,
Ofer Eliassaf


Re: Strange behavior with PySpark when using Join() and zip()

2015-03-23 Thread Ofer Mendelevitch
Thanks Sean,

Sorting definitely solves it, but I was hoping it could be avoided :)

In the documentation for classification in MLlib, for example, zip() is used to
create labelsAndPredictions:

-
from pyspark.mllib.tree import RandomForest
from pyspark.mllib.util import MLUtils

# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
#  Empty categoricalFeaturesInfo indicates all features are continuous.
#  Note: Use larger numTrees in practice.
#  Setting featureSubsetStrategy="auto" lets the algorithm choose.
model = RandomForest.trainClassifier(trainingData, numClasses=2,
                                     categoricalFeaturesInfo={}, numTrees=3,
                                     featureSubsetStrategy="auto",
                                     impurity='gini', maxDepth=4, maxBins=32)

# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = (labelsAndPredictions.filter(lambda (v, p): v != p).count()
           / float(testData.count()))
print('Test Error = ' + str(testErr))
print('Learned classification forest model:')
print(model.toDebugString())


The zip() works here because the RDD is loaded from a file.
If it had been generated by something that includes a join(), it wouldn't work,
due to this same issue.
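
For completeness, a minimal sketch of the sort-based workaround Sean describes,
applied to the join-then-zip example quoted further down: derive both sides
from a single sorted (and cached) RDD so that zip() sees a well-defined order:

raw = sc.parallelize([('k' + str(x), 'v' + str(x)) for x in range(100)])
data = (raw.join(raw)
           .mapValues(lambda x: [x[0]] + [x[1]])
           .map(lambda pair: ','.join(pair[1]))
           .sortBy(lambda s: s)  # impose a well-defined order before branching
           .cache())             # avoid recomputing the join differently per branch
d1 = data.map(lambda s: s.split(',')[0])
d2 = data.map(lambda s: s.split(',')[1])
x = d1.zip(d2)

print(x.take(10))  # left and right values now match in every pair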

Maybe worth mentioning in the docs then?

Ofer



 On Mar 23, 2015, at 11:40 AM, Sean Owen so...@cloudera.com wrote:
 
 I think the explanation is that the join does not guarantee any order,
 since it causes a shuffle in general, and it is computed twice in the
 first example, resulting in a difference for d1 and d2.
 
 You can persist() the result of the join and in practice I believe
 you'd find it behaves as expected, although even that is not 100%
 guaranteed since a block could be lost and recomputed (differently).
 
 If order matters, and it does for zip(), then the reliable way to
 guarantee a well defined ordering for zipping is to sort the RDDs.
 
 On Mon, Mar 23, 2015 at 6:27 PM, Ofer Mendelevitch
 omendelevi...@hortonworks.com wrote:
 Hi,
 
 I am running into a strange issue when doing a JOIN of two RDDs followed by
 ZIP from PySpark.
 It’s part of a more complex application, but I was able to narrow it down to a
 simplified example that’s easy to replicate and causes the same problem to
 appear:
 
 
 raw = sc.parallelize([('k'+str(x),'v'+str(x)) for x in range(100)])
 data = raw.join(raw).mapValues(lambda x: [x[0]]+[x[1]]).map(lambda pair:
 ','.join([x for x in pair[1]]))
 d1 = data.map(lambda s: s.split(',')[0])
 d2 = data.map(lambda s: s.split(',')[1])
 x = d1.zip(d2)
 
 print x.take(10)
 
 
 The output is:
 
 
 [('v44', 'v80'), ('v79', 'v44'), ('v80', 'v79'), ('v45', 'v78'), ('v81',
 'v81'), ('v78', 'v45'), ('v99', 'v99'), ('v82', 'v82'), ('v46', 'v46'),
 ('v83', 'v83')]
 
 
 As you can see, the ordering of items is no longer preserved in all cases
 (e.g., ‘v81’ is preserved, but ‘v45’ is not).
 Isn't it supposed to be preserved?
 
 If I do the same thing without the JOIN:
 
 data = sc.parallelize('v'+str(x)+',v'+str(x) for x in range(100))
 d1 = data.map(lambda s: s.split(',')[0])
 d2 = data.map(lambda s: s.split(',')[1])
 x = d1.zip(d2)
 
 print x.take(10)
 
 The output is:
 
 
 [('v0', 'v0'), ('v1', 'v1'), ('v2', 'v2'), ('v3', 'v3'), ('v4', 'v4'),
 ('v5', 'v5'), ('v6', 'v6'), ('v7', 'v7'), ('v8', 'v8'), ('v9', 'v9')]
 
 
 As expected.
 
 Anyone run into this or a similar issue?
 
 Ofer