Re: Maximum executors in EC2 Machine

2023-10-24 Thread Riccardo Ferrari
Hi,

I would refer to their documentation to better understand the concepts
behind cluster overview and submitting applications:

   -
   
https://spark.apache.org/docs/latest/cluster-overview.html#cluster-manager-types
   - https://spark.apache.org/docs/latest/submitting-applications.html

When using local[*] you get as many worker threads as you have cores, all
running in the same JVM as your driver; there are no separate executors. If
you want to test against a real cluster you can look into using standalone mode.
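
For reference, a minimal sketch (master URL, memory and core values are just
examples) of pointing the driver at a local standalone master instead of
local[*], so the cluster manager can split the 40 cores into multiple executors:

from pyspark.sql import SparkSession

# Assumes a standalone master and worker are already running on this machine,
# started with the sbin scripts described in the links above.
spark = (SparkSession.builder
         .appName("many-executors-test")
         .master("spark://localhost:7077")       # hypothetical master URL
         .config("spark.executor.cores", "5")    # 40 cores / 5 -> up to 8 executors
         .config("spark.executor.memory", "4g")
         .getOrCreate())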

HTH,
Riccardo

On Mon, Oct 23, 2023 at 5:31 PM KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Hi,
>
> I am running a spark job on a spark EC2 machine which has 40 cores. Driver
> and executor memory is 16 GB. I am using local[*] but I still get only one
> executor (the driver). Is there a way to get more executors with this config?
>
> I am not using yarn or mesos in this case. Only one machine which is
> enough for our work load but the data is increased.
>
> Thanks,
> Asmath
>


Re: Pyspark How to groupBy -> fit

2021-01-21 Thread Riccardo Ferrari
Thanks for the answers.

I am trying to avoid reading the same data multiple times (once per model).

One approach I can think of is filtering on the column I want to split on
and training each model on its subset. I was hoping to find a more elegant approach.
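
For what it's worth, a rough sketch of that filtering approach (the column
name and build_pipeline() are hypothetical; the latter stands for whatever
pyspark.ml Pipeline is being fit):

# Train one model per group by filtering the cached DataFrame once per group.
df = df.cache()  # avoid re-reading the source for every group
groups = [r["group"] for r in df.select("group").distinct().collect()]
models = {}
for g in groups:
    subset = df.filter(df["group"] == g)
    models[g] = build_pipeline().fit(subset)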



On Thu, Jan 21, 2021 at 5:28 PM Sean Owen  wrote:

> If you mean you want to train N models in parallel, you wouldn't be able
> to do that with a groupBy first. You apply logic to the result of groupBy
> with Spark, but can't use Spark within Spark. You can run N Spark jobs in
> parallel on the driver but you'd have to have each read the subset of data
> that it's meant to model separately.
>
> A pandas UDF is a fine solution here, because I assume that implies your
> groups aren't that big, so, maybe no need for a Spark pipeline.
>
>
> On Thu, Jan 21, 2021 at 9:20 AM Riccardo Ferrari 
> wrote:
>
>> Hi list,
>>
>> I am looking for an efficient solution to apply a training pipeline to
>> each group of a DataFrame.groupBy.
>>
>> This is very easy if you're using a pandas udf (i.e. groupBy().apply()),
>> I am not able to find the equivalent for a spark pipeline.
>>
>> The ultimate goal is to fit multiple models, one per group of data.
>>
>> Thanks,
>>
>>


Pyspark How to groupBy -> fit

2021-01-21 Thread Riccardo Ferrari
Hi list,

I am looking for an efficient solution to apply a training pipeline to each
group of a DataFrame.groupBy.

This is very easy if you're using a pandas udf (i.e. groupBy().apply()), I
am not able to find the equivalent for a spark pipeline.

The ultimate goal is to fit multiple models, one per group of data.

Thanks,


Re: PyCharm, Running spark-submit calling jars and a package at run time

2021-01-08 Thread Riccardo Ferrari
I think Spark checks the PYTHONPATH environment variable; you need to provide that.
Of course that works in local mode only.

On Fri, Jan 8, 2021, 5:28 PM Sean Owen  wrote:

> I don't see anywhere that you provide 'sparkstuff'? how would the Spark
> app have this code otherwise?
>
> On Fri, Jan 8, 2021 at 10:20 AM Mich Talebzadeh 
> wrote:
>
>> Thanks Riccardo.
>>
>> I am well aware of the submission form
>>
>> However, my question relates to doing submission within PyCharm itself.
>>
>> This is what I do at Pycharm *terminal* to invoke the module python
>>
>> spark-submit --jars
>> ..\lib\spark-bigquery-with-dependencies_2.12-0.18.0.jar \
>>  --packages com.github.samelamin:spark-bigquery_2.11:0.2.6
>> analyze_house_prices_GCP.py
>>
>> However, when run from the terminal it does not pick up the import dependencies
>> in the code!
>>
>> Traceback (most recent call last):
>>   File
>> "C:/Users/admin/PycharmProjects/pythonProject2/DS/src/analyze_house_prices_GCP.py",
>> line 8, in 
>> import sparkstuff as s
>> ModuleNotFoundError: No module named 'sparkstuff'
>>
>> The python code is attached, pretty simple
>>
>> Thanks
>>
>>
>>
>>


Re: PyCharm, Running spark-submit calling jars and a package at run time

2021-01-08 Thread Riccardo Ferrari
You need to provide your Python dependencies as well. See
http://spark.apache.org/docs/latest/submitting-applications.html and look for
--py-files.
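
For example (a sketch based on the command quoted below; sparkstuff.zip is a
hypothetical archive containing the sparkstuff module):

spark-submit --jars ..\lib\spark-bigquery-with-dependencies_2.12-0.18.0.jar \
  --py-files sparkstuff.zip analyze_house_prices_GCP.py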

HTH

On Fri, Jan 8, 2021 at 3:13 PM Mich Talebzadeh 
wrote:

> Hi,
>
> I have a module in PyCharm which reads data stored in a BigQuery table and
> does plotting.
>
> At the command line in the terminal I need to add the jar file and the
> package to make it work.
>
> (venv) C:\Users\admin\PycharmProjects\pythonProject2\DS\src>spark-submit
> --jars ..\lib\spark-bigquery-with-dependencies_2.12-0.18.0.jar
> analyze_house_prices_GCP.py
>
> This works but the problem is that the imports into the module are not
> picked up.  Example
>
>
> import sparkstuff as s
>
>
> This is picked up when run within Pycharm itself but not at the command
> line!
>
>
> (venv) C:\Users\admin\PycharmProjects\pythonProject2\DS\src>spark-submit
> --jars ..\lib\spark-bigquery-with-dependencies_2.12-0.18.0.jar
> analyze_house_prices_GCP.py
>
> Traceback (most recent call last):
>
>   File
> "C:/Users/admin/PycharmProjects/pythonProject2/DS/src/analyze_house_prices_GCP.py",
> line 8, in 
>
> import sparkstuff as s
>
> ModuleNotFoundError: No module named 'sparkstuff'
>
> The easiest option would be to run all these within PyCharm itself
> invoking the jar file and package at runtime.
>
> Otherwise I can run it at the command line, provided it is able to resolve the
> imports. I would appreciate any workaround for this.
>
> Thanks
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: Compute the Hash of each row in new column

2020-02-28 Thread Riccardo Ferrari
Hi Chetan,

Would the SQL function `hash` do the trick for your use case?
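
For example, a quick sketch in PySpark (the same `hash` function is available
in Scala and Spark SQL):

from pyspark.sql import functions as F

# Adds a new column with a hash computed over all existing columns of the row.
df_with_hash = df.withColumn("row_hash", F.hash(*df.columns))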

Best,

On Fri, Feb 28, 2020 at 1:56 PM Chetan Khatri 
wrote:

> Hi Spark Users,
> How can I compute Hash of each row and store in new column at Dataframe,
> could someone help me.
>
> Thanks
>


Cluster sizing

2019-09-13 Thread Riccardo Ferrari
Hi list,

Is there any documentation about how to approach cluster sizing? How do you
approach a new deployment?

Thanks,


Re: spark standalone mode problem about executor add and removed again and again!

2019-07-18 Thread Riccardo Ferrari
I would also check firewall rules. Is communication allowed on all the
required port ranges and hosts?
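
If only specific ports are open, one option (a sketch; the port numbers are
arbitrary examples) is to pin the driver-side ports in the submitting
application so the firewall rules can match them:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("standalone-test")
         .config("spark.driver.port", "40335")        # executors connect back to the driver here
         .config("spark.blockManager.port", "40336")  # block manager traffic
         .config("spark.port.maxRetries", "16")       # ports up to base+16 may be tried
         .getOrCreate())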

On Thu, Jul 18, 2019 at 3:56 AM Amit Sharma  wrote:

> Do you have dynamic resource allocation enabled?
>
>
> On Wednesday, July 17, 2019, zenglong chen 
> wrote:
>
>> Hi all,
>> My standalone mode has two slaves. When I submit my job, the
>> localhost slave works well, but the second slave keeps adding and removing
>> executors! The logs are below:
>>2019-07-17 10:51:38,889 INFO
>> client.StandaloneAppClient$ClientEndpoint: Executor updated:
>> app-20190717105135-0008/2 is now EXITED (Command exited with code 1)
>> 2019-07-17 10:51:38,890 INFO cluster.StandaloneSchedulerBackend: Executor
>> app-20190717105135-0008/2 removed: Command exited with code 1
>> 2019-07-17 10:51:38,890 INFO storage.BlockManagerMasterEndpoint: Trying
>> to remove executor 2 from BlockManagerMaster.
>> 2019-07-17 10:51:38,890 INFO storage.BlockManagerMaster: Removal of
>> executor 2 requested
>> 2019-07-17 10:51:38,891 INFO
>> cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Asked to remove
>> non-existent executor 2
>> 2019-07-17 10:51:38,892 INFO client.StandaloneAppClient$ClientEndpoint:
>> Executor added: app-20190717105135-0008/3 on
>> worker-20190717093045-172.22.9.179-40573 (172.22.9.179:40573) with 8
>> core(s)
>> 2019-07-17 10:51:38,892 INFO cluster.StandaloneSchedulerBackend: Granted
>> executor ID app-20190717105135-0008/3 on hostPort 172.22.9.179:40573
>> with 8 core(s), 12.0 GB RAM
>> 2019-07-17 10:51:38,893 INFO client.StandaloneAppClient$ClientEndpoint:
>> Executor updated: app-20190717105135-0008/3 is now RUNNING
>> 2019-07-17 10:51:40,521 INFO client.StandaloneAppClient$ClientEndpoint:
>> Executor updated: app-20190717105135-0008/3 is now EXITED (Command exited
>> with code 1)
>> 2019-07-17 10:51:40,521 INFO cluster.StandaloneSchedulerBackend: Executor
>> app-20190717105135-0008/3 removed: Command exited with code 1
>> 2019-07-17 10:51:40,521 INFO client.StandaloneAppClient$ClientEndpoint:
>> Executor added: app-20190717105135-0008/4 on
>> worker-20190717093045-172.22.9.179-40573 (172.22.9.179:40573) with 8
>> core(s)
>> 2019-07-17 10:51:40,521 INFO storage.BlockManagerMaster: Removal of
>> executor 3 requested
>> 2019-07-17 10:51:40,521 INFO
>> cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Asked to remove
>> non-existent executor 3
>> 2019-07-17 10:51:40,521 INFO storage.BlockManagerMasterEndpoint: Trying
>> to remove executor 3 from BlockManagerMaster.
>> 2019-07-17 10:51:40,521 INFO cluster.StandaloneSchedulerBackend: Granted
>> executor ID app-20190717105135-0008/4 on hostPort 172.22.9.179:40573
>> with 8 core(s), 12.0 GB RAM
>> 2019-07-17 10:51:40,523 INFO client.StandaloneAppClient$ClientEndpoint:
>> Executor updated: app-20190717105135-0008/4 is now RUNNING
>>
>>
>> And the slave output are below:
>>19/07/17 10:47:12 INFO ExecutorRunner: Launch command:
>> "/home/ubuntu/data/jdk/jre/bin/java" "-cp"
>> "/home/ubuntu/spark-2.4.3/conf/:/home/ubuntu/spark-2.4.3/jars/*"
>> "-Xmx12288M" "-Dspark.driver.port=40335"
>> "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url"
>> "spark://CoarseGrainedScheduler@iZk1a7vdbutmi6eluaskecZ:40335"
>> "--executor-id" "18" "--hostname" "172.22.9.179" "--cores" "8" "--app-id"
>> "app-20190717104645-0007" "--worker-url" "spark://
>> Worker@172.22.9.179:40573"
>> 19/07/17 10:47:13 INFO Worker: Executor app-20190717104645-0007/18
>> finished with state EXITED message Command exited with code 1 exitStatus 1
>> 19/07/17 10:47:13 INFO ExternalShuffleBlockResolver: Clean up non-shuffle
>> files associated with the finished executor 18
>> 19/07/17 10:47:13 INFO ExternalShuffleBlockResolver: Executor is not
>> registered (appId=app-20190717104645-0007, execId=18)
>> 19/07/17 10:47:13 INFO Worker: Asked to launch executor
>> app-20190717104645-0007/19 for ph_user_pre_level
>> 19/07/17 10:47:13 INFO SecurityManager: Changing view acls to: ubuntu
>> 19/07/17 10:47:13 INFO SecurityManager: Changing modify acls to: ubuntu
>> 19/07/17 10:47:13 INFO SecurityManager: Changing view acls groups to:
>> 19/07/17 10:47:13 INFO SecurityManager: Changing modify acls groups to:
>> 19/07/17 10:47:13 INFO SecurityManager: SecurityManager: authentication
>> disabled; ui acls disabled; users  with view permissions: Set(ubuntu);
>> groups with view permissions: Set(); users  with modify permissions:
>> Set(ubuntu); groups with modify permissions: Set()
>> 19/07/17 10:47:14 INFO ExecutorRunner: Launch command:
>> "/home/ubuntu/data/jdk/jre/bin/java" "-cp"
>> "/home/ubuntu/spark-2.4.3/conf/:/home/ubuntu/spark-2.4.3/jars/*"
>> "-Xmx12288M" "-Dspark.driver.port=40335"
>> "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url"
>> "spark://CoarseGrainedScheduler@iZk1a7vdbutmi6eluaskecZ:40335"
>> "--executor-id" "19" "--hostname" "172.22.9.179" "--cores" "8" "--app-id"
>> "app-20190717104645-0007" "--worker-url" "spark://
>> 

Re: best docker image to use

2019-06-11 Thread Riccardo Ferrari
Hi Marcelo,

I'm used to working with https://github.com/jupyter/docker-stacks. There's a
Scala+Jupyter option too, though there might be better options with Zeppelin
as well.
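
For example (one possible image from that repository; which tag/flavour to
pick depends on what you need):

docker run -it --rm -p 8888:8888 jupyter/all-spark-notebook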
Hth


On Tue, 11 Jun 2019, 11:52 Marcelo Valle,  wrote:

> Hi,
>
> I would like to run spark shell + scala on a docker environment, just to
> play with docker in development machine without having to install JVM + a
> lot of things.
>
> Is there something as an "official docker image" I am recommended to use?
> I saw some on docker hub, but it seems they are all contributions from
> pro-active individuals. I wonder whether the group maintaining Apache Spark
> also maintains some docker images for use cases like this?
>
> Thanks,
> Marcelo.
>
> This email is confidential [and may be protected by legal privilege]. If
> you are not the intended recipient, please do not copy or disclose its
> content but contact the sender immediately upon receipt.
>
> KTech Services Ltd is registered in England as company number 10704940.
>
> Registered Office: The River Building, 1 Cousin Lane, London EC4R 3TE,
> United Kingdom
>


Re: Spark standalone and Pandas UDF from custom archive

2019-05-25 Thread Riccardo Ferrari
Thanks for the answer.

Are you saying that your jobs.py is actually a zip file with all your code
in it (just like mine)? That is the part that is failing on my side. If I add my
archive with a .py extension it is ignored; I have to rename it to .zip in
order to have it available on my executors.
So is your driver.py responsible for loading the jobs.py contents?
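
For reference, a rough sketch (hypothetical names and signature) of the
generic-driver pattern described in the quoted reply below:

# driver.py - generic entry point passed to spark-submit; the real job code
# lives in the zip shipped via --py-files and is resolved at runtime.
import importlib
import sys

from pyspark.sql import SparkSession

if __name__ == "__main__":
    job_module_name = sys.argv[1]                  # e.g. "jobs.my_job"
    spark = SparkSession.builder.getOrCreate()
    job_module = importlib.import_module(job_module_name)
    job_module.run(spark)                          # well-known entry point, as described below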



On Sat, May 25, 2019 at 12:00 PM  wrote:

> Hi,
>
> The solution we have is to make a generic spark submit python file
> (driver.py). This is just a main method which takes a single parameter: the
> module containing the app you want to run. The main method itself just
> dynamically loads the module and executes some well-known method on it (we
> use run). As such it is basically capable of running any PySpark app, so
> long as the required module is available.
>
> We then package all our job modules and associated library code into a
> single zip file (jobs.py). You can pass this zip file to spark-submit via
> --py-files and use the generic launcher as the driver:
>
> spark-submit --py-files jobs.py driver.py job-module
>
>
>
>
>
> On 25 May 2019, at 10:02, Riccardo Ferrari  wrote:
>
> Following up to my previous message, it turns out the issue is a mix
> between packaging and spark launcher mechanics. Everything can be
> summarized as follow
>
>- My app is packaged as 'fat-jar' alike. I'm using zipapp to prepare
>the self executable archive (with all the needed descriptors)
>- IF the AppResource does not end with 'py', the launcher believes it
>is a JVM application.
>   - I am not allowed to use the '.pyz' or '.zip' extension
>- IF 'py-files' is set to add a '.py' file it expects a py file and
>not py-zip archive
>   - I have no direct evidence of this sentence. Fact is if I add the
>   same package with '.zip' everything starts working
>
> So at the moment I was able to run my app by shipping the same file with
> two different extensions: '.py' to make the AppResource happy and '.zip' to
> make the py-files work. It is what it is, an awkward workaround.
>
> Can someone share an approach to reach the following goals:
>
>- submit complex python applications (multiple classes)
>- without installing them on every node of the standalone cluster
>    - possibly without requiring any external tool (jobserver/livy/...)
>
> Best,
>
> On Fri, May 24, 2019 at 10:24 PM Riccardo Ferrari 
> wrote:
>
>> Hi list,
>>
>> I am having an issue distributing a pandas_udf to my workers.
>> I'm using Spark 2.4.1 in standalone mode.
>> *Submit*:
>>
>>- via SparkLauncher as separate process. I do add the py-files with
>>the self-executable zip (with .py extension) before launching the
>>application.
>>- The whole application (a few modules) is zipped into a single zip
>>file with a 'py' extension
>>- The app defines a GROUPED_MAP pandas_udf that instantiates a class
>>from within the module
>>
>> *Error*:
>> When I perform my df.groupBy().agg(my_pandas_udf) my workers
>> errors out with:
>> 
>> 2019-05-24 19:40:18.577: Caused by:
>> org.apache.spark.api.python.PythonException: Traceback (most recent call
>> last):
>> 2019-05-24 19:40:18.577:   File
>> "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 366, in main
>> 2019-05-24 19:40:18.577: func, profiler, deserializer, serializer =
>> read_udfs(pickleSer, infile, eval_type)
>> 2019-05-24 19:40:18.577:   File
>> "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 229, in
>> read_udfs
>> 2019-05-24 19:40:18.577: arg_offsets, udf =
>> read_single_udf(pickleSer, infile, eval_type, runner_conf)
>> 2019-05-24 19:40:18.577:   File
>> "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 168, in
>> read_single_udf
>> 2019-05-24 19:40:18.577: f, return_type = read_command(pickleSer,
>> infile)
>> 2019-05-24 19:40:18.577:   File
>> "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 69, in
>> read_command
>> 2019-05-24 19:40:18.577: command = serializer._read_with_length(file)
>> 2019-05-24 19:40:18.577:   File
>> "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in
>> _read_with_length
>> 2019-05-24 19:40:18.577: return self.loads(obj)
>> 2019-05-24 19:40:18.577:   File
>> "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 580, in
>> loads
>> 2019-05-24 19:40:18.577: return pickle.loads(obj, encoding=encoding)
>> 2019-05-24 19:40:18.577: ModuleNotFoundError: No module n

Re: Spark standalone and Pandas UDF from custom archive

2019-05-25 Thread Riccardo Ferrari
Following up to my previous message, it turns out the issue is a mix
between packaging and spark launcher mechanics. Everything can be
summarized as follows:

   - My app is packaged 'fat-jar' style: I'm using zipapp to prepare the
   self-executable archive (with all the needed descriptors)
   - IF the AppResource does not end with 'py', the launcher believes it is
   a JVM application.
  - I am not allowed to use the '.pyz' or '.zip' extension
   - IF 'py-files' is set to add a '.py' file, it expects a plain py file and not
   a py-zip archive
  - I have no direct evidence for this statement; the fact is that if I add the
   same package with '.zip' everything starts working

So at the moment I was able to run my app by shipping the same file with
two different extensions: '.py' to make the AppResource happy and '.zip' to
make the py-files work. It is what it is, an awkward workaround.

Can someone share an approach to reach the following goals:

   - submit complex python applications (multiple classes)
   - without installing them on every node of the standalone cluster
   - possibly without requiring any external tool (jobserver/livy/...)

Best,

On Fri, May 24, 2019 at 10:24 PM Riccardo Ferrari 
wrote:

> Hi list,
>
> I am having an issue distributing a pandas_udf to my workers.
> I'm using Spark 2.4.1 in standalone mode.
> *Submit*:
>
>- via SparkLauncher as separate process. I do add the py-files with
>the self-executable zip (with .py extension) before launching the
>application.
>- The whole application (a few modules) is zipped into a single zip
>file with a 'py' extension
>- The app defines a GROUPED_MAP pandas_udf that instantiates a class
>from within the module
>
> *Error*:
> When I perform my df.groupBy().agg(my_pandas_udf) my workers
> errors out with:
> 
> 2019-05-24 19:40:18.577: Caused by:
> org.apache.spark.api.python.PythonException: Traceback (most recent call
> last):
> 2019-05-24 19:40:18.577:   File
> "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 366, in main
> 2019-05-24 19:40:18.577: func, profiler, deserializer, serializer =
> read_udfs(pickleSer, infile, eval_type)
> 2019-05-24 19:40:18.577:   File
> "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 229, in
> read_udfs
> 2019-05-24 19:40:18.577: arg_offsets, udf = read_single_udf(pickleSer,
> infile, eval_type, runner_conf)
> 2019-05-24 19:40:18.577:   File
> "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 168, in
> read_single_udf
> 2019-05-24 19:40:18.577: f, return_type = read_command(pickleSer,
> infile)
> 2019-05-24 19:40:18.577:   File
> "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 69, in
> read_command
> 2019-05-24 19:40:18.577: command = serializer._read_with_length(file)
> 2019-05-24 19:40:18.577:   File
> "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in
> _read_with_length
> 2019-05-24 19:40:18.577: return self.loads(obj)
> 2019-05-24 19:40:18.577:   File
> "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 580, in
> loads
> 2019-05-24 19:40:18.577: return pickle.loads(obj, encoding=encoding)
> 2019-05-24 19:40:18.577: ModuleNotFoundError: No module named ''
> 2019-05-24 19:40:18.577:
> 2019-05-24 19:40:18.577: at
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
> 
> where  is the name of my base package
> *My python application:*
> The structure is very simple
> my-application.py (actually a zip):
>
>- __mian__.py
>-- __init__.py
>   - 
>  -  __init__.py
>  - ...
>   - 
>  - __init__
>  - spark_job.py
>  - udf.py
>
> udf.py has a dummy class with just a  staticmethod definig the UDF
> spark_job.py: at some point defines the pandas_udf by calling:
>
> pandas_udf(f=UDFHelper.function, returnType=UDFHelper.get_schema(), 
> functionType=PandasUDFType.GROUPED_MAP)
>
> The error makes me think the executor don't have my archive in the
> PYTHONPATH however the 'addPyFile' should take care of it.
> I don't want to manually install my application everywhere (for env
> constraints)
>
> Any hint to troubleshoot or fix are really appreciated!
> Thanks,
>


Spark standalone and Pandas UDF from custom archive

2019-05-24 Thread Riccardo Ferrari
Hi list,

I am having an issue distributing a pandas_udf to my workers.
I'm using Spark 2.4.1 in standalone mode.
*Submit*:

   - via SparkLauncher as a separate process. I do add the py-files with the
   self-executable zip (with .py extension) before launching the application.
   - The whole application (a few modules) is zipped into a single zip file
   with a 'py' extension
   - The app defines a GROUPED_MAP pandas_udf that instantiates a class
   from within the module

*Error*:
When I perform my df.groupBy().agg(my_pandas_udf) my workers
error out with:

2019-05-24 19:40:18.577: Caused by:
org.apache.spark.api.python.PythonException: Traceback (most recent call
last):
2019-05-24 19:40:18.577:   File
"/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 366, in main
2019-05-24 19:40:18.577: func, profiler, deserializer, serializer =
read_udfs(pickleSer, infile, eval_type)
2019-05-24 19:40:18.577:   File
"/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 229, in
read_udfs
2019-05-24 19:40:18.577: arg_offsets, udf = read_single_udf(pickleSer,
infile, eval_type, runner_conf)
2019-05-24 19:40:18.577:   File
"/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 168, in
read_single_udf
2019-05-24 19:40:18.577: f, return_type = read_command(pickleSer,
infile)
2019-05-24 19:40:18.577:   File
"/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 69, in
read_command
2019-05-24 19:40:18.577: command = serializer._read_with_length(file)
2019-05-24 19:40:18.577:   File
"/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in
_read_with_length
2019-05-24 19:40:18.577: return self.loads(obj)
2019-05-24 19:40:18.577:   File
"/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 580, in
loads
2019-05-24 19:40:18.577: return pickle.loads(obj, encoding=encoding)
2019-05-24 19:40:18.577: ModuleNotFoundError: No module named '<package>'
2019-05-24 19:40:18.577:
2019-05-24 19:40:18.577: at
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)

where <package> is the name of my base package.
*My python application:*
The structure is very simple:
my-application.py (actually a zip):

   - __main__.py
   - <package>/
      - __init__.py
      - ...
      - <subpackage>/
         - __init__.py
         - spark_job.py
         - udf.py

udf.py has a dummy class with just a staticmethod defining the UDF.
spark_job.py at some point defines the pandas_udf by calling:

pandas_udf(f=UDFHelper.function, returnType=UDFHelper.get_schema(),
functionType=PandasUDFType.GROUPED_MAP)

The error makes me think the executors don't have my archive on the
PYTHONPATH; however, 'addPyFile' should take care of that.
I don't want to manually install my application everywhere (due to environment
constraints).

Any hints to troubleshoot or fix this are really appreciated!
Thanks,


Re: Deep Learning with Spark, what is your experience?

2019-05-05 Thread Riccardo Ferrari
Thanks everyone, I really appreciate your contributions here.

@Jason, thanks for the references I'll take a look. Quickly checking
github:
https://github.com/intel-analytics/analytics-zoo#distributed-tensorflow-and-keras-on-sparkbigdl
Do I understand correctly I can:

   - Prepare my data with Spark
   - Define a Tensorflow model
   - Train it in distributed fashion

When using the Keras API, is it the real Keras with just an adapter layer,
or is it a completely different API that mimics Keras?

@Gourav, I agree that "you should pick the right tool for the job".

The purpose of this discussion is to understand/explore whether we really need
another stack or whether we can leverage the existing infrastructure and
expertise to accomplish the task.
We currently have some ML jobs, and Spark has proved to be the perfect fit for
us. We know it well enough to be confident we can deliver what is asked; it
scales, it is resilient, it works.

We are starting to evaluate/introduce some DL models, and being able to
leverage the existing infra would be a big plus. It is not only having to
deal with a new set of machines running a different stack (i.e.
TensorFlow, MXNet, ...); it is everything around it: tuning, managing,
packaging applications, testing and so on. Are these reasonable concerns?

Best,

On Sun, May 5, 2019 at 8:06 PM Gourav Sengupta 
wrote:

> If someone is trying to actually use deep learning algorithms, their focus
> should be in choosing the technology stack which gives them maximum
> flexibility to try the nuances of their algorithms.
>
> From a personal perspective, I always prefer to use libraries which
> provides the best flexibility and extensibility in terms of the science/
> mathematics of the subjects. For example try to open a book on Linear
> Regression and then try to see whether all the mathematical formulations
> are available in the SPARK module for regression or not.
>
> It is always better to choose a technology that fits into the nuances and
> perfection of the science, rather than choose a technology and then try to
> fit the science into it.
>
> Regards,
> Gourav
>
> On Sun, May 5, 2019 at 2:23 PM Jason Dai  wrote:
>
>> You may find talks from Analytics Zoo users at
>> https://analytics-zoo.github.io/master/#presentations/; in particular,
>> some of recent user examples on Analytics Zoo:
>>
>>- Mastercard:
>>
>> https://software.intel.com/en-us/articles/deep-learning-with-analytic-zoo-optimizes-mastercard-recommender-ai-service
>>
>>- Azure:
>>
>> https://software.intel.com/en-us/articles/use-analytics-zoo-to-inject-ai-into-customer-service-platforms-on-microsoft-azure-part-1
>>- CERN:
>>
>> https://db-blog.web.cern.ch/blog/luca-canali/machine-learning-pipelines-high-energy-physics-using-apache-spark-bigdl
>>- Midea/KUKA:
>>
>> https://software.intel.com/en-us/articles/industrial-inspection-platform-in-midea-and-kuka-using-distributed-tensorflow-on-analytics
>>- Talroo:
>>
>> https://software.intel.com/en-us/articles/talroo-uses-analytics-zoo-and-aws-to-leverage-deep-learning-for-job-recommendation
>>
>> <https://software.intel.com/en-us/articles/talroo-uses-analytics-zoo-and-aws-to-leverage-deep-learning-for-job-recommendations>
>>
>> Thanks,
>> -Jason
>>
>> On Sun, May 5, 2019 at 6:29 AM Riccardo Ferrari 
>> wrote:
>>
>>> Thank you for your answers!
>>>
>>> While it is clear each DL framework can solve the distributed model
>>> training on their own (some better than others).  Still I see a lot of
>>> value of having Spark on the ETL/pre-processing part, thus the origin of my
>>> question.
>>> I am trying to avoid to mange multiple stacks/workflows and hoping to
>>> unify my system. Projects like TensorflowOnSpark or Analytics-Zoo (to name
>>> couple) feels like they can help, still I really appreciate your comments
>>> and anyone that could add some value to this discussion. Does anyone have
>>> experience with them?
>>>
>>> Thanks
>>>
>>> On Sat, May 4, 2019 at 8:01 PM Pat Ferrel  wrote:
>>>
>>>> @Riccardo
>>>>
>>>> Spark does not do the DL learning part of the pipeline (afaik) so it is
>>>> limited to data ingestion and transforms (ETL). It therefore is optional
>>>> and other ETL options might be better for you.
>>>>
>>>> Most of the technologies @Gourav mentions have their own scaling based
>>>> on their own compute engines specialized for their DL implementations, so
>>>> be aware that Spark scaling has nothing to do with scaling most of the DL
>>>> engines

Re: Deep Learning with Spark, what is your experience?

2019-05-04 Thread Riccardo Ferrari
Thank you for your answers!

While it is clear that each DL framework can handle distributed model
training on its own (some better than others), I still see a lot of
value in having Spark for the ETL/pre-processing part, hence the origin of my
question.
I am trying to avoid managing multiple stacks/workflows and hoping to unify
my system. Projects like TensorFlowOnSpark or Analytics Zoo (to name a
couple) feel like they can help; still, I really appreciate your comments
and anyone who could add some value to this discussion. Does anyone have
experience with them?

Thanks

On Sat, May 4, 2019 at 8:01 PM Pat Ferrel  wrote:

> @Riccardo
>
> Spark does not do the DL learning part of the pipeline (afaik) so it is
> limited to data ingestion and transforms (ETL). It therefore is optional
> and other ETL options might be better for you.
>
> Most of the technologies @Gourav mentions have their own scaling based on
> their own compute engines specialized for their DL implementations, so be
> aware that Spark scaling has nothing to do with scaling most of the DL
> engines, they have their own solutions.
>
> From: Gourav Sengupta 
> 
> Reply: Gourav Sengupta 
> 
> Date: May 4, 2019 at 10:24:29 AM
> To: Riccardo Ferrari  
> Cc: User  
> Subject:  Re: Deep Learning with Spark, what is your experience?
>
> Try using MxNet and Horovod directly as well (I think that MXNet is worth
> a try as well):
> 1.
> https://medium.com/apache-mxnet/distributed-training-using-apache-mxnet-with-horovod-44f98bf0e7b7
> 2.
> https://docs.nvidia.com/deeplearning/dgx/mxnet-release-notes/rel_19-01.html
> 3. https://aws.amazon.com/mxnet/
> 4.
> https://aws.amazon.com/blogs/machine-learning/aws-deep-learning-amis-now-include-horovod-for-faster-multi-gpu-tensorflow-training-on-amazon-ec2-p3-instances/
>
>
> Ofcourse Tensorflow is backed by Google's advertisement team as well
> https://aws.amazon.com/blogs/machine-learning/scalable-multi-node-training-with-tensorflow/
>
>
> Regards,
>
>
>
>
> On Sat, May 4, 2019 at 10:59 AM Riccardo Ferrari 
> wrote:
>
>> Hi list,
>>
>> I am trying to undestand if ti make sense to leverage on Spark as
>> enabling platform for Deep Learning.
>>
>> My open question to you are:
>>
>>- Do you use Apache Spark in you DL pipelines?
>>- How do you use Spark for DL? Is it just a stand-alone stage in the
>>workflow (ie data preparation script) or is it  more integrated
>>
>> I see a major advantage in leveraging on Spark as a unified entrypoint,
>> for example you can easily abstract data sources and leverage on existing
>> team skills for data pre-processing and training. On the flip side you may
>> hit some limitations including supported versions and so on.
>> What is your experience?
>>
>> Thanks!
>>
>


Deep Learning with Spark, what is your experience?

2019-05-04 Thread Riccardo Ferrari
Hi list,

I am trying to understand if it makes sense to leverage Spark as an enabling
platform for Deep Learning.

My open questions to you are:

   - Do you use Apache Spark in your DL pipelines?
   - How do you use Spark for DL? Is it just a stand-alone stage in the
   workflow (i.e. a data preparation script) or is it more integrated?

I see a major advantage in leveraging Spark as a unified entry point: for
example, you can easily abstract data sources and leverage existing team
skills for data pre-processing and training. On the flip side you may hit
some limitations, including supported versions and so on.
What is your experience?

Thanks!


PySpark OOM when running PCA

2019-02-07 Thread Riccardo Ferrari
Hi list,

I am having trouble running PCA with PySpark. I am trying to reduce the
matrix size, since my feature vector after OHE gets 40k columns wide.
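
For context, a minimal sketch of the kind of call being made (column names
and k are hypothetical):

from pyspark.ml.feature import PCA

pca = PCA(k=100, inputCol="ohe_features", outputCol="pca_features")
pca_model = pca.fit(df)  # this is the step that triggers the warnings/OOM below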

Spark 2.2.0 Stand-alone (Oracle JVM)
pyspark 2.2.0 from a docker (OpenJDK)

I'm starting the Spark session from the notebook; however, I make sure to:

   - PYSPARK_SUBMIT_ARGS: "--packages ... --driver-memory 20G pyspark-shell"
   - sparkConf.set("spark.executor.memory", "24G")
   - sparkConf.set("spark.driver.memory", "20G")

My executors get 24 GB per node, and my driver process starts with:
/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -cp
/usr/local/spark/conf/:/usr/local/spark/jars/* -Xmx20G
org.apache.spark.deploy.SparkSubmit --conf spark.executor.memory=24G ...
pyspark-shell

So I should have plenty of memory to play with; however, when running
PCA.fit I get the following in the Spark driver logs:
19/02/08 01:02:43 WARN TaskSetManager: Stage 29 contains a task of very
large size (142 KB). The maximum recommended task size is 100 KB.
19/02/08 01:02:43 WARN RowMatrix: 34771 columns will require at least 9672
megabytes of memory!
19/02/08 01:02:46 WARN RowMatrix: 34771 columns will require at least 9672
megabytes of memory!

Eventually fails:
Py4JJavaError: An error occurred while calling o287.fit.
: java.lang.OutOfMemoryError
at
java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
at
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at
org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
...
at
org.apache.spark.mllib.linalg.distributed.RowMatrix.computeGramianMatrix(RowMatrix.scala:122)
at
org.apache.spark.mllib.linalg.distributed.RowMatrix.computeCovariance(RowMatrix.scala:344)
at
org.apache.spark.mllib.linalg.distributed.RowMatrix.computePrincipalComponentsAndExplainedVariance(RowMatrix.scala:387)
at org.apache.spark.mllib.feature.PCA.fit(PCA.scala:48)
...

What am I missing?
Any hints much appreciated,


Re: I have trained a ML model, now what?

2019-01-23 Thread Riccardo Ferrari
Felix, thank you very much for the link. Much appreciated.

The attached PDF is very interesting; I found myself evaluating many of the
scenarios described in Q3. It's unfortunate the proposal is not being
worked on; it would be great to see that as part of the code base.

It is cool to see big players like Uber trying to make Open Source better,
thanks!


On Tue, Jan 22, 2019 at 5:24 PM Felix Cheung 
wrote:

> About deployment/serving
>
> SPIP
> https://issues.apache.org/jira/browse/SPARK-26247
>
>
> --
> *From:* Riccardo Ferrari 
> *Sent:* Tuesday, January 22, 2019 8:07 AM
> *To:* User
> *Subject:* I have trained a ML model, now what?
>
> Hi list!
>
> I am writing here to here about your experience on putting Spark ML models
> into production at scale.
>
> I know it is a very broad topic with many different faces depending on the
> use-case, requirements, user base and whatever is involved in the task.
> Still I'd like to open a thread about this topic that is as important as
> properly training a model and I feel is often neglected.
>
> The task is *serving web users with predictions* and the main challenge I
> see is making it agile and swift.
>
> I think there are mainly 3 general categories of such deployment that can
> be described as:
>
>- Offline/Batch: Load a model, performs the inference, store the
>results in some datasotre (DB, indexes,...)
>- Spark in the loop: Having a long running Spark context exposed in
>some way, this include streaming as well as some custom application that
>wraps the context.
>- Use a different technology to load the Spark MLlib model and run the
>inference pipeline. I have read about MLeap and other PMML based solutions.
>
> I would love to hear about opensource solutions and possibly without
> requiring cloud provider specific framework/component.
>
> Again I am aware each of the previous category have benefits and drawback,
> so what would you pick? Why? and how?
>
> Thanks!
>


Re: How to query on Cassandra and load results in Spark dataframe

2019-01-23 Thread Riccardo Ferrari
Hi Soheil,

You should be able to apply some filter transformation. Spark is lazily
evaluated and the actual loading from Cassandra happens only when an action
triggers it. Find out more here:
https://spark.apache.org/docs/2.3.2/rdd-programming-guide.html#rdd-operations

The Spark Cassandra connector supports filter push-down:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md#datasource-specific-options

You can always check that everything goes as expected by asking Spark to
`explain` your DataFrame:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset@explain():Unit
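
A rough sketch in PySpark, reusing the table/keyspace names from the quoted
question; the predicate is hypothetical, and when it matches the partitioning
scheme it should be pushed down to Cassandra:

df = (spark.read
      .format("org.apache.spark.sql.cassandra")
      .options(table="A", keyspace="B")
      .load()
      .filter("partition_key = 'some_value'"))  # hypothetical predicate

df.explain()  # look for the pushed filters in the physical plan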

HTH,

On Wed, Jan 23, 2019 at 8:44 AM Soheil Pourbafrani 
wrote:

> Hi,
>
> Using the command
>
> val table = spark
>   .read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map( "table" -> "A", "keyspace" -> "B"))
>   .load
>
> someone can load whole table data into a dataframe. Instead, I want to run
> a query in Cassandra and load just the result in dataframe (not whole
> table).
>
> Is it possible in spark?
>


I have trained a ML model, now what?

2019-01-22 Thread Riccardo Ferrari
Hi list!

I am writing here to hear about your experience putting Spark ML models
into production at scale.

I know it is a very broad topic with many different faces depending on the
use-case, requirements, user base and whatever is involved in the task.
Still I'd like to open a thread about this topic that is as important as
properly training a model and I feel is often neglected.

The task is *serving web users with predictions* and the main challenge I
see is making it agile and swift.

I think there are mainly 3 general categories of such deployment that can
be described as:

   - Offline/Batch: load a model, perform the inference, store the results
   in some datastore (DB, indexes, ...)
   - Spark in the loop: Having a long running Spark context exposed in some
   way, this include streaming as well as some custom application that wraps
   the context.
   - Use a different technology to load the Spark MLlib model and run the
   inference pipeline. I have read about MLeap and other PMML based solutions.

I would love to hear about opensource solutions and possibly without
requiring cloud provider specific framework/component.

Again, I am aware each of the previous categories has benefits and drawbacks,
so what would you pick? Why? And how?

Thanks!


Re: Connection issue with AWS S3 from PySpark 2.3.1

2018-12-21 Thread Riccardo Ferrari
Hi Aakash,

Can you share how you are adding those jars? Are you using the --packages
method? I assume you're running on a cluster, and those dependencies might
not have been properly distributed.

How are you submitting your app? What kind of resource manager are you
using: standalone, YARN, ...?
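
For reference, a minimal sketch of one way to wire the S3A pieces together in
PySpark; the hadoop-aws version is an assumption and must match the Hadoop
build your Spark ships with:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("s3a-test")
         .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.3")  # assumed version
         .getOrCreate())

hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
hadoop_conf.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")

df = spark.read.csv("s3a://test-bucket/breast-cancer-wisconsin.csv", header=True)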

Best,

On Fri, Dec 21, 2018 at 1:18 PM Aakash Basu 
wrote:

> Any help, anyone?
>
> On Fri, Dec 21, 2018 at 2:21 PM Aakash Basu 
> wrote:
>
>> Hey Shuporno,
>>
>> With the updated config too, I am getting the same error. While trying to
>> figure that out, I found this link which says I need aws-java-sdk (which I
>> already have):
>> https://github.com/amazon-archives/kinesis-storm-spout/issues/8
>>
>> Now, this is my java details:
>>
>> java version "1.8.0_181"
>>
>> Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
>>
>> Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
>>
>>
>>
>> Is it due to some java version mismatch then or is it something else I am
>> missing out? What do you think?
>>
>> Thanks,
>> Aakash.
>>
>> On Fri, Dec 21, 2018 at 1:43 PM Shuporno Choudhury <
>> shuporno.choudh...@gmail.com> wrote:
>>
>>> Hi,
>>> I don't know whether the following config (that you have tried) are
>>> correct:
>>> fs.s3a.awsAccessKeyId
>>> fs.s3a.awsSecretAccessKey
>>>
>>> The correct ones probably are:
>>> fs.s3a.access.key
>>> fs.s3a.secret.key
>>>
>>> On Fri, 21 Dec 2018 at 13:21, Aakash Basu-2 [via Apache Spark User List]
>>>  wrote:
>>>
 Hey Shuporno,

 Thanks for a prompt reply. Thanks for noticing the silly mistake, I
 tried this out, but still getting another error, which is related to
 connectivity it seems.

 >>> hadoop_conf.set("fs.s3a.awsAccessKeyId", "abcd")
> >>> hadoop_conf.set("fs.s3a.awsSecretAccessKey", "123abc")
> >>> a =
> spark.read.csv("s3a:///test-bucket/breast-cancer-wisconsin.csv",
> header=True)
> Traceback (most recent call last):
>   File "", line 1, in 
>   File
> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/pyspark/sql/readwriter.py",
> line 441, in csv
> return
> self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
>   File
> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
> line 1257, in __call__
>   File
> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/pyspark/sql/utils.py",
> line 63, in deco
> return f(*a, **kw)
>   File
> "/Users/aakash/Downloads/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",
> line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o220.csv.
> : java.lang.NoClassDefFoundError:
> com/amazonaws/auth/AWSCredentialsProvider
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at
> org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2134)
> at
> org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2099)
> at
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
> at
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
> at
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
> at
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
> at
> org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:45)
> at
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:354)
> at
> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
> at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:596)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at py4j.Gateway.invoke(Gateway.java:282)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: 

Re: Pyspark Partitioning

2018-09-30 Thread Riccardo Ferrari
Hi Dimitris,

I believe the methods partitionBy and mapPartitions are specific to RDDs,
while you're talking about DataFrames.
I guess you have a few options, including:
1. Use the DataFrame.rdd call and process the returned RDD. Please note the
return type of this call is an RDD of Row.
2. Use the groupBy from DataFrames and start from there; this may involve
defining a UDF or leveraging the existing GroupedData functions.

It really depends on your use-case and your performance requirements.
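
For example, a rough sketch of option 1 (the per-group function is a
placeholder for whatever custom logic is needed):

# Group the rows by Group_Id on the RDD side and apply a custom Python function per group.
def my_custom_function(rows):
    rows = list(rows)  # rows is an iterable of pyspark.sql.Row
    return len(rows)   # placeholder logic

pairs = df.rdd.map(lambda row: (row["Group_Id"], row))
results = pairs.groupByKey().mapValues(my_custom_function).collect()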
HTH

On Sun, Sep 30, 2018 at 8:31 PM dimitris plakas 
wrote:

> Hello everyone,
>
> I am trying to split a dataframe on partitions and I want to apply a
> custom function on every partition. More precisely I have a dataframe like
> the one below
>
> Group_Id | Id | Points
> 1| id1| Point1
> 2| id2| Point2
>
> I want to have a partition for every Group_Id and apply on every partition
> a function defined by me.
> I have tried with partitionBy('Group_Id').mapPartitions() but I receive an
> error.
> Could you please advice me how to do it?
>


Re: jar file problem

2017-10-19 Thread Riccardo Ferrari
This is a good place to start from:
https://spark.apache.org/docs/latest/submitting-applications.html
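
In short: spark-submit --jars (or the spark.jars configuration) ships the
listed jars to the executors for you, so there is no need to copy them by
hand. A minimal sketch with a hypothetical path:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("my-app")
         .config("spark.jars", "/path/to/my-lib.jar")  # distributed to the executors automatically
         .getOrCreate())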


Best,

On Thu, Oct 19, 2017 at 5:24 PM, Uğur Sopaoğlu  wrote:

> Hello,
>
> I have a very easy problem. When I run a Spark job, I must copy the jar file to
> all worker nodes. Is there a simpler way to do this?
>
> --
> Uğur Sopaoğlu
>


Re: [Timer-0:WARN] Logging$class: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

2017-09-18 Thread Riccardo Ferrari
Hi Jean,

What does the master UI say? http://10.0.100.81:8080
Do you have enough resources available, or is there any running context
that is depleting all your resources?
Are your workers registered and alive? How much memory each? How many
cores each?

Best


On Mon, Sep 18, 2017 at 11:24 PM, Jean Georges Perrin  wrote:

> Hi,
>
> I am trying to connect to a new cluster I just set up.
>
> And I get...
> [Timer-0:WARN] Logging$class: Initial job has not accepted any resources;
> check your cluster UI to ensure that workers are registered and have
> sufficient resources
>
> I must have forgotten something really super obvious.
>
> My connection code is:
> SparkSession spark = SparkSession.builder()
> .appName("JavaSparkPi")
> .master("spark://10.0.100.81:7077")
> .config("spark.executor.memory", "4g")
> .config("spark.executor.cores", "2")
> .getOrCreate();
>
> My master is on 10.0.100.81, started with a simple start-master.sh. I have 2
> nodes, which I run with ./sbin/start-slave.sh spark://un.oplo.io:7077.
>
> I use Spark's own resource scheduler.
>
> jg
>
>
>
>
>


Re: How to convert Row to JSON in Java?

2017-09-11 Thread Riccardo Ferrari
I agree,

Java tends to be pretty verbose, unfortunately. You can check the "alternate"
approach, which should be more compact and readable.
It should be something like:
df.select(to_json(struct(col("*"))).alias("value"))
Of course to_json, struct and col are from
https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/sql/functions.html



On Mon, Sep 11, 2017 at 10:15 PM, kant kodali <kanth...@gmail.com> wrote:

> getValuesMap is not very Java friendly. I have to do something like this
>
> String[] fieldNames = row.schema().fieldNames();
> Seq fieldNamesSeq = 
> JavaConverters.asScalaIteratorConverter(Arrays.asList(fieldNames).iterator()).asScala().toSeq();
> String json = row.getValuesMap(fieldNamesSeq).toString();
>
>
> On Mon, Sep 11, 2017 at 12:39 AM, Riccardo Ferrari <ferra...@gmail.com>
> wrote:
>
>> Hi Ayan, yup that works very well however I believe Kant's other mail
>> "Queries with streaming sources must be executed with writeStream.start()"
>> is adding more context.
>> I think he is trying leverage on structured streaming and applying the
>> rdd conversion to a streaming dataset is breaking the streaming lineage.
>> Best,
>>
>> On Mon, Sep 11, 2017 at 3:51 AM, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> Sorry for side-line question, but for Python, isn't following the
>>> easiest:
>>>
>>> >>> import json
>>> >>>* df1 = df.rdd.map(lambda r: json.dumps(r.asDict()))*
>>> >>> df1.take(10)
>>> ['{"id": 1}', '{"id": 2}', '{"id": 3}', '{"id": 4}', '{"id": 5}']
>>>
>>>
>>>
>>>
>>> On Mon, Sep 11, 2017 at 4:22 AM, Riccardo Ferrari <ferra...@gmail.com>
>>> wrote:
>>>
>>>> Hi Kant,
>>>>
>>>> You can check the getValuesMap
>>>> <https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/sql/Row.html#getValuesMap-scala.collection.Seq->.
>>>> I found this post <https://stackoverflow.com/a/41602178/8356352>
>>>> useful, it is in Scala but should be a good starting point.
>>>> An alternative approach is combine the 'struct' and 'to_json' functions
>>>> <https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/sql/functions.html>.
>>>> I have not tested this in Java but I am using it in Python.
>>>>
>>>> Best,
>>>>
>>>> On Sun, Sep 10, 2017 at 1:45 AM, kant kodali <kanth...@gmail.com>
>>>> wrote:
>>>>
>>>>> toJSON on Row object.
>>>>>
>>>>> On Sat, Sep 9, 2017 at 4:18 PM, Felix Cheung <
>>>>> felixcheun...@hotmail.com> wrote:
>>>>>
>>>>>> toJSON on Dataset/DataFrame?
>>>>>>
>>>>>> --
>>>>>> *From:* kant kodali <kanth...@gmail.com>
>>>>>> *Sent:* Saturday, September 9, 2017 4:15:49 PM
>>>>>> *To:* user @spark
>>>>>> *Subject:* How to convert Row to JSON in Java?
>>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> How to convert Row to JSON in Java? It would be nice to have
>>>>>> .toJson() method in the Row class.
>>>>>>
>>>>>> Thanks,
>>>>>> kant
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>
>


Re: How to convert Row to JSON in Java?

2017-09-11 Thread Riccardo Ferrari
Hi Ayan, yup, that works very well; however, I believe Kant's other mail,
"Queries with streaming sources must be executed with writeStream.start()",
adds more context.
I think he is trying to leverage structured streaming, and applying the RDD
conversion to a streaming dataset breaks the streaming lineage.
Best,

On Mon, Sep 11, 2017 at 3:51 AM, ayan guha <guha.a...@gmail.com> wrote:

> Sorry for side-line question, but for Python, isn't following the easiest:
>
> >>> import json
> >>>* df1 = df.rdd.map(lambda r: json.dumps(r.asDict()))*
> >>> df1.take(10)
> ['{"id": 1}', '{"id": 2}', '{"id": 3}', '{"id": 4}', '{"id": 5}']
>
>
>
>
> On Mon, Sep 11, 2017 at 4:22 AM, Riccardo Ferrari <ferra...@gmail.com>
> wrote:
>
>> Hi Kant,
>>
>> You can check the getValuesMap
>> <https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/sql/Row.html#getValuesMap-scala.collection.Seq->.
>> I found this post <https://stackoverflow.com/a/41602178/8356352> useful,
>> it is in Scala but should be a good starting point.
>> An alternative approach is combine the 'struct' and 'to_json' functions
>> <https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/sql/functions.html>.
>> I have not tested this in Java but I am using it in Python.
>>
>> Best,
>>
>> On Sun, Sep 10, 2017 at 1:45 AM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> toJSON on Row object.
>>>
>>> On Sat, Sep 9, 2017 at 4:18 PM, Felix Cheung <felixcheun...@hotmail.com>
>>> wrote:
>>>
>>>> toJSON on Dataset/DataFrame?
>>>>
>>>> --
>>>> *From:* kant kodali <kanth...@gmail.com>
>>>> *Sent:* Saturday, September 9, 2017 4:15:49 PM
>>>> *To:* user @spark
>>>> *Subject:* How to convert Row to JSON in Java?
>>>>
>>>> Hi All,
>>>>
>>>> How to convert Row to JSON in Java? It would be nice to have .toJson()
>>>> method in the Row class.
>>>>
>>>> Thanks,
>>>> kant
>>>>
>>>
>>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: How to convert Row to JSON in Java?

2017-09-10 Thread Riccardo Ferrari
Hi Kant,

You can check the getValuesMap method:
https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/sql/Row.html#getValuesMap-scala.collection.Seq-
I found this post useful: https://stackoverflow.com/a/41602178/8356352. It
is in Scala but should be a good starting point.
An alternative approach is to combine the 'struct' and 'to_json' functions:
https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/sql/functions.html
I have not tested this in Java but I am using it in Python.
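
For reference, a rough sketch of that second approach in Python (generic
column handling):

from pyspark.sql import functions as F

# Packs every column of the row into a struct and serializes it as a JSON string.
json_df = df.select(F.to_json(F.struct(*[F.col(c) for c in df.columns])).alias("value"))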

Best,

On Sun, Sep 10, 2017 at 1:45 AM, kant kodali  wrote:

> toJSON on Row object.
>
> On Sat, Sep 9, 2017 at 4:18 PM, Felix Cheung 
> wrote:
>
>> toJSON on Dataset/DataFrame?
>>
>> --
>> *From:* kant kodali 
>> *Sent:* Saturday, September 9, 2017 4:15:49 PM
>> *To:* user @spark
>> *Subject:* How to convert Row to JSON in Java?
>>
>> Hi All,
>>
>> How to convert Row to JSON in Java? It would be nice to have .toJson()
>> method in the Row class.
>>
>> Thanks,
>> kant
>>
>
>


Re: Problem with CSV line break data in PySpark 2.1.0

2017-09-03 Thread Riccardo Ferrari
Hi Aakash,

What I see in the picture seems correct. Spark (pyspark) is reading your F2
cell as multi-line text. Where are the nulls you're referring to?
You might find pyspark.sql.functions.regexp_replace useful
to remove new lines and unwanted characters:
df.select(..., regexp_replace(<column>, '\s+|\n', ' '), ...)
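
For example, a small sketch assuming the affected column is called "F" as in
the screenshot:

from pyspark.sql import functions as F

# Collapse newlines and repeated whitespace inside the column into single spaces.
df_clean = df.withColumn("F", F.regexp_replace(F.col("F"), r"\s+|\n", " "))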

Best,

On Sun, Sep 3, 2017 at 12:15 PM, Aakash Basu 
wrote:

> Hi,
>
> I've a dataset where a few rows of the column F as shown below have line
> breaks in CSV file.
>
> [image: Inline image 1]
>
> When Spark is reading it, it is coming as below, which is a complete new
> line.
>
> [image: Inline image 2]
>
> I want my PySpark 2.1.0 to read it by forcefully avoiding the line break
> after the date, which is not happening as I am using com.databricks.csv
> reader. And nulls are getting created after the date for line 2 for the
> rest of the columns from G till end.
>
> Can I please be helped how to handle this?
>
> Thanks,
> Aakash.
>


Re: PySpark, Structured Streaming and Kafka

2017-08-23 Thread Riccardo Ferrari
Hi Brian,

Very nice work you have done!

WRT your issue: can you clarify how you are adding the Kafka dependency when
using Jupyter? The ClassNotFoundException really points to that missing
dependency.

The IllegalArgumentException error is a bit different; that one is simply
because you are not telling spark-submit which application to run:
https://issues.apache.org/jira/browse/SPARK-15360
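
One common way (a sketch, not the only option) to pull in the Kafka dependency
for a pip-installed PySpark running inside Jupyter is to set
PYSPARK_SUBMIT_ARGS before the session is created, reusing the package
coordinates mentioned further down in this thread:

import os

# Must be set before the SparkSession / SparkContext is created.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 pyspark-shell"
)

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-test").getOrCreate()

dns_events = (spark.readStream.format("kafka")
              .option("kafka.bootstrap.servers", "localhost:9092")
              .option("subscribe", "dns")
              .option("startingOffsets", "latest")
              .load())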

Best,


On Wed, Aug 23, 2017 at 10:41 PM, Brian Wylie 
wrote:

> Hi All,
>
> I'm trying the new hotness of using Kafka and Structured Streaming.
>
> Resources that I've looked at
> - https://spark.apache.org/docs/latest/streaming-programming-guide.html
> - https://databricks.com/blog/2016/07/28/structured-
> streaming-in-apache-spark.html
> - https://spark.apache.org/docs/latest/streaming-custom-receivers.html
> - http://cdn2.hubspot.net/hubfs/438089/notebooks/spark2.
> 0/Structured%20Streaming%20using%20Python%20DataFrames%20API.html
>
> My setup is a bit weird (yes.. yes.. I know...)
> - Eventually I'll just use a DataBricks cluster and life will be bliss :)
> - But for now I want to test/try stuff out on my little Mac Laptop
>
> The newest version of PySpark will install a local Spark server with a
> simple:
> $ pip install pyspark
>
> This is very nice. I've put together a little notebook using that kewl
> feature:
> - https://github.com/Kitware/BroThon/blob/master/notebooks/
> Bro_to_Spark_Cheesy.ipynb
>
> So the next step is the setup/use a Kafka message queue and that went
> well/works fine.
>
> $ kafka-console-consumer --bootstrap-server localhost:9092 --topic dns
>
> *I get messages spitting out*
>
> {"ts":1503513688.232274,"uid":"CdA64S2Z6Xh555","id.orig_h":"192.168.1.7","id.orig_p":58528,"id.resp_h":"192.168.1.1","id.resp_p":53,"proto":"udp","trans_id":43933,"rtt":0.02226,"query":"brian.wylie.is.awesome.tk","qclass":1,"qclass_name":"C_INTERNET","qtype":1,"qtype_name":"A","rcode":0,"rcode_name":"NOERROR","AA":false,"TC":false,"RD":true,"RA":true,"Z":0,"answers":["17.188.137.55","17.188.142.54","17.188.138.55","17.188.141.184","17.188.129.50","17.188.128.178","17.188.129.178","17.188.141.56"],"TTLs":[25.0,25.0,25.0,25.0,25.0,25.0,25.0,25.0],"rejected":false}
>
>
> Okay, finally getting to my question:
> - Local spark server (good)
> - Local kafka server and messages getting produced (good)
> - Trying to this line of PySpark code (not good)
>
> # Setup connection to Kafka Stream dns_events = 
> spark.readStream.format('kafka')\
>   .option('kafka.bootstrap.servers', 'localhost:9092')\
>   .option('subscribe', 'dns')\
>   .option('startingOffsets', 'latest')\
>   .load()
>
>
> fails with:
> java.lang.ClassNotFoundException: Failed to find data source: kafka.
> Please find packages at http://spark.apache.org/third-party-projects.html
>
> I've looked that the URL listed... and poking around I can see that maybe
> I need the kafka jar file as part of my local server.
>
> I lamely tried this:
> $ spark-submit --packages org.apache.spark:spark-sql-
> kafka-0-10_2.11:2.2.0
>
> Exception in thread "main" java.lang.IllegalArgumentException: Missing
> application resource. at org.apache.spark.launcher.CommandBuilderUtils.
> checkArgument(CommandBuilderUtils.java:241) at org.apache.spark.launcher.
> SparkSubmitCommandBuilder.buildSparkSubmitArgs(SparkSubmitCommandBuilder.java:160)
> at org.apache.spark.launcher.SparkSubmitCommandBuilder.
> buildSparkSubmitCommand(SparkSubmitCommandBuilder.java:274) at
> org.apache.spark.launcher.SparkSubmitCommandBuilder.buildCommand(
> SparkSubmitCommandBuilder.java:151) at org.apache.spark.launcher.
> Main.main(Main.java:86)
>
>
> Anyway, all my code/versions/etc are in this notebook:
> - https://github.com/Kitware/BroThon/blob/master/notebooks/
> Bro_to_Spark.ipynb
>
> I'd be tremendously appreciative of some super nice, smart person if they
> could point me in the right direction :)
>
> -Brian Wylie
>


Re: Is there an operation to create multi record for every element in a RDD?

2017-08-09 Thread Riccardo Ferrari
Depending on your Spark version, have you considered the Dataset API?

You can do something like:

val df1 = rdd1.toDF("userid")

val listRDD = sc.parallelize(listForRule77)

val listDF = listRDD.toDF("data")

df1.crossJoin(listDF).orderBy("userid").show(60, truncate=false)

+--+--+

|userid|data  |

+--+--+

|1 |1,1,100.00|1483891200,|

|1 |1,1,100.00|1483804800,|

...

|1 |1,1,100.00|1488902400,|

|1 |1,1,100.00|1489075200,|

|1 |1,1,100.00|1488470400,|

...

On Wed, Aug 9, 2017 at 10:44 AM, Ryan  wrote:

> It's just sort of inner join operation... If the second dataset isn't very
> large it's ok(btw, you can use flatMap directly instead of map followed by
> flatmap/flattern), otherwise you can register the second one as a
> rdd/dataset, and join them on user id.
>
> On Wed, Aug 9, 2017 at 4:29 PM,  wrote:
>
>> hello guys:
>>   I have a simple rdd like :
>> val userIDs = 1 to 1
>> val rdd1 = sc.parallelize(userIDs , 16)   //this rdd has 1 user id
>>   And I have a List[String] like below:
>> scala> listForRule77
>> res76: List[String] = List(1,1,100.00|1483286400, 1,1,100.00|1483372800,
>> 1,1,100.00|1483459200, 1,1,100.00|1483545600, 1,1,100.00|1483632000,
>> 1,1,100.00|1483718400, 1,1,100.00|1483804800, 1,1,100.00|1483891200,
>> 1,1,100.00|1483977600, 3,1,200.00|1485878400, 1,1,100.00|1485964800,
>> 1,1,100.00|1486051200, 1,1,100.00|1488384000, 1,1,100.00|1488470400,
>> 1,1,100.00|1488556800, 1,1,100.00|1488643200, 1,1,100.00|1488729600,
>> 1,1,100.00|1488816000, 1,1,100.00|1488902400, 1,1,100.00|1488988800,
>> 1,1,100.00|1489075200, 1,1,100.00|1489161600, 1,1,100.00|1489248000,
>> 1,1,100.00|1489334400, 1,1,100.00|1489420800, 1,1,100.00|1489507200,
>> 1,1,100.00|1489593600, 1,1,100.00|148968, 1,1,100.00|1489766400)
>>
>> scala> listForRule77.length
>> res77: Int = 29
>>
>>   I need to create a rdd containing  29 records. for every userid
>> in rdd1 , I need to create 29 records according to listForRule77, each
>> record start with the userid, for example 1(the
>> userid),1,1,100.00|1483286400.
>>   My idea is like below:
>> 1.write a udf
>> to add the userid to the beginning of every string element
>> of listForRule77.
>> 2.use
>> val rdd2 = rdd1.map{x=> List_udf(x))}.flatmap()
>> , the result rdd2 maybe what I need.
>>
>>   My question: Are there any problems in my idea? Is there a better
>> way to do this ?
>>
>>
>>
>> 
>>
>> ThanksBest regards!
>> San.Luo
>>
>
>


Re: Spark Streaming job statistics

2017-08-08 Thread Riccardo Ferrari
Hi,

Have you tried to check the "Streaming" tab menu?

Best,

On Tue, Aug 8, 2017 at 4:15 PM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Hi,
>
> I am running spark streaming job which receives data from azure iot hub. I
> am not sure if the connection was successful and receving any data. does
> the input column show how much data it has read if the connection was
> successful?
>
> [image: Inline image 1]
>


PySpark Streaming keeps dying

2017-08-05 Thread Riccardo Ferrari
Hi list,

I have Spark 2.2.0 in standalone mode and Python 3.6. It is a very small
testing cluster with two nodes.
I am running (trying to run) a streaming job that simply reads from Kafka,
applies an ML model and stores the result back into Kafka.
The job is run with the following parameters:
"--conf spark.cores.max=2 --conf spark.executor.cores=2 --conf
spark.executor.memory=2g"

The problem I'm facing is that very often the job crashes with this exception:

17/08/05 00:19:00 ERROR Utils: Uncaught exception in thread stdout writer
for /opt/spark/miniconda2/envs/pyspark36/bin/python
java.lang.AssertionError: assertion failed: Block rdd_474_0 is not locked
for reading
at scala.Predef$.assert(Predef.scala:170)
at
org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299)
at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:720)
at
org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:516)
at
org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
at
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at
org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
at
org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:333)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
at
org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
17/08/05 00:19:00 ERROR SparkUncaughtExceptionHandler: Uncaught exception
in thread Thread[stdout writer for
/opt/spark/miniconda2/envs/pyspark36/bin/python,5,main]
java.lang.AssertionError: assertion failed: Block rdd_474_0 is not locked
for reading
at scala.Predef$.assert(Predef.scala:170)
at
org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299)
at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:720)
at
org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:516)
at
org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
at
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at
org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
at
org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:333)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
at
org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)

The stream is created via:
directKafkaStream = KafkaUtils.createDirectStream(ssc,...

The processing:
directKafkaStream.cache().foreachRDD(self._process)

where self._process:

   - puts the RDD into a DataFrame
   - applies model.transform
   - stores the result back

Has anyone experienced this?
Any suggestion on how to attack the problem?
I am not sure it is a resource constraint, as I tried raising cores and
memory with no luck.

Any hint much appreciated,


Re: PySpark Streaming S3 checkpointing

2017-08-03 Thread Riccardo Ferrari
Hi Steve,

Thank you for your answer, much appreciated.

Reading the code, it seems that:

   - Python StreamingContext.getOrCreate
   <https://github.com/apache/spark/blob/master/python/pyspark/streaming/context.py#L119>
   calls Scala StreamingContextPythonHelper().tryRecoverFromCheckpoint(checkpointPath)
   <https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala#L871>
   - tryRecoverFromCheckpoint calls CheckpointReader.read(..., new
   SparkConf(), SparkHadoopUtil.get.conf, ...)
   - SparkHadoopUtil.get.conf
   <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala#L49>
   (when not using yarn) does:
  - sparkConf = new SparkConf(false).loadFromSystemProperties(true)
  - Configuration = newConfiguration(sparkConf)

I have to admit I have not tested (read: debugged) it, so this might not be
completely accurate (checkpointing is not the highest priority). However I
have the feeling I cannot provide those properties via code, because a new
configuration gets instantiated/read from system properties and whatever I
set on the currently running context is ignored (or at least this happens in
Python).
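
For reference, a minimal (untested) sketch of the submit-time route Steve
suggests below: the s3a credentials go in as --conf spark.hadoop.* properties
so they should end up as system properties in the driver JVM and be visible
to the recovery path. The placeholders, paths and batch interval are
assumptions, and the hadoop-aws/aws-sdk jars are assumed to be on the
classpath already:

import os

# must be set before the gateway/JVM is launched
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--conf spark.hadoop.fs.s3a.access.key=<access key> "
    "--conf spark.hadoop.fs.s3a.secret.key=<secret key> "
    "pyspark-shell"
)

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def create_ssc():
    sc = SparkContext(appName="checkpointed-streaming")
    ssc = StreamingContext(sc, 10)  # 10-second batches, arbitrary
    # build the DStreams here, then checkpoint to the same s3a path
    ssc.checkpoint("s3a://<bucket>/checkpoints")
    return ssc

ssc = StreamingContext.getOrCreate("s3a://<bucket>/checkpoints", create_ssc)
ssc.start()
ssc.awaitTermination()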

What do you (or any in the list) think?

Thanks,



On Wed, Aug 2, 2017 at 6:04 PM, Steve Loughran <ste...@hortonworks.com>
wrote:

>
> On 2 Aug 2017, at 10:34, Riccardo Ferrari <ferra...@gmail.com> wrote:
>
> Hi list!
>
> I am working on a pyspark streaming job (ver 2.2.0) and I need to enable
> checkpointing. At high level my python script goes like this:
>
> class StreamingJob():
>
> def __init__(..):
> ...
>sparkContext._jsc.hadoopConfiguration().set('fs.s3a.access.key',)
>sparkContext._jsc.hadoopConfiguration().set('fs.s3a.secret.key',)
>
> def doJob(self):
>ssc = StreamingContext.getOrCreate('',  create ssc>)
>
> and I run it:
>
> myJob = StreamingJob(...)
> myJob.doJob()
>
> The problem is that StreamingContext.getOrCreate is not able to have
> access to hadoop configuration configured in the constructor and fails to
> load from checkpoint with
>
> "com.amazonaws.AmazonClientException: Unable to load AWS credentials from
> any provider in the chain"
>
> If I export AWS credentials to the system ENV before starting the script
> it works!
>
>
> Spark magically copies the env vars over for you when you launch a job
>
> I see the Scala version has an option to provide the hadoop configuration
> that is not available in python
>
> I don't have the whole Hadoop, just Spark, so I don't really want to
> configure hadoop's xmls and such
>
>
> when you set up the context, as in spark-defaults.conf
>
> spark.hadoop.fs.s3a.access.key=access key
> spark.hadoop.fs.s3a.secret.key=secret key
>
> Reminder: Do keep your secret key a secret, avoid checking it in to any
> form of revision control.
>


PySpark Streaming S3 checkpointing

2017-08-02 Thread Riccardo Ferrari
Hi list!

I am working on a pyspark streaming job (ver 2.2.0) and I need to enable
checkpointing. At high level my python script goes like this:

class StreamingJob():

def __init__(..):
...
   sparkContext._jsc.hadoopConfiguration().set('fs.s3a.access.key',)
   sparkContext._jsc.hadoopConfiguration().set('fs.s3a.secret.key',)

def doJob(self):
   ssc = StreamingContext.getOrCreate('', )

and I run it:

myJob = StreamingJob(...)
myJob.doJob()

The problem is that StreamingContext.getOrCreate does not have access to the
Hadoop configuration set in the constructor and fails to load from the
checkpoint with

"com.amazonaws.AmazonClientException: Unable to load AWS credentials from
any provider in the chain"

If I export AWS credentials to the system ENV before starting the script it
works!

I see the Scala version has an option to provide the hadoop configuration
that is not available in python

I don't have the whole Hadoop, just Spark, so I don't really want to
configure hadoop's xmls and such

What is the cleanest way to achieve my goal?

 thanks!


Re: SPARK Issue in Standalone cluster

2017-07-31 Thread Riccardo Ferrari
Hi Gourav,

The issue here is the location you're trying to write to/read from:
/Users/gouravsengupta/Development/spark/sparkdata/test1/p...
When dealing with clusters, all the paths and resources should be available
to all executors (and the driver), and that is the reason why you generally
use HDFS, S3, NFS or any shared file system.

Spark assumes your data is generally available to all nodes and does not try
to pick up the data from a selected node; rather, it writes/reads in parallel
from the executor nodes. Also, given its control logic, there is no way
(read: you should not care) to know which executor is doing which task.
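
For example, a minimal sketch of the same test against a location every node
can see (the shared mount path and the row count are just assumptions; an
s3a:// or hdfs:// URI works the same way):

import pandas, numpy
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("spark://mastersystem.local:7077")
         .appName("gouravtest")
         .getOrCreate())

testdf = spark.createDataFrame(
    pandas.DataFrame(numpy.random.randn(10000, 4), columns=list("ABCD")))

# an NFS/shared mount that exists at the same path on *every* worker and on
# the driver
shared_path = "file:///mnt/shared/sparkdata/test2"
testdf.write.save(shared_path)
print(spark.read.load(shared_path).count())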

Hope it helps,
Riccardo

On Mon, Jul 31, 2017 at 2:14 AM, Gourav Sengupta 
wrote:

> Hi,
>
> I am working by creating a native SPARK standalone cluster (
> https://spark.apache.org/docs/2.2.0/spark-standalone.html)
>
> Therefore I  do not have a HDFS.
>
>
> EXERCISE:
> Its the most fundamental and simple exercise. Create a sample SPARK
> dataframe and then write it to a location and then read it back.
>
> SETTINGS:
> So after I have installed SPARK in two physical systems with the same:
> 1. SPARK version,
> 2. JAVA version,
> 3. PYTHON_PATH
> 4. SPARK_HOME
> 5. PYSPARK_PYTHON
> the user in both the systems is the root user therefore there are no
> permission issues anywhere.
>
> I am able to start:
> 1. ./spark-2.2.0-bin-hadoop2.7/sbin/start-master.sh
> 2. ./spark-2.2.0-bin-hadoop2.7/sbin/start-slave.sh (from two separate
> computers)
>
> After that I can see in the spark UI (at port 8080) two workers.
>
>
> CODE:
> Then I run the following code:
>
> ==
> import findspark
> import os
> os.environ["SPARK_HOME"] = '/Users/gouravsengupta/
> Development/spark/spark/'
> findspark.init()
> import pyspark
> from pyspark.sql import SparkSession
> spark = (SparkSession.builder
> .master("spark://mastersystem.local:7077")
> .appName("gouravtest")
> .enableHiveSupport()
> .getOrCreate())
> import pandas, numpy
> testdf = spark.createDataFrame(pandas.DataFrame(numpy.random.randn(1,
> 4), columns=list('ABCD')))
> testdf.cache()
> testdf.count()
> testdf.write.save("/Users/gouravsengupta/Development/
> spark/sparkdata/test2")
> spark.read.load("/Users/gouravsengupta/Development/
> spark/sparkdata/test2").count()
> ==
>
>
> ERROR I (in above code):
> ERROR in line: testdf.write.save("/Users/gouravsengupta/Development/
> spark/sparkdata/test2")
> This line does not fail or report any error. But when I am looking at the
> stage in spark Application UI the error reported for one of the slave node
> which is not in the same system as the master node is mentioned below. The
> writing on the slave node which is in the same physical system as the
> Master happens correctly. (NOTE: slave node basically the worker and master
> node the driver)
> 
> --
>
> 0 (TID 41). 2060 bytes result sent to driver
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 
> 'attempt_20170731001928_0002_m_06_0' to 
> file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_06
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: 
> attempt_20170731001928_0002_m_06_0: Committed
> 17/07/31 00:19:29 INFO Executor: Finished task 31.0 in stage 2.0 (TID 64). 
> 2060 bytes result sent to driver
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 
> 'attempt_20170731001928_0002_m_28_0' to 
> file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_28
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: 
> attempt_20170731001928_0002_m_28_0: Committed
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 
> 'attempt_20170731001928_0002_m_21_0' to 
> file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_21
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: 
> attempt_20170731001928_0002_m_21_0: Committed
> 17/07/31 00:19:29 INFO Executor: Finished task 12.0 in stage 2.0 (TID 45). 
> 2103 bytes result sent to driver
> 17/07/31 00:19:29 INFO Executor: Finished task 4.0 in stage 2.0 (TID 37). 
> 2060 bytes result sent to driver
> 17/07/31 00:19:29 INFO Executor: Finished task 6.0 in stage 2.0 (TID 39). 
> 2060 bytes result sent to driver
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 
> 'attempt_20170731001928_0002_m_18_0' to 
> file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_18
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: 
> attempt_20170731001928_0002_m_18_0: Committed
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 
> 

Re: Logging in RDD mapToPair of Java Spark application

2017-07-29 Thread Riccardo Ferrari
Hi John,

The reason you don't see the second sysout line is that it is executed in a
different JVM (i.e. driver vs executor). The second sysout line should be
available through the executor logs. Check the Executors tab.

There are alternative approaches to manage log centralization, however it
really depends on what your requirements are.

Hope it helps,

On Sat, Jul 29, 2017 at 8:09 PM, johnzengspark  wrote:

> Hi, All,
>
> Although there are lots of discussions related to logging in this news
> group, I did not find an answer to my specific question so I am posting
> mine
> with the hope that this will not cause a duplicated question.
>
> Here is my simplified Java testing Spark app:
>
> public class SparkJobEntry {
> public static void main(String[] args) {
> // Following line is in stdout from JobTracker UI
> System.out.println("argc=" + args.length);
>
> SparkConf conf = new SparkConf().setAppName("
> TestSparkApp");
> JavaSparkContext sc = new JavaSparkContext(conf);
> JavaRDD fileRDD = sc.textFile(args[0]);
>
> fileRDD.mapToPair(new PairFunction String>() {
>
> private static final long serialVersionUID = 1L;
>
> @Override
> public Tuple2 call(String input)
> throws Exception {
> // Following line is not in stdout from
> JobTracker UI
> System.out.println("This line should be
> printed in stdout");
> // Other code removed from here to make
> things simple
> return new Tuple2("1",
> "Testing data");
> }}).saveAsTextFile(args[0] + ".results");
> }
> }
>
> What I expected from JobTracker UI is to see both stdout lines: first line
> is "argc=2" and second line is "This line should be printed in stdout".
> But
> I only see the first line which is outside of the 'mapToPair'.  I actually
> have verified my 'mapToPair' is called and the statements after the second
> logging line were executed.  The only issue for me is why the second
> logging
> is not in JobTracker UI.
>
> Appreciate your help.
>
> Thanks
>
> John
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Logging-in-RDD-mapToPair-of-Java-
> Spark-application-tp29007.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Flatten JSON to multiple columns in Spark

2017-07-18 Thread Riccardo Ferrari
What's against:

df.rdd.map(...)

or

dataset.foreach()

https://spark.apache.org/docs/2.0.1/api/scala/index.html#org.apache.spark.sql.Dataset@foreach(f:T=>Unit):Unit
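
For the "own the schema and parse it manually" route discussed in this
thread, here is a rough sketch (in PySpark for brevity; the field names come
from the sample above and the toy helper is an assumption, not a general
solution):

import json
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.appName("flatten-json").getOrCreate()

# toy frame shaped like the sample in this thread
df = spark.createDataFrame(
    [(1, "Calculus Theory", "1234567890",
      '[{"expdCnt": "15", "mfgAcctNum": "531093"}]')],
    ["SrNo", "Title", "ISBN", "Info"])

def flatten(row):
    # parse the JSON column yourself and keep only the fields you "own"
    info = json.loads(row.Info)[0]
    return Row(SrNo=row.SrNo, Title=row.Title, ISBN=row.ISBN,
               expdCnt=info.get("expdCnt"), mfgAcctNum=info.get("mfgAcctNum"))

flat_df = df.rdd.map(flatten).toDF()
flat_df.show(truncate=False)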

Best,

On Tue, Jul 18, 2017 at 6:46 PM, lucas.g...@gmail.com 
wrote:

> I've been wondering about this for awhile.
>
> We wanted to do something similar for generically saving thousands of
> individual homogenous events into well formed parquet.
>
> Ultimately I couldn't find something I wanted to own and pushed back on
> the requirements.
>
> It seems the canonical answer is that you need to 'own' the schema of the
> json and parse it out manually and into your dataframe.  There's nothing
> challenging about it.  Just verbose code.  If you're 'info' is a consistent
> schema then you'll be fine.  For us it was 12 wildly diverging schemas and
> I didn't want to own the transforms.
>
> I also recommend persisting anything that isn't part of your schema in an
> 'extras field'  So when you parse out your json, if you've got anything
> leftover drop it in there for later analysis.
>
> I can provide some sample code but I think it's pretty straightforward /
> you can google it.
>
> What you can't seem to do efficiently is dynamically generate a dataframe
> from random JSON.
>
>
> On 18 July 2017 at 01:57, Chetan Khatri 
> wrote:
>
>> Implicit tried - didn't worked!
>>
>> from_json - didnt support spark 2.0.1 any alternate solution would be
>> welcome please
>>
>>
>> On Tue, Jul 18, 2017 at 12:18 PM, Georg Heiler > > wrote:
>>
>>> You need to have spark implicits in scope
>>> Richard Xin  schrieb am Di. 18. Juli
>>> 2017 um 08:45:
>>>
 I believe you could use JOLT (bazaarvoice/jolt
 ) to flatten it to a json string
 and then to dataframe or dataset.

 bazaarvoice/jolt

 jolt - JSON to JSON transformation library written in Java.
 




 On Monday, July 17, 2017, 11:18:24 PM PDT, Chetan Khatri <
 chetan.opensou...@gmail.com> wrote:


 Explode is not working in this scenario with error - string cannot be
 used in explore either array or map in spark
 On Tue, Jul 18, 2017 at 11:39 AM, 刘虓  wrote:

 Hi,
 have you tried to use explode?

 Chetan Khatri  于2017年7月18日 周二下午2:06写道:

 Hello Spark Dev's,

 Can you please guide me, how to flatten JSON to multiple columns in
 Spark.

 *Example:*

 Sr No Title ISBN Info
 1 Calculus Theory 1234567890

 [{"cert":[{
 "authSbmtr":"009415da-c8cd- 418d-869e-0a19601d79fa",
 009415da-c8cd-418d-869e- 0a19601d79fa
 "certUUID":"03ea5a1a-5530- 4fa3-8871-9d1ebac627c4",

 "effDt":"2016-05-06T15:04:56. 279Z",


 "fileFmt":"rjrCsv","status":" live"}],

 "expdCnt":"15",
 "mfgAcctNum":"531093",

 "oUUID":"23d07397-4fbe-4897- 8a18-b79c9f64726c",


 "pgmRole":["RETAILER"],
 "pgmUUID":"1cb5dd63-817a-45bc- a15c-5660e4accd63",
 "regUUID":"cc1bd898-657d-40dc- af5d-4bf1569a1cc4",
 "rtlrsSbmtd":["009415da-c8cd- 418d-869e-0a19601d79fa"]}]

 I want to get single row with 11 columns.

 Thanks.


>>
>


Re: Spark UI crashes on Large Workloads

2017-07-18 Thread Riccardo Ferrari
The reason you get connection refused when connecting to the application UI
(port 4040) is that your app gets stopped, and thus the application UI stops
as well. To inspect your executor logs after the fact you might find the
Spark History server useful (for standalone mode):
<https://spark.apache.org/docs/latest/monitoring.html#viewing-after-the-fact>
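
The history server needs event logging turned on; a minimal sketch (the log
directory is just an example, it must exist before the app starts, and the
history server should point at the same place via
spark.history.fs.logDirectory when you launch sbin/start-history-server.sh):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("my-large-workload")
         .config("spark.eventLog.enabled", "true")
         .config("spark.eventLog.dir", "file:///tmp/spark-events")
         .getOrCreate())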

Personally I collect the logs from my worker nodes. They generally sit
under $SPARK_HOME/work// (for standalone).
There you can find exceptions and messages from the executors assigned to
your app.

Now, about your app crashing: it might be useful to check whether it is
sized correctly. The issue you linked sounds relevant, however I would give
some sanity checks a try first. I have solved many issues just by sizing the
app properly, so I would start by checking memory size, cpu allocations and
so on.

Best,

On Tue, Jul 18, 2017 at 3:30 PM, Saatvik Shah <saatvikshah1...@gmail.com>
wrote:

> Hi Riccardo,
>
> Yes, Thanks for suggesting I do that.
>
> [Stage 1:==>   (12750 + 40) /
> 15000]17/07/18 13:22:28 ERROR org.apache.spark.scheduler.LiveListenerBus:
> Dropping SparkListenerEvent because no remaining room in event queue. This
> likely means one of the SparkListeners is too slow and cannot keep up with
> the rate at which tasks are being started by the scheduler.
> 17/07/18 13:22:28 WARN org.apache.spark.scheduler.LiveListenerBus:
> Dropped 1 SparkListenerEvents since Thu Jan 01 00:00:00 UTC 1970
> [Stage 1:> (13320 + 41) /
> 15000]17/07/18 13:23:28 WARN org.apache.spark.scheduler.LiveListenerBus:
> Dropped 26782 SparkListenerEvents since Tue Jul 18 13:22:28 UTC 2017
> [Stage 1:==>   (13867 + 40) /
> 15000]17/07/18 13:24:28 WARN org.apache.spark.scheduler.LiveListenerBus:
> Dropped 58751 SparkListenerEvents since Tue Jul 18 13:23:28 UTC 2017
> [Stage 1:===>  (14277 + 40) /
> 15000]17/07/18 13:25:10 INFO org.spark_project.jetty.server.ServerConnector:
> Stopped ServerConnector@3b7284c4{HTTP/1.1}{0.0.0.0:4040}
> 17/07/18 13:25:10 ERROR org.apache.spark.scheduler.LiveListenerBus:
> SparkListenerBus has already stopped! Dropping event
> SparkListenerExecutorMetricsUpdate(4,WrappedArray())
> And similar WARN/INFO messages continue occurring.
>
> When I try to access the UI, I get:
>
> Problem accessing /proxy/application_1500380353993_0001/. Reason:
>
> Connection to http://10.142.0.17:4040 refused
>
> Caused by:
>
> org.apache.http.conn.HttpHostConnectException: Connection to 
> http://10.142.0.17:4040 refused
>   at 
> org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:190)
>   at 
> org.apache.http.impl.conn.ManagedClientConnectionImpl.open(ManagedClientConnectionImpl.java:294)
>   at 
> org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:643)
>   at 
> org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:479)
>   at 
> org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:906)
>   at 
> org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:805)
>   at 
> org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:784)
>   at 
> org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet.proxyLink(WebAppProxyServlet.java:200)
>   at 
> org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet.doGet(WebAppProxyServlet.java:387)
>   at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
>   at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)
>
>
>
> I noticed this issue talks about something similar and I guess is related:
> https://issues.apache.org/jira/browse/SPARK-18838.
>
> On Tue, Jul 18, 2017 at 2:49 AM, Riccardo Ferrari <ferra...@gmail.com>
> wrote:
>
>> Hi,
>>  can you share more details. do you have any exceptions from the driver?
>> or executors?
>>
>> best,
>>
>> On Jul 18, 2017 02:49, "saatvikshah1994" <saatvikshah1...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I have a pyspark App which when provided a huge amount of data as input
>>> throws the error explained here sometimes:
>>> https://stackoverflow.com/questions/32340639/unable-to-under
>>> stand-error-sparklistenerbus-has-already-stopped-dropping-event.
>>> All my code is running inside the main function, and the only slightly
>>> peculiar thing I am doing in this app is using a custom PySpark ML
>>> Transformer(M

Re: Spark UI crashes on Large Workloads

2017-07-18 Thread Riccardo Ferrari
Hi,
 Can you share more details? Do you have any exceptions from the driver or
the executors?

best,

On Jul 18, 2017 02:49, "saatvikshah1994"  wrote:

> Hi,
>
> I have a pyspark App which when provided a huge amount of data as input
> throws the error explained here sometimes:
> https://stackoverflow.com/questions/32340639/unable-to-understand-error-
> sparklistenerbus-has-already-stopped-dropping-event.
> All my code is running inside the main function, and the only slightly
> peculiar thing I am doing in this app is using a custom PySpark ML
> Transformer(Modified from
> https://stackoverflow.com/questions/32331848/create-a-
> custom-transformer-in-pyspark-ml).
> Could this be the issue? How can I debug why this is happening?
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Spark-UI-crashes-on-Large-Workloads-tp28873.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Testing another Dataset after ML training

2017-07-12 Thread Riccardo Ferrari
Hi Michael,

I think I found your posting on SO:
https://stackoverflow.com/questions/45041677/java-spark-training-on-new-data-with-datasetrow-from-csv-file

The exception trace there is quite different from what I read here, and
indeed is self-explanatory:
...
Caused by: java.lang.IllegalArgumentException: requirement failed: The
columns of A don't match the number of elements of x. A: 38611, x: 36179
...
Can it be that you have different 'features' vector sizes between train and
test?

Best,

On Wed, Jul 12, 2017 at 1:41 PM, Kunkel, Michael C. <m.kun...@fz-juelich.de>
wrote:

> Greetings
>
> The attachment I meant to refer to was the posting in the initial email on
> the email list.
>
> BR
> MK
> 
> Michael C. Kunkel, USMC, PhD
> Forschungszentrum Jülich
> Nuclear Physics Institute and Juelich Center for Hadron Physics
> Experimental Hadron Structure (IKP-1)
> www.fz-juelich.de/ikp
>
> On Jul 12, 2017, at 09:56, Riccardo Ferrari <ferra...@gmail.com> wrote:
>
> Hi Michael,
>
> I don't see any attachment, not sure you can attach files though
>
> On Tue, Jul 11, 2017 at 10:44 PM, Michael C. Kunkel <
> m.kun...@fz-juelich.de> wrote:
>
>> Greetings,
>>
>> Thanks for the communication.
>>
>> I attached the entire stacktrace in which was output to the screen.
>> I tried to use JavaRDD and LabeledPoint then convert to Dataset and I
>> still get the same error as I did when I only used datasets.
>>
>> I am using the expected ml Vector. I tried it using the mllib and that
>> also didnt work.
>>
>> BR
>> MK
>> 
>> Michael C. Kunkel, USMC, PhD
>> Forschungszentrum Jülich
>> Nuclear Physics Institute and Juelich Center for Hadron Physics
>> Experimental Hadron Structure (IKP-1)www.fz-juelich.de/ikp
>>
>> On 11/07/2017 17:21, Riccardo Ferrari wrote:
>>
>> Mh, to me feels like there some data mismatch. Are you sure you're using
>> the expected Vector (ml vs mllib). I am not sure you attached the whole
>> Exception but you might find some more useful details there.
>>
>> Best,
>>
>> On Tue, Jul 11, 2017 at 3:07 PM, mckunkel <m.kun...@fz-juelich.de> wrote:
>>
>>> Im not sure why I cannot subscribe, so that everyone can view the
>>> conversation.
>>> Help?
>>>
>>>
>>>
>>> --
>>> View this message in context: http://apache-spark-user-list.
>>> 1001560.n3.nabble.com/Testing-another-Dataset-after-ML-train
>>> ing-tp28845p28846.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>>
>>
>> 
>> 
>> 
>> 
>> Forschungszentrum Juelich GmbH
>> 52425 Juelich
>> Sitz der Gesellschaft: Juelich
>> Eingetragen im Handelsregister des Amtsgerichts Dueren Nr. HR B 3498
>> Vorsitzender des Aufsichtsrats: MinDir Dr. Karl Eugen Huthmacher
>> Geschaeftsfuehrung: Prof. Dr.-Ing. Wolfgang Marquardt (Vorsitzender),
>> Karsten Beneke (stellv. Vorsitzender), Prof. Dr.-Ing. Harald Bolt,
>> Prof. Dr. Sebastian M. Schmidt
>> 
>> 
>> 
>> 
>>
>>
>


Re: Testing another Dataset after ML training

2017-07-12 Thread Riccardo Ferrari
Hi Michael,

I don't see any attachment, not sure you can attach files though

On Tue, Jul 11, 2017 at 10:44 PM, Michael C. Kunkel <m.kun...@fz-juelich.de>
wrote:

> Greetings,
>
> Thanks for the communication.
>
> I attached the entire stacktrace in which was output to the screen.
> I tried to use JavaRDD and LabeledPoint then convert to Dataset and I
> still get the same error as I did when I only used datasets.
>
> I am using the expected ml Vector. I tried it using the mllib and that
> also didnt work.
>
> BR
> MK
> 
> Michael C. Kunkel, USMC, PhD
> Forschungszentrum Jülich
> Nuclear Physics Institute and Juelich Center for Hadron Physics
> Experimental Hadron Structure (IKP-1)www.fz-juelich.de/ikp
>
> On 11/07/2017 17:21, Riccardo Ferrari wrote:
>
> Mh, to me feels like there some data mismatch. Are you sure you're using
> the expected Vector (ml vs mllib). I am not sure you attached the whole
> Exception but you might find some more useful details there.
>
> Best,
>
> On Tue, Jul 11, 2017 at 3:07 PM, mckunkel <m.kun...@fz-juelich.de> wrote:
>
>> Im not sure why I cannot subscribe, so that everyone can view the
>> conversation.
>> Help?
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Testing-another-Dataset-after-ML-train
>> ing-tp28845p28846.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>
>
> 
> 
> 
> 
> Forschungszentrum Juelich GmbH
> 52425 Juelich
> Sitz der Gesellschaft: Juelich
> Eingetragen im Handelsregister des Amtsgerichts Dueren Nr. HR B 3498
> Vorsitzender des Aufsichtsrats: MinDir Dr. Karl Eugen Huthmacher
> Geschaeftsfuehrung: Prof. Dr.-Ing. Wolfgang Marquardt (Vorsitzender),
> Karsten Beneke (stellv. Vorsitzender), Prof. Dr.-Ing. Harald Bolt,
> Prof. Dr. Sebastian M. Schmidt
> 
> 
> 
> 
>
>


Re: Testing another Dataset after ML training

2017-07-11 Thread Riccardo Ferrari
Mh, to me it feels like there is some data mismatch. Are you sure you're
using the expected Vector (ml vs mllib)? I am not sure you attached the whole
exception, but you might find some more useful details there.

Best,

On Tue, Jul 11, 2017 at 3:07 PM, mckunkel  wrote:

> Im not sure why I cannot subscribe, so that everyone can view the
> conversation.
> Help?
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Testing-another-Dataset-after-ML-
> training-tp28845p28846.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Testing another Dataset after ML training

2017-07-11 Thread Riccardo Ferrari
Hi,

Are you sure you're feeding the correct data format? I found this
conversation that might be useful:
http://apache-spark-user-list.1001560.n3.nabble.com/Description-of-data-file-sample-libsvm-data-txt-td25832.html

Best,

On Tue, Jul 11, 2017 at 1:42 PM, mckunkel  wrote:

> Greetings,
>
> Following the example on the AS page for Naive Bayes using Dataset
> https://spark.apache.org/docs/latest/ml-classification-
> regression.html#naive-bayes
>  regression.html#naive-bayes>
>
> I want to predict the outcome of another set of data. So instead of
> splitting the data into training and testing, I have 1 set of training and
> one set of testing. i.e.;
> Dataset training = spark.createDataFrame(
> dataTraining,
> schemaForFrame);
> Dataset testing = spark.createDataFrame(dataTesting,
> schemaForFrame);
>
> NaiveBayes nb = new NaiveBayes();
> NaiveBayesModel model = nb.fit(train);
> Dataset predictions = model.transform(testing);
> predictions.show();
>
> But I get the error.
>
> 17/07/11 13:40:38 INFO DAGScheduler: Job 2 finished: collect at
> NaiveBayes.scala:171, took 3.942413 s
> Exception in thread "main" org.apache.spark.SparkException: Failed to
> execute user defined function($anonfun$1: (vector) => vector)
> at
> org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(
> ScalaUDF.scala:1075)
> at
> org.apache.spark.sql.catalyst.expressions.Alias.eval(
> namedExpressions.scala:144)
> at
> org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(
> Projection.scala:48)
> at
> org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(
> Projection.scala:30)
> at
> scala.collection.TraversableLike$$anonfun$map$
> 1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$
> 1.apply(TraversableLike.scala:234)
>
> ...
> ...
> ...
>
>
> How do I perform predictions on other datasets that were not created at a
> split?
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Testing-another-Dataset-after-ML-
> training-tp28845.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


PySpark saving custom pipelines

2017-07-09 Thread Riccardo Ferrari
Hi list,

I have developed some custom Transformers/Estimators in Python around some
libraries (scipy); now I would love to persist them for reuse in a
streaming app.

I am currently aware of this:
https://issues.apache.org/jira/browse/SPARK-17025

However I would like to hear from experienced users how they persist
their pipelines.

Thanks in advance


Re: Problem in avg function Spark 1.6.3 using spark-shell

2017-06-25 Thread Riccardo Ferrari
Hi,

Looks like you performed an aggregation on the ImageWidth column already.
The error itself is quite self-explanatory:

Cannot resolve column name "ImageWidth" among (MainDomainCode,
*avg(length(ImageWidth))*)

The columns available in that DF are MainDomainCode and
avg(length(ImageWidth)), so you should use an alias to rename the column
back:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Column

best,

On Sun, Jun 25, 2017 at 1:19 PM, Eko Susilo 
wrote:

> Hi,
>
> I have a data frame collection called “secondDf” when I tried to perform
> groupBy and then sum of each column it works perfectly. However when I
> tried to calculate average of that column it says the column name is not
> found. The details are as follow
>
> val total = secondDf.filter("ImageWidth > 1 and ImageHeight > 1").
>groupBy("MainDomainCode").
>agg(sum("ImageWidth"),
>sum("ImageHeight"),
>sum("ImageArea”))
>
>
> total.show  will show result as expected, However when I tried to
> calculate avg, the result is script error. Any help to resolve this issue?
>
> Regards,
> Eko
>
>
>   val average = secondDf.filter("ImageWidth > 1 and ImageHeight > 1").
>groupBy("MainDomainCode").
>agg(avg("ImageWidth"),
>avg("ImageHeight"),
>avg("ImageArea"))
>
>
> org.apache.spark.sql.AnalysisException: Cannot resolve column name
> "ImageWidth" among (MainDomainCode, avg(length(ImageWidth)));
> at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.
> apply(DataFrame.scala:152)
> at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.
> apply(DataFrame.scala:152)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:151)
> at org.apache.spark.sql.DataFrame.col(DataFrame.scala:664)
> at org.apache.spark.sql.DataFrame.apply(DataFrame.scala:652)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$
> $iwC.(:42)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.
> (:49)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<
> init>(:51)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:53)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:55)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:57)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:59)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:61)
> at $iwC$$iwC$$iwC$$iwC$$iwC.(:63)
> at $iwC$$iwC$$iwC$$iwC.(:65)
> at $iwC$$iwC$$iwC.(:67)
> at $iwC$$iwC.(:69)
> at $iwC.(:71)
> at (:73)
> at .(:77)
> at .()
> at .(:7)
> at .()
> at $print()
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(
> SparkIMain.scala:1065)
> at org.apache.spark.repl.SparkIMain$Request.loadAndRun(
> SparkIMain.scala:1346)
> at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
> at org.apache.spark.repl.SparkILoop.reallyInterpret$1(
> SparkILoop.scala:857)
> at org.apache.spark.repl.SparkILoop.interpretStartingWith(
> SparkILoop.scala:902)
> at org.apache.spark.repl.SparkILoop.reallyInterpret$1(
> SparkILoop.scala:875)
> at org.apache.spark.repl.SparkILoop.interpretStartingWith(
> SparkILoop.scala:902)
> at org.apache.spark.repl.SparkILoop.reallyInterpret$1(
> SparkILoop.scala:875)
> at org.apache.spark.repl.SparkILoop.interpretStartingWith(
> SparkILoop.scala:902)
> at org.apache.spark.repl.SparkILoop.reallyInterpret$1(
> SparkILoop.scala:875)
> at org.apache.spark.repl.SparkILoop.interpretStartingWith(
> SparkILoop.scala:902)
> at org.apache.spark.repl.SparkILoop.reallyInterpret$1(
> SparkILoop.scala:875)
> at org.apache.spark.repl.SparkILoop.interpretStartingWith(
> SparkILoop.scala:902)
> at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
> at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
> at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
> at org.apache.spark.repl.SparkILoop.org$apache$spark$
> repl$SparkILoop$$loop(SparkILoop.scala:670)
> at org.apache.spark.repl.SparkILoop$$anonfun$org$
> apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
> at org.apache.spark.repl.SparkILoop$$anonfun$org$
> apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
> at org.apache.spark.repl.SparkILoop$$anonfun$org$
> 

Re: Cassandra querying time stamps

2017-06-20 Thread Riccardo Ferrari
Hi,

Personally I would inspect how dates are managed. What does your Spark code
look like? What does the explain say? Does TimeStamp get parsed the same
way?
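
A hedged sketch of what I mean (untested; it assumes the table is loaded
through the spark-cassandra-connector data source): filter on explicit
timestamp casts and look at explain() to see whether the TimeStamp predicate
is pushed down and how the literals are parsed. Timezone differences with
cqlsh are a classic cause of empty ranges.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

spark = SparkSession.builder.appName("cassandra-ts-check").getOrCreate()

df = (spark.read.format("org.apache.spark.sql.cassandra")
      .options(keyspace="analytics_db", table="ml_forecast_tbl")
      .load())

filtered = df.filter(
    (col("MetricID") == 1) &
    (col("TimeStamp") > lit("2016-01-22 00:00:00").cast("timestamp")) &
    (col("TimeStamp") <= lit("2016-01-22 00:30:00").cast("timestamp")))

filtered.explain(True)  # check the pushed filters and any Cast on TimeStamp
filtered.show()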

Best,

On Tue, Jun 20, 2017 at 12:52 PM, sujeet jog  wrote:

> Hello,
>
> I have a table as below
>
> CREATE TABLE analytics_db.ml_forecast_tbl (
>"MetricID" int,
>"TimeStamp" timestamp,
>"ResourceID" timeuuid
>"Value"   double,
> PRIMARY KEY ("MetricID", "TimeStamp", "ResourceID")
> )
>
> select * from ml_forecast_tbl where "MetricID" = 1 and "TimeStamp" >
> '2016-01-22 00:00:00+' and "TimeStamp" <= '2016-01-22 00:30:00+' ;
>
>  MetricID | TimeStamp   | ResourceID
> | Value|
> --+-+---
> ---+--+
> |1 | 2016-01-22 00:30:00.00+ | 
> 4a925190-3b13-11e7-83c6-a32261219d03
> | 32.16177 |
> | 23.74124 | 15.2371
> 1 | 2016-01-22 00:30:00.00+ | 
> 4a92c6c0-3b13-11e7-83c6-a32261219d03
> | 32.16177 |
> | 23.74124 | 15.2371
> 1 | 2016-01-22 00:30:00.00+ | 
> 4a936300-3b13-11e7-83c6-a32261219d03
> | 32.16177 |
> | 23.74124 | 15.2371
> 1 | 2016-01-22 00:30:00.00+ | 
> 4a93d830-3b13-11e7-83c6-a32261219d03
> | 32.16177 |
> | 23.74124 | 15.2371
>
> This query runs perfectly fine from cqlsh,   but not with Spark SQL, it
> just emits empty results,
> Is there a catch to think about on querying timestamp ranges with
> cassandra spark connector
>
> Any inputs on this ?..
>
>
> Thanks,
> Sujeet
>


Re: Create dataset from dataframe with missing columns

2017-06-15 Thread Riccardo Ferrari
Hi Jason,

Is there a reason why you are not adding the desired column before mapping
it to a Dataset[CC]?
You could just do something like:
df = df.withColumn('f2', )
and then:
df.as[CC]

Of course your default value can be null: lit(None).cast(to-some-type)

best,

On Thu, Jun 15, 2017 at 1:26 AM, Tokayer, Jason M. <
jason.toka...@capitalone.com> wrote:

> Is it possible to concisely create a dataset from a dataframe with missing
> columns? Specifically, suppose I create a dataframe with:
>
> val df: DataFrame  = Seq(("v1"),("v2")).toDF("f1")
>
>
>
> Then, I have a case class for a dataset defined as:
>
> case class CC(f1: String, f2: Option[String] = None)
>
>
>
> I’d like to use df.as[CC] to get an instance of the case class, but this
> gives me the following error:
>
> org.apache.spark.sql.AnalysisException: cannot resolve '`f2`' given input
> columns: [f1];
>
>
>
> Is there a concise way to use the default values as defined by the case
> class?
>
>
>
> --
>
> The information contained in this e-mail is confidential and/or
> proprietary to Capital One and/or its affiliates and may only be used
> solely in performance of work or services for Capital One. The information
> transmitted herewith is intended only for use by the individual or entity
> to which it is addressed. If the reader of this message is not the intended
> recipient, you are hereby notified that any review, retransmission,
> dissemination, distribution, copying or other use of, or taking of any
> action in reliance upon this information is strictly prohibited. If you
> have received this communication in error, please contact the sender and
> delete the material from your computer.
>


Re: UDF percentile_approx

2017-06-14 Thread Riccardo Ferrari
Hi Andres,

I can't find the reference; last time I searched for that, I found that
'percentile_approx' is only available via the Hive context. You should
register a temp table and use it from there.

Best,

On Tue, Jun 13, 2017 at 8:52 PM, Andrés Ivaldi  wrote:

> Hello, I`m trying to user percentile_approx  on my SQL query, but It's
> like spark context can´t find the function
>
> I'm using it like this
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.DataFrameStatFunctions
>
> val e = expr("percentile_approx(Cantidadcon0234514)")
> df.agg(e).show()
>
> and exception is
>
> org.apache.spark.sql.AnalysisException: Undefined function:
> 'percentile_approx'. This function is neither a registered temporary
> function nor a permanent function registered
>
> I've also tryid with callUDF
>
> Regards.
>
> --
> Ing. Ivaldi Andres
>


Re: Read Data From NFS

2017-06-13 Thread Riccardo Ferrari
Hi Ayan,
You might be interested in the official Spark docs:
https://spark.apache.org/docs/latest/tuning.html#level-of-parallelism and
its spark.default.parallelism setting
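
A quick way to see and steer what you actually get (a small sketch; the path
is the one from this thread and the partition count is arbitrary):

from pyspark import SparkContext

sc = SparkContext(appName="nfs-read")

# minPartitions is a lower bound handed to Hadoop's FileInputFormat; for a
# file:// (NFS) path the split count also depends on the local fs block size,
# so getNumPartitions() is the quickest way to see what you really got
rdd = sc.textFile("file:///my/file", minPartitions=800)
print(rdd.getNumPartitions())
print(rdd.count())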

Best,

On Mon, Jun 12, 2017 at 6:18 AM, ayan guha  wrote:

> I understand how it works with hdfs. My question is when hdfs is not the
> file sustem, how number of partitions are calculated. Hope that makes it
> clearer.
>
> On Mon, 12 Jun 2017 at 2:42 am, vaquar khan  wrote:
>
>>
>>
>> As per spark doc :
>> The textFile method also takes an optional second argument for
>> controlling the number of partitions of the file.* By default, Spark
>> creates one partition for each block of the file (blocks being 128MB by
>> default in HDFS)*, but you can also ask for a higher number of
>> partitions by passing a larger value. Note that you cannot have fewer
>> partitions than blocks.
>>
>>
>> sc.textFile doesn't commence any reading. It simply defines a
>> driver-resident data structure which can be used for further processing.
>>
>> It is not until an action is called on an RDD that Spark will build up a
>> strategy to perform all the required transforms (including the read) and
>> then return the result.
>>
>> If there is an action called to run the sequence, and your next
>> transformation after the read is to map, then Spark will need to read a
>> small section of lines of the file (according to the partitioning strategy
>> based on the number of cores) and then immediately start to map it until it
>> needs to return a result to the driver, or shuffle before the next sequence
>> of transformations.
>>
>> If your partitioning strategy (defaultMinPartitions) seems to be
>> swamping the workers because the java representation of your partition (an
>> InputSplit in HDFS terms) is bigger than available executor memory, then
>> you need to specify the number of partitions to read as the second
>> parameter to textFile. You can calculate the ideal number of partitions
>> by dividing your file size by your target partition size (allowing for
>> memory growth). A simple check that the file can be read would be:
>>
>> sc.textFile(file, numPartitions).count()
>>
>> You can get good explanation here :
>> https://stackoverflow.com/questions/29011574/how-does-
>> partitioning-work-for-data-from-files-on-hdfs
>>
>>
>>
>> Regards,
>> Vaquar khan
>>
>>
>> On Jun 11, 2017 5:28 AM, "ayan guha"  wrote:
>>
>>> Hi
>>>
>>> My question is what happens if I have 1 file of say 100gb. Then how many
>>> partitions will be there?
>>>
>>> Best
>>> Ayan
>>> On Sun, 11 Jun 2017 at 9:36 am, vaquar khan 
>>> wrote:
>>>
 Hi Ayan,

 If you have multiple files (example 12 files )and you are using
 following code then you will get 12 partition.

 r = sc.textFile("file://my/file/*")

 Not sure what you want to know about file system ,please check API doc.


 Regards,
 Vaquar khan


 On Jun 8, 2017 10:44 AM, "ayan guha"  wrote:

 Any one?

 On Thu, 8 Jun 2017 at 3:26 pm, ayan guha  wrote:

> Hi Guys
>
> Quick one: How spark deals (ie create partitions) with large files
> sitting on NFS, assuming the all executors can see the file exactly same
> way.
>
> ie, when I run
>
> r = sc.textFile("file://my/file")
>
> what happens if the file is on NFS?
>
> is there any difference from
>
> r = sc.textFile("hdfs://my/file")
>
> Are the input formats used same in both cases?
>
>
> --
> Best Regards,
> Ayan Guha
>
 --
 Best Regards,
 Ayan Guha


 --
>>> Best Regards,
>>> Ayan Guha
>>>
>> --
> Best Regards,
> Ayan Guha
>