Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-10 Thread Apostolos N. Papadopoulos

Dear Sam,

you are assuming that the data fits in the memory of your local machine.
You are using a dataframe as your basis, which can potentially be very
large, and then you are storing the data in local lists. Keep in mind
that the number of distinct elements in a column may be very large
(depending on the app). I suggest working toward a solution that assumes
that the number of distinct values is also large. Thus, you should keep
your data in dataframes or RDDs, and store them as csv files, parquet, etc.
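
For illustration, a rough sketch of that approach in PySpark (the
SparkSession name, CSV path, and output paths are placeholders; the same
DataFrame API exists in Java and Scala). Writing each column's distinct
values out as JSON also covers your last question:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.appName("distinct-per-column").getOrCreate()
    df = spark.read.option("header", "true").csv("/pathToCSV")

    for col_name in df.columns:
        # Distinct non-null values of one column, kept as a DataFrame
        # instead of being collected into a local list.
        distinct_df = (df.filter(F.col(col_name).isNotNull())
                         .select(col_name)
                         .distinct())
        # Store the result per column; Parquet or JSON, whatever the consumer needs.
        distinct_df.write.mode("overwrite").parquet("/output/distinct_parquet/" + col_name)
        distinct_df.write.mode("overwrite").json("/output/distinct_json/" + col_name)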


a.p.


On 10/2/23 23:40, sam smith wrote:
I want to get the distinct values of each column in a List (is it good 
practice to use List here?) that contains the column name as its first 
element and its distinct values as the remaining elements, so that for a 
dataset we get a list of lists. I do it this way (in my opinion not so fast):


List<List<String>> finalList = new ArrayList<List<String>>();
Dataset<Row> df = spark.read().format("csv").option("header", "true").load("/pathToCSV");
String[] columnNames = df.columns();
for (int i = 0; i < columnNames.length; i++) {
    List<String> columnList = new ArrayList<String>();
    columnList.add(columnNames[i]);
    List<Row> columnValues =
        df.filter(org.apache.spark.sql.functions.col(columnNames[i]).isNotNull()).select(columnNames[i]).distinct().collectAsList();
    for (int j = 0; j < columnValues.size(); j++)
        columnList.add(columnValues.get(j).apply(0).toString());
    finalList.add(columnList);
}


How to improve this?

Also, can I get the results in JSON format?


--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email:papad...@csd.auth.gr
twitter: @papadopoulos_ap
web:http://datalab.csd.auth.gr/~apostol


Re: How is Spark a memory based solution if it writes data to disk before shuffles?

2022-07-05 Thread Apostolos N. Papadopoulos
First of all, define "far outperforming". For sure, there is no GOD 
system that does everything perfectly.


Which use cases are you referring to? It would be interesting for the 
community to see some comparisons.


a.


On 5/7/22 12:29, Gourav Sengupta wrote:

Hi,

SPARK is just one of the technologies out there now, there are several 
other technologies far outperforming SPARK or at least as good as SPARK.




Regards,
Gourav

On Sat, Jul 2, 2022 at 7:42 PM Sid  wrote:

So as per the discussion, shuffle stages output is also stored on
disk and not in memory?

On Sat, Jul 2, 2022 at 8:44 PM krexos  wrote:


thanks a lot!

--- Original Message ---
On Saturday, July 2nd, 2022 at 6:07 PM, Sean Owen
 wrote:


I think that is more accurate yes. Though, shuffle files are
local, not on distributed storage too, which is an advantage.
MR also had map only transforms and chained mappers, but
harder to use. Not impossible but you could also say Spark
just made it easier to do the more efficient thing.

On Sat, Jul 2, 2022, 9:34 AM krexos
 wrote:


You said Spark performs IO only when reading data and
writing final data to the disk. I thought by that you
meant that it only reads the input files of the job and
writes the output of the whole job to the disk, but in
reality Spark does store intermediate results on disk,
just in fewer places than MR.

--- Original Message ---
On Saturday, July 2nd, 2022 at 5:27 PM, Sid
 wrote:


I have explained the same thing in layman's terms. Go
through it once.

On Sat, 2 Jul 2022, 19:45 krexos,
 wrote:


I think I understand where Spark saves IO.

in MR we have map -> reduce -> map -> reduce -> map
-> reduce ...

which writes results to disk at the end of each such
"arrow",

on the other hand in spark we have

map -> reduce + map -> reduce + map -> reduce ...

which saves about 2 times the IO

thanks everyone,
krexos

--- Original Message ---
On Saturday, July 2nd, 2022 at 1:35 PM, krexos
 wrote:


Hello,

One of the main "selling points" of Spark is that
unlike Hadoop map-reduce that persists intermediate
results of its computation to HDFS (disk), Spark
keeps all its results in memory. I don't understand
this, as in reality when a Spark stage finishes, it
writes all of the data into shuffle files stored on
the disk

<https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/4-shuffleDetails.md>.
How then is this an improvement on map-reduce?

Image from https://youtu.be/7ooZ4S7Ay6Y


thanks!







--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email:papad...@csd.auth.gr
twitter: @papadopoulos_ap
web:http://datalab.csd.auth.gr/~apostol


Re: Spark Doubts

2022-06-21 Thread Apostolos N. Papadopoulos

Dear Sid.

You are asking questions for which answers exist on the Apache Spark 
website, in books, in MOOCs, and at other URLs.


For example, take a look at this one: 
https://sparkbyexamples.com/spark/spark-dataframe-cache-and-persist-explained/

<https://sparkbyexamples.com/spark/spark-dataframe-cache-and-persist-explained/>

https://spark.apache.org/docs/latest/sql-programming-guide.html

What do you mean by question 2?

About question 3, it depends on how you load the file. For example, if 
you have a text file in HDFS and you want to use an RDD, then initially 
the number of partitions equals the number of HDFS blocks, unless you 
specify the number of partitions when you create the RDD from the file.
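
As a small illustration in PySpark (the HDFS URL, file path, and
partition count are placeholders; sc is an existing SparkContext):

    # By default: roughly one partition per HDFS block of the file.
    rdd_default = sc.textFile("hdfs://namenode:8020/data/input.txt")
    print(rdd_default.getNumPartitions())

    # Explicitly ask for at least 16 partitions when creating the RDD.
    rdd_explicit = sc.textFile("hdfs://namenode:8020/data/input.txt", minPartitions=16)
    print(rdd_explicit.getNumPartitions())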

I would suggest first going through a book devoted to Spark, like The 
Definitive Guide, or any other similar resource.


Also, I would suggest taking a MOOC on Spark (e.g., on Coursera, edX, 
etc.).


All the best,

Apostolos


On 21/6/22 22:16, Sid wrote:

Hi Team,

I have a few doubts about the below questions:

1) data frame will reside where? memory? disk? memory allocation about 
data frame?

2) How do you configure each partition?
3) Is there any way to calculate the exact partitions needed to load a 
specific file?


Thanks,
Sid


--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email: papad...@csd.auth.gr
twitter: @papadopoulos_ap
web: http://datalab.csd.auth.gr/~apostol


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Issues getting Apache Spark

2022-05-26 Thread Apostolos N. Papadopoulos
How can we help if we do not know what the problem is? What error are 
you getting, and at which step?


Please give us more info so we can help you. Spark installation on 
Linux/Windows is easy if you follow the guidelines exactly.

Regards,

Apostolos


On 26/5/22 22:19, Martin, Michael wrote:


Hello,

I’m writing to request assistance in getting Apache Spark on my 
laptop. I’ve followed instructions telling me to get Java, Python, 
Hadoop, Winutils, and Spark itself. I’ve followed instructions 
illustrating how to set my environment variables. For some reason, I 
still cannot get Spark to work on my laptop.


Michael Martin


--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email:papad...@csd.auth.gr
twitter: @papadopoulos_ap
web:http://datalab.csd.auth.gr/~apostol


Re: Complexity with the data

2022-05-26 Thread Apostolos N. Papadopoulos
Since you cannot create the DF directly, you may try to first create an 
RDD of tuples from the file and then convert the RDD to a DF using the 
toDF() transformation.

Perhaps you can bypass the issue this way.
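
A rough sketch of that route in PySpark (the file path, the parsing
logic, and the column names are placeholders for whatever matches your
data; an active SparkSession named spark is assumed):

    raw = spark.sparkContext.textFile("/path/to/messy_input.csv")

    def parse_line(line):
        # Put your own splitting/cleaning logic here and always return a
        # tuple with a fixed number of fields.
        parts = line.split(",")
        return (parts[0], parts[1], parts[2])

    df = raw.map(parse_line).toDF(["date", "name", "amount"])
    df.show()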

Another thing that I have seen in the example is that you are using "" 
as an escape character.


Can you check if this may cause any issues?

Regards,

Apostolos



On 26/5/22 16:31, Sid wrote:
Thanks for opening the issue, Bjorn. However, could you help me to 
address the problem for now with some kind of alternative?


I am actually stuck in this since yesterday.

Thanks,
Sid

On Thu, 26 May 2022, 18:48 Bjørn Jørgensen,  
wrote:


Yes, it looks like a bug that we also have in pandas API on spark.

So I have opened a JIRA
<https://issues.apache.org/jira/browse/SPARK-39304> for this.

tor. 26. mai 2022 kl. 11:09 skrev Sid :

Hello Everyone,

I have posted a question finally with the dataset and the
column names.

PFB link:


https://stackoverflow.com/questions/72389385/how-to-load-complex-data-using-pyspark

Thanks,
Sid

On Thu, May 26, 2022 at 2:40 AM Bjørn Jørgensen
 wrote:

Sid, dump one of yours files.


https://sparkbyexamples.com/pyspark/pyspark-read-csv-file-into-dataframe/



ons. 25. mai 2022, 23:04 skrev Sid :

I have 10 columns with me but in the dataset, I
observed that some records have 11 columns of data(for
the additional column it is marked as null). But, how
do I handle this?

Thanks,
Sid

On Thu, May 26, 2022 at 2:22 AM Sid
 wrote:

How can I do that? Any examples or links, please.
So, this works well with pandas I suppose. It's
just that I need to convert back to the spark data
frame by providing a schema but since we are using
a lower spark version and pandas won't work in a
distributed way in the lower versions, therefore,
was wondering if spark could handle this in a much
better way.

Thanks,
Sid

On Thu, May 26, 2022 at 2:19 AM Gavin Ray
 wrote:

Forgot to reply-all last message, whoops. Not
very good at email.

You need to normalize the CSV with a parser
that can escape commas inside of strings
Not sure if Spark has an option for this?


On Wed, May 25, 2022 at 4:37 PM Sid
 wrote:

Thank you so much for your time.

I have data like below, which I tried to
load by setting multiple options while
reading the file; however, I am not
able to consolidate the 9th column data
within itself.

[image attachment showing the sample data]

I tried the below code:

df = spark.read.option("header", "true") \
    .option("multiline", "true") \
    .option("inferSchema", "true") \
    .option("quote", '"') \
    .option("delimiter", ",") \
    .csv("path")

What else I can do?

    Thanks,
    Sid


On Thu, May 26, 2022 at 1:46 AM Apostolos
N. Papadopoulos  wrote:

Dear Sid,

can you please give us more info? Is
it true that every line may have a
different number of columns? Is there
any rule followed by

every line of the file? From the
information you have sent I cannot
fully understand the "schema" of your
data.

Regards,

Apostolos


On 25/5/22 23:06, Sid wrote:
> Hi Experts,
>
> I have below CSV data that is
getting generated automatically. I

Re: Complexity with the data

2022-05-25 Thread Apostolos N. Papadopoulos

Dear Sid,

can you please give us more info? Is it true that every line may have a 
different number of columns? Is there any rule followed by every line of 
the file? From the information you have sent, I cannot fully understand 
the "schema" of your data.


Regards,

Apostolos


On 25/5/22 23:06, Sid wrote:

Hi Experts,

I have below CSV data that is getting generated automatically. I can't 
change the data manually.


The data looks like below:

2020-12-12,abc,2000,,INR,
2020-12-09,cde,3000,he is a manager,DOLLARS,nothing
2020-12-09,fgh,,software_developer,I only manage the development part.

Since I don't have much experience with the other domains.

It is handled by the other people.,INR
2020-12-12,abc,2000,,USD,

The third record is a problem, since the value was split across a new 
line by the user while filling up the form. So, how do I handle this?


There are 6 columns and 4 records in total. These are the sample records.

Should I load it as an RDD and then maybe use a regex to eliminate the 
newlines? Or how should it be done? With ". /n"?


Any suggestions?

Thanks,
Sid


--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email: papad...@csd.auth.gr
twitter: @papadopoulos_ap
web: http://datalab.csd.auth.gr/~apostol


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Running the driver on a laptop but data is on the Spark server

2020-11-25 Thread Apostolos N. Papadopoulos

Hi Ryan,

since the driver is on your laptop, in order to access a remote file you 
need to specify a full URL for it, I guess.

For example, when I am using Spark over HDFS I specify the file like 
hdfs://blablabla, i.e., a URL at which the namenode can answer. I believe 
that something similar must be done here.
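
A minimal sketch of what I mean, in PySpark (the namenode host, port, and
path are placeholders; this only helps if the data actually lives in a
filesystem that is reachable by URL, such as HDFS, and not only on the
server's local disk):

    # Fully qualified URL, so the driver on the laptop resolves the file
    # on the remote filesystem instead of its own local disk.
    df = spark.sql(
        "SELECT * FROM parquet.`hdfs://namenode-host:8020/data/transactions.parquet`"
    )
    df.show()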

all the best,

Apostolos


On 25/11/20 16:51, Ryan Victory wrote:

Hello!

I have been tearing my hair out trying to solve this problem. Here is 
my setup:


1. I have Spark running on a server in standalone mode with data on 
the filesystem of the server itself (/opt/data/).
2. I have an instance of a Hive Metastore server running (backed by 
MariaDB) on the same server

3. I have a laptop where I am developing my spark jobs (Scala)

I have configured Spark to use the metastore and set the warehouse 
directory to be in /opt/data/warehouse/. What I am trying to 
accomplish are a couple of things:


1. I am trying to submit Spark jobs (via JARs) using spark-submit, but 
have the driver run on my local machine (my laptop). I want the jobs 
to use the data ON THE SERVER and not try to reference it from my 
local machine. If I do something like this:


val df = spark.sql("SELECT * FROM 
parquet.`/opt/data/transactions.parquet`")


I get an error that the path doesn't exist (because it's trying to 
find it on my laptop). If I run the same thing in a spark-shell on the 
spark server itself, there isn't an issue because the driver has 
access to the data. If I submit the job with submit-mode=cluster then 
it works too because the driver is on the cluster. I don't want this, 
I want to get the results on my laptop.


How can I force Spark to read the data from the cluster's filesystem 
and not the driver's?


2. I have setup a Hive Metastore and created a table (in the spark 
shell on the spark server itself). The data in the warehouse is in the 
local filesystem. When I create a spark application JAR and try to run 
it from my laptop, I get the same problem as #1, namely that it tries 
to find the warehouse directory on my laptop itself.


Am I crazy? Perhaps this isn't a supported way to use Spark? Any help 
or insights are much appreciated!


-Ryan Victory


--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email: papad...@csd.auth.gr
twitter: @papadopoulos_ap
web: http://datalab.csd.auth.gr/~apostol


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Serialization error when using scala kernel with Jupyter

2020-02-21 Thread Apostolos N. Papadopoulos
collect() returns the contents of the RDD back to the driver as a local 
collection. Where is your local variable?


Try

val result = rdd.map(x => x + 1).collect()

regards,

Apostolos



On 21/2/20 21:28, Nikhil Goyal wrote:

Hi all,
I am trying to use almond scala kernel to run spark session on 
Jupyter. I am using scala version 2.12.8. I am creating spark session 
with master set to Yarn.

This is the code:

val rdd = spark.sparkContext.parallelize(Seq(1, 2, 4))
rdd.map(x => x + 1).collect()

Exception:
java.lang.ClassCastException: cannot assign instance of 
java.lang.invoke.SerializedLambda to field 
org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in 
instance of org.apache.spark.rdd.MapPartitionsRDD


I was wondering if anyone has seen this before.

Thanks
Nikhil


--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email: papad...@csd.auth.gr
twitter: @papadopoulos_ap
web: http://datalab.csd.auth.gr/~apostol



Re: writing a small csv to HDFS is super slow

2019-03-22 Thread Apostolos N. Papadopoulos
Is it also slow when you do not repartition (i.e., when you get multiple 
output files)?


Also, did you try simply saveAsTextFile?

Also, before repartition, how many partitions are there?
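
If it helps, a quick PySpark sketch of both checks (the output path is a
placeholder):

    # How many partitions exist before the repartition(1)?
    print(df.rdd.getNumPartitions())

    # Plain text output via the RDD API, skipping the CSV writer entirely.
    df.rdd.map(lambda row: ",".join(str(x) for x in row)) \
          .saveAsTextFile("hdfs://namenode:8020/tmp/out_text")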

a.


On 22/3/19 23:34, Lian Jiang wrote:

Hi,

Writing a csv to HDFS takes about 1 hour:

df.repartition(1).write.format('com.databricks.spark.csv').mode('overwrite').options(header='true').save(csv)

The generated csv file is only about 150kb. The job uses 3 containers 
(13 cores, 23g mem).


Other people have similar issues but I don't see a good explanation 
and solution.


Any clue is highly appreciated! Thanks.



--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email: papad...@csd.auth.gr
twitter: @papadopoulos_ap
web: http://datalab.csd.auth.gr/~apostol


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Java Heap Space error - Spark ML

2019-03-22 Thread Apostolos N. Papadopoulos
What is the size of your data and of the cluster? Are you using 
spark-submit or an IDE? Which Spark version are you using?


Try spark-submit and increase the memory of the driver or the executors.

a.


On 22/3/19 17:19, KhajaAsmath Mohammed wrote:

Hi,

I am getting the below exception when using Spark Kmeans. Any 
solutions from the experts. Would be really helpful.


val kMeans = new KMeans().setK(reductionCount).setMaxIter(30)

    val kMeansModel = kMeans.fit(df)

Error is occured when calling kmeans.fit


Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
        at 
org.apache.spark.mllib.linalg.SparseVector.toArray(Vectors.scala:760)
        at 
org.apache.spark.mllib.clustering.VectorWithNorm.toDense(KMeans.scala:614)
        at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$initKMeansParallel$3.apply(KMeans.scala:382)
        at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$initKMeansParallel$3.apply(KMeans.scala:382)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at 
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
        at 
org.apache.spark.mllib.clustering.KMeans.initKMeansParallel(KMeans.scala:382)
        at 
org.apache.spark.mllib.clustering.KMeans.runAlgorithm(KMeans.scala:256)

        at org.apache.spark.mllib.clustering.KMeans.run(KMeans.scala:227)
        at org.apache.spark.ml.clustering.KMeans.fit(KMeans.scala:319)
        at 
com.datamantra.spark.DataBalancing$.createBalancedDataframe(DataBalancing.scala:25)
        at 
com.datamantra.spark.jobs.IftaMLTraining$.trainML$1(IftaMLTraining.scala:182)
        at 
com.datamantra.spark.jobs.IftaMLTraining$.main(IftaMLTraining.scala:94)
        at 
com.datamantra.spark.jobs.IftaMLTraining.main(IftaMLTraining.scala)

        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
        at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
        at 
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
        at 
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)

        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Thanks,
Asmath


--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email: papad...@csd.auth.gr
twitter: @papadopoulos_ap
web: http://datalab.csd.auth.gr/~apostol



Re: Apply Kmeans in partitions

2019-01-30 Thread Apostolos N. Papadopoulos

Hi Dimitri,

What error are you getting? Please specify.

Apostolos


On 30/1/19 16:30, dimitris plakas wrote:

Hello everyone,

I have a dataframe which has 5040 rows, where these rows are split 
into 5 groups. So I have a column called "Group_Id" which marks every 
row with values from 0-4 depending on which group every row belongs 
to. I am trying to split my dataframe into 5 partitions and apply 
Kmeans to every partition. I have tried


rdd=mydataframe.rdd.mapPartitions(function, True)
test = Kmeans.train(rdd, num_of_centers, "random")

but i get an error.

How can i apply Kmeans to every partition?

Thank you in advance,


--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email: papad...@csd.auth.gr
twitter: @papadopoulos_ap
web: http://datalab.csd.auth.gr/~apostol


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Specifying different version of pyspark.zip and py4j files on worker nodes with Spark pre-installed

2018-10-04 Thread Apostolos N. Papadopoulos

Maybe this can help.

https://stackoverflow.com/questions/32959723/set-python-path-for-spark-worker



On 04/10/2018 12:19 μμ, Jianshi Huang wrote:

Hi,

I have a problem using multiple versions of Pyspark on YARN, the 
driver and worker nodes are all preinstalled with Spark 2.2.1, for 
production tasks. And I want to use 2.3.2 for my personal EDA.


I've tried both 'pyFiles=' option and sparkContext.addPyFiles(), 
however on the worker node, the PYTHONPATH still uses the system 
SPARK_HOME.


Anyone knows how to override the PYTHONPATH on worker nodes?

Here's the error message,


Py4JJavaError: An error occurred while calling o75.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 0 in stage 0.0 failed 4 times, most recent failure:
Lost task 0.3 in stage 0.0 (TID 3, emr-worker-8.cluster-68492,
executor 2): org.apache.spark.SparkException:
Error from python worker:
Traceback (most recent call last):
File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 183,
in _run_module_as_main
mod_name, mod_spec, code = _get_module_details(mod_name, _Error)
File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 109,
in _get_module_details
__import__(pkg_name)
File
"/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/__init__.py",
line 46, in 
File
"/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/context.py",
line 29, in 
ModuleNotFoundError: No module named 'py4j'
PYTHONPATH was:

/usr/lib/spark-current/python/lib/pyspark.zip:/usr/lib/spark-current/python/lib/py4j-0.10.7-src.zip:/mnt/disk1/yarn/usercache/jianshi.huang/filecache/130/__spark_libs__5227988272944669714.zip/spark-core_2.11-2.3.2.jar


And here's how I started Pyspark session in Jupyter.


%env SPARK_HOME=/opt/apps/ecm/service/spark/2.3.2-bin-hadoop2.7
%env PYSPARK_PYTHON=/usr/bin/python3
import findspark
findspark.init()
import pyspark
sparkConf = pyspark.SparkConf()
sparkConf.setAll([
('spark.cores.max', '96')
,('spark.driver.memory', '2g')
,('spark.executor.cores', '4')
,('spark.executor.instances', '2')
,('spark.executor.memory', '4g')
,('spark.network.timeout', '800')
,('spark.scheduler.mode', 'FAIR')
,('spark.shuffle.service.enabled', 'true')
,('spark.dynamicAllocation.enabled', 'true')
])
py_files =
['hdfs://emr-header-1.cluster-68492:9000/lib/py4j-0.10.7-src.zip']
sc = pyspark.SparkContext(appName="Jianshi", master="yarn-client",
conf=sparkConf, pyFiles=py_files)



Thanks,
--
Jianshi Huang



--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email: papad...@csd.auth.gr
twitter: @papadopoulos_ap
web: http://datalab.csd.auth.gr/~apostol



Re: Local vs Cluster

2018-09-14 Thread Apostolos N. Papadopoulos

Hi Aakash,

in the cluster you need to consider the total number of executors you 
are using. Please take a look at the following link for an introduction.


https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application.html
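
As a rough PySpark sketch of the knobs involved (the numbers are
arbitrary placeholders; see the link above for how to size them properly,
and note that on a standalone cluster you would use spark.cores.max
rather than spark.executor.instances):

    from pyspark import SparkConf
    from pyspark.sql import SparkSession

    # local[N] gives N worker threads; on a cluster the equivalent number of
    # parallel task slots is executors * cores per executor, here 4 * 2 = 8.
    conf = (SparkConf()
            .set("spark.executor.instances", "4")
            .set("spark.executor.cores", "2")
            .set("spark.executor.memory", "4g"))

    spark = SparkSession.builder.config(conf=conf).getOrCreate()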


regards,

Apostolos




On 14/09/2018 11:21 πμ, Aakash Basu wrote:

Hi,

What is the Spark cluster equivalent of standalone's local[N]? I mean, for 
the value N that we set as a parameter of local, which parameter takes 
its place in cluster mode?


Thanks,
Aakash.


--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email: papad...@csd.auth.gr
twitter: @papadopoulos_ap
web: http://datalab.csd.auth.gr/~apostol


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark job's driver programe consums too much memory

2018-09-07 Thread Apostolos N. Papadopoulos
You are mixing everything together, and this does not make sense. Writing data 
to HDFS does not require that all data be transferred back to the 
driver and THEN saved to HDFS.


This would be a disaster and it would never scale. I suggest checking 
the documentation more carefully, because I believe you are a bit confused.


regards,

Apostolos



On 07/09/2018 05:39 μμ, James Starks wrote:

Is df.write.mode(...).parquet("hdfs://..") also an action function? Checking the docs shows that 
my spark job doesn't use those action functions. But the save function looks like the function 
df.write.mode(overwrite).parquet("hdfs://path/to/parquet-file") that my spark job 
uses. Therefore I am thinking that maybe that's the reason why my spark job's driver consumes such an amount 
of memory.

https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#actions

My spark job's driver program consumes too much memory, so I want to prevent 
that by writing data to hdfs on the executor side, instead of waiting for that 
data to be sent back to the driver program (and then writing to hdfs). This is 
because our worker servers have a bigger memory size than the one that runs the 
driver program. If I can write data to hdfs at the executor, then the driver memory 
for my spark job can be reduced.

Otherwise does Spark support streaming read from database (i.e. spark streaming 
+ spark sql)?

Thanks for your reply.



‐‐‐ Original Message ‐‐‐
On 7 September 2018 4:15 PM, Apostolos N. Papadopoulos  
wrote:


Dear James,

-   check the Spark documentation to see the actions that return a lot of
 data back to the driver. One of these actions is collect(). However,
 take(x) is an action, also reduce() is an action.

 Before executing collect() find out what is the size of your RDD/DF.

-   I cannot understand the phrase "hdfs directly from the executor". You
 can specify an hdfs file as your input and also you can use hdfs to
 store your output.

 regards,

 Apostolos

 On 07/09/2018 05:04 μμ, James Starks wrote:



I have a Spark job that read data from database. By increasing submit
parameter '--driver-memory 25g' the job can works without a problem
locally but not in prod env because prod master do not have enough
capacity.
So I have a few questions:
-  What functions such as collecct() would cause the data to be sent
back to the driver program?
   My job so far merely uses `as`, `filter`, `map`, and `filter`.

-   Is it possible to write data (in parquet format for instance) to
 hdfs directly from the executor? If so how can I do (any code snippet,
 doc for reference, or what keyword to search cause can't find by e.g.
 `spark direct executor hdfs write`)?


Thanks

--

Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email: papad...@csd.auth.gr
twitter: @papadopoulos_ap
web: http://datalab.csd.auth.gr/~apostol


---

To unsubscribe e-mail: user-unsubscr...@spark.apache.org





--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email: papad...@csd.auth.gr
twitter: @papadopoulos_ap
web: http://datalab.csd.auth.gr/~apostol


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark job's driver programe consums too much memory

2018-09-07 Thread Apostolos N. Papadopoulos

Dear James,

- Check the Spark documentation to see which actions return a lot of 
data back to the driver. One of these actions is collect(); take(x) and 
reduce() are actions as well.


Before executing collect(), find out the size of your RDD/DF.

- I cannot understand the phrase "hdfs directly from the executor". You 
can specify an HDFS file as your input, and you can also use HDFS to 
store your output.
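
A small sketch of both points in PySpark (the paths are placeholders; the
Scala calls are the same):

    # Check how big the result is before pulling anything to the driver.
    print("rows:", df.count())

    # The write happens on the executors; the data is not funneled
    # through the driver first.
    df.write.mode("overwrite").parquet("hdfs://namenode:8020/output/result.parquet")

    # Only collect() once you know the result fits in driver memory.
    sample = df.limit(100).collect()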



regards,

Apostolos



On 07/09/2018 05:04 μμ, James Starks wrote:
I have a Spark job that reads data from a database. By increasing the submit 
parameter '--driver-memory 25g' the job can work without a problem 
locally, but not in the prod env because the prod master does not have enough 
capacity.


So I have a few questions:

-  What functions such as collect() would cause the data to be sent 
back to the driver program?

  My job so far merely uses `as`, `filter`, `map`, and `filter`.

- Is it possible to write data (in parquet format for instance) to 
hdfs directly from the executor? If so how can I do (any code snippet, 
doc for reference, or what keyword to search cause can't find by e.g. 
`spark direct executor hdfs write`)?


Thanks






--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email: papad...@csd.auth.gr
twitter: @papadopoulos_ap
web: http://datalab.csd.auth.gr/~apostol


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Error in show()

2018-09-06 Thread Apostolos N. Papadopoulos
Can you isolate the row that is causing the problem? I mean, start with 
show(31) and work up to show(60).


Perhaps this will help you to understand the problem.

regards,

Apostolos



On 07/09/2018 01:11 πμ, dimitris plakas wrote:
Hello everyone, I am new to Pyspark and I am facing an issue. Let me 
explain what exactly the problem is.


I have a dataframe and i apply on this a map() function 
(dataframe2=datframe1.rdd.map(custom_function())

dataframe = sqlContext.createDataframe(dataframe2)

When I have dataframe.show(30, True) it shows the result; when I am using 
dataframe.show(60, True) I get the error. The error is in the attachment 
Pyspark_Error.txt.


Could you please explain me what is this error and how to overpass it?



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email: papad...@csd.auth.gr
twitter: @papadopoulos_ap
web: http://datalab.csd.auth.gr/~apostol



Re: Parallelism: behavioural difference in version 1.2 and 2.1!?

2018-08-29 Thread Apostolos N. Papadopoulos

Dear Jeevan,

Spark 1.2 is quite old, and if I were you I would go for a newer version.

However, is there a parallelism level (e.g., 20, 30) that works for both 
installations?


regards,

Apostolos



On 29/08/2018 04:55 μμ, jeevan.ks wrote:

Hi,

I've two systems. One is built on Spark 1.2 and the other on 2.1. I am
benchmarking both with the same benchmarks (wordcount, grep, sort, etc.)
with the same data set from S3 bucket (size ranges from 50MB to 10 GB). The
Spark cluster I made use of is r3.xlarge, 8 instances, 4 cores each, and
28GB RAM. I observed a strange behaviour while running the benchmarks and is
as follows:

- When I ran Spark 1.2 version with default partition number
(sc.defaultParallelism), the jobs would take forever to complete. So I
changed it to the number of cores, i.e., 32 times 3 = 96. This did a magic
and the jobs completed quickly.

- However, when I tried the above magic number on the version 2.1, the jobs
are taking forever. Deafult parallelism works better, but not that
efficient.

I'm having problem to rationalise this and compare both the systems. My
question is: what changes were made from 1.2 to 2.1 with respect to default
parallelism for this behaviour to occur? How can I have both versions behave
similary on the same software/hardware configuration so that I can compare?

I'd really appreciate your help on this!

Cheers,
Jeevan



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email: papad...@csd.auth.gr
twitter: @papadopoulos_ap
web: http://delab.csd.auth.gr/~apostol


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Create an Empty dataframe

2018-06-30 Thread Apostolos N. Papadopoulos

Hi Dimitri,

you can do the following:

1. create an initial dataframe from an empty csv

2. use "union" to insert new rows

Do not forget that Spark cannot replace a DBMS. Spark is mainly used 
for analytics.


If you need select/insert/delete/update capabilities, perhaps you should 
look at a DBMS.



Another alternative, in case you need "append only" semantics, is to use 
streaming or structured streaming.



regards,

Apostolos




On 30/06/2018 05:46 μμ, dimitris plakas wrote:
I am new to Pyspark and want to initialize a new empty dataframe with 
sqlContext() with two columns ("Column1", "Column2"), and I want to 
append rows dynamically in a for loop.

Is there any way to achieve this?

Thank you in advance.


--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email: papad...@csd.auth.gr
twitter: @papadopoulos_ap
web: http://delab.csd.auth.gr/~apostol


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Increase no of tasks

2018-06-22 Thread Apostolos N. Papadopoulos

How many partitions do you have in your data?
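
For reference, a quick PySpark sketch for checking and raising the
partition count (the DataFrame name and the target number are
placeholders); the number of tasks in a stage follows the number of
partitions:

    # One task per partition in the stage.
    print(df.rdd.getNumPartitions())

    # Repartition to roughly executors * cores so every core gets work.
    df = df.repartition(100)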



On 22/06/2018 09:46 μμ, pratik4891 wrote:

Hi Gurus,

I am running a spark job, and in one stage it creates 9 tasks. So even if
I have 25 executors, only 9 are getting utilized.

The other executors go to dead status. How can I increase the number of
tasks so all my executors can be utilized? Any help/guidance is appreciated
:)
<http://apache-spark-user-list.1001560.n3.nabble.com/file/t8535/s1.jpg>
<http://apache-spark-user-list.1001560.n3.nabble.com/file/t8535/s2.jpg>



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email: papad...@csd.auth.gr
twitter: @papadopoulos_ap
web: http://delab.csd.auth.gr/~apostol


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org