Re: How to improve efficiency of this piece of code (returning distinct column values)
Dear Sam,

You are assuming that the data fits in the memory of your local machine. You are using a dataframe as a basis, which can potentially be very large, and then you are storing the data in local lists. Keep in mind that the number of distinct elements in a column may be very large (depending on the application). I suggest working on a solution that assumes that the number of distinct values is also large. Thus, you should keep your data in dataframes or RDDs, and store them as CSV files, Parquet, etc.

a.p.

On 10/2/23 23:40, sam smith wrote:
I want to get the distinct values of each column in a List (is it good practice to use List here?) that contains as its first element the column name and as the other elements its distinct values, so that for a dataset we get a list of lists. I do it this way (in my opinion not so fast):

    List<List<String>> finalList = new ArrayList<List<String>>();
    Dataset<Row> df = spark.read().format("csv").option("header", "true").load("/pathToCSV");
    String[] columnNames = df.columns();
    for (int i = 0; i < columnNames.length; i++) {
        List<String> columnList = new ArrayList<String>();
        columnList.add(columnNames[i]);
        List<Row> columnValues = df.filter(org.apache.spark.sql.functions.col(columnNames[i]).isNotNull())
                .select(columnNames[i]).distinct().collectAsList();
        for (int j = 0; j < columnValues.size(); j++)
            columnList.add(columnValues.get(j).apply(0).toString());
        finalList.add(columnList);
    }

How to improve this? Also, can I get the results in JSON format?

--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics, Aristotle University of Thessaloniki, Thessaloniki, GREECE
tel: ++0030312310991918, email: papad...@csd.auth.gr, twitter: @papadopoulos_ap, web: http://datalab.csd.auth.gr/~apostol
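For illustration, a minimal sketch of the distributed approach suggested above, written in PySpark rather than the Java API of the question (the same idea carries over); the input and output paths are placeholders, and writing one JSON directory per column is just one possible layout:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.appName("distinct-per-column").getOrCreate()
    df = spark.read.option("header", "true").csv("/pathToCSV")

    # Keep everything distributed: one DataFrame of distinct values per column,
    # written straight to storage instead of being collected into driver-side lists.
    for c in df.columns:
        (df.select(c)
           .where(F.col(c).isNotNull())
           .distinct()
           .write.mode("overwrite")
           .json("/output/distinct/{}".format(c)))   # JSON output, one directory per column

Nothing here ever calls collect(), so the size of the distinct sets is bounded only by cluster storage, not by driver memory.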
Re: How is Spark a memory based solution if it writes data to disk before shuffles?
First of all, define "far outperforming". For sure, there is no GOD system that does everything perfectly. Which use cases are you referring to? It would be interesting for the community to see some comparisons.

a.

On 5/7/22 12:29, Gourav Sengupta wrote:
Hi, SPARK is just one of the technologies out there now; there are several other technologies far outperforming SPARK, or at least as good as SPARK. Regards, Gourav

On Sat, Jul 2, 2022 at 7:42 PM Sid wrote:
So as per the discussion, shuffle stage output is also stored on disk and not in memory?

On Sat, Jul 2, 2022 at 8:44 PM krexos wrote:
thanks a lot!

--- Original Message ---
On Saturday, July 2nd, 2022 at 6:07 PM, Sean Owen wrote:
I think that is more accurate, yes. Though shuffle files are local, not on distributed storage, which is an advantage. MR also had map-only transforms and chained mappers, but they were harder to use. Not impossible, but you could also say Spark just made it easier to do the more efficient thing.

On Sat, Jul 2, 2022, 9:34 AM krexos wrote:
You said Spark performs IO only when reading data and writing final data to the disk. I thought by that you meant that it only reads the input files of the job and writes the output of the whole job to disk, but in reality Spark does store intermediate results on disk, just in fewer places than MR.

--- Original Message ---
On Saturday, July 2nd, 2022 at 5:27 PM, Sid wrote:
I have explained the same thing in very layman's terms. Go through it once.

On Sat, 2 Jul 2022, 19:45 krexos wrote:
I think I understand where Spark saves IO. In MR we have map -> reduce -> map -> reduce -> map -> reduce ... which writes results to disk at the end of each such "arrow"; on the other hand, in Spark we have map -> reduce + map -> reduce + map -> reduce ... which saves about 2 times the IO. thanks everyone, krexos

--- Original Message ---
On Saturday, July 2nd, 2022 at 1:35 PM, krexos wrote:
Hello, one of the main "selling points" of Spark is that, unlike Hadoop map-reduce which persists intermediate results of its computation to HDFS (disk), Spark keeps all its results in memory. I don't understand this, as in reality when a Spark stage finishes it writes all of the data into shuffle files stored on the disk <https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/4-shuffleDetails.md>. How then is this an improvement on map-reduce? Image from https://youtu.be/7ooZ4S7Ay6Y thanks!

--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics, Aristotle University of Thessaloniki, Thessaloniki, GREECE
tel: ++0030312310991918, email: papad...@csd.auth.gr, twitter: @papadopoulos_ap, web: http://datalab.csd.auth.gr/~apostol
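One concrete way to see the "in memory" part of the argument is caching: an intermediate result that several actions reuse stays in executor memory, whereas in classic MapReduce each step would be a separate job re-reading from HDFS. A small PySpark sketch, with a made-up input path and made-up column names purely for illustration:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("cache-demo").getOrCreate()
    df = spark.read.parquet("/data/events.parquet")   # placeholder input

    # Keep the filtered intermediate result in executor memory.
    filtered = df.where("status = 'OK'").cache()

    # The first action materialises the cache; the second reuses it instead of
    # re-reading and re-filtering the input. Shuffle files for the groupBy still
    # go to local disk, as discussed in the thread.
    filtered.groupBy("day").count().show()
    filtered.groupBy("user").count().show()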
Re: Spark Doubts
Dear Sid,

You are asking questions for which answers exist on the Apache Spark website, in books, in MOOCs, or at other URLs. For example, take a look at these:

https://sparkbyexamples.com/spark/spark-dataframe-cache-and-persist-explained/
https://spark.apache.org/docs/latest/sql-programming-guide.html

What do you mean by question 2? About question 3, it depends on how you load the file. For example, if you have a text file in HDFS and you want to use an RDD, initially the number of partitions equals the number of HDFS blocks, unless you specify the number of partitions when you create the RDD from the file.

I would suggest first going through a book devoted to Spark, like Spark: The Definitive Guide, or any other similar resource. I would also suggest taking a MOOC on Spark (e.g., on Coursera, edX, etc.).

All the best,
Apostolos

On 21/6/22 22:16, Sid wrote:
Hi Team, I have a few doubts about the below questions:
1) Where will a data frame reside? Memory? Disk? How is memory allocated for a data frame?
2) How do you configure each partition?
3) Is there any way to calculate the exact partitions needed to load a specific file?
Thanks, Sid

--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics, Aristotle University of Thessaloniki, Thessaloniki, GREECE
tel: ++0030312310991918, email: papad...@csd.auth.gr, twitter: @papadopoulos_ap, web: http://datalab.csd.auth.gr/~apostol
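A short PySpark sketch of the partitioning point made above (paths and the number 64 are placeholders, not recommendations):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    # RDD from a text file: the second argument is a *minimum* number of
    # partitions; without it, one partition per HDFS block is the usual default.
    rdd = sc.textFile("hdfs:///data/big.txt", minPartitions=64)
    print(rdd.getNumPartitions())

    # DataFrames: inspect the partitioning and change it explicitly if needed.
    df = spark.read.csv("hdfs:///data/big.csv", header=True)
    print(df.rdd.getNumPartitions())
    df = df.repartition(64)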
Re: Issues getting Apache Spark
How can we help if we do not know what the problem is? What is the error you are getting, and at which step? Please give us more info so that we are able to help you. Spark installation on Linux/Windows is easy if you follow the guidelines exactly.

Regards,
Apostolos

On 26/5/22 22:19, Martin, Michael wrote:
Hello, I'm writing to request assistance in getting Apache Spark running on my laptop. I've followed instructions telling me to get Java, Python, Hadoop, Winutils, and Spark itself. I've followed instructions illustrating how to set my environment variables. For some reason, I still cannot get Spark to work on my laptop. Michael Martin

--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics, Aristotle University of Thessaloniki, Thessaloniki, GREECE
tel: ++0030312310991918, email: papad...@csd.auth.gr, twitter: @papadopoulos_ap, web: http://datalab.csd.auth.gr/~apostol
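As a starting point for narrowing down where it fails, here is a minimal verification sketch. It assumes a Windows laptop, that Spark was unpacked to C:\spark and winutils.exe sits under C:\hadoop\bin, and that the findspark package is installed — all of these are assumptions for illustration, not the asker's actual setup:

    import os
    os.environ.setdefault("SPARK_HOME", r"C:\spark")     # hypothetical install path
    os.environ.setdefault("HADOOP_HOME", r"C:\hadoop")   # folder containing bin\winutils.exe

    import findspark
    findspark.init()                # adds pyspark to sys.path using SPARK_HOME

    from pyspark.sql import SparkSession
    spark = SparkSession.builder.master("local[*]").getOrCreate()
    print(spark.version)            # if this prints a version, the installation works
    spark.stop()

Whichever line raises an error points at the step (Java, environment variables, winutils, or PySpark itself) that needs fixing.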
Re: Complexity with the data
Since you cannot create the DF directly, you may try to first create an RDD of tuples from the file and then convert the RDD to a DF by using the toDF() transformation. Perhaps you can bypass the issue with this. Another thing that I have seen in the example is that you are using "" as an escape character. Can you check whether this may cause any issues?

Regards,
Apostolos

On 26/5/22 16:31, Sid wrote:
Thanks for opening the issue, Bjorn. However, could you help me address the problem for now with some kind of alternative? I have actually been stuck on this since yesterday. Thanks, Sid

On Thu, 26 May 2022, 18:48 Bjørn Jørgensen wrote:
Yes, it looks like a bug that we also have in the pandas API on Spark. So I have opened a JIRA <https://issues.apache.org/jira/browse/SPARK-39304> for this.

On Thu, 26 May 2022 at 11:09, Sid wrote:
Hello Everyone, I have finally posted a question with the dataset and the column names. PFB link: https://stackoverflow.com/questions/72389385/how-to-load-complex-data-using-pyspark Thanks, Sid

On Thu, May 26, 2022 at 2:40 AM Bjørn Jørgensen wrote:
Sid, dump one of your files. https://sparkbyexamples.com/pyspark/pyspark-read-csv-file-into-dataframe/

On Wed, 25 May 2022 at 23:04, Sid wrote:
I have 10 columns with me, but in the dataset I observed that some records have 11 columns of data (for the additional column it is marked as null). But how do I handle this? Thanks, Sid

On Thu, May 26, 2022 at 2:22 AM Sid wrote:
How can I do that? Any examples or links, please. So, this works well with pandas I suppose. It's just that I need to convert back to the Spark data frame by providing a schema, but since we are using a lower Spark version and pandas won't work in a distributed way in the lower versions, I was wondering if Spark could handle this in a much better way. Thanks, Sid

On Thu, May 26, 2022 at 2:19 AM Gavin Ray wrote:
Forgot to reply-all last message, whoops. Not very good at email. You need to normalize the CSV with a parser that can escape commas inside of strings. Not sure if Spark has an option for this?

On Wed, May 25, 2022 at 4:37 PM Sid wrote:
Thank you so much for your time. I have data like below, which I tried to load by setting multiple options while reading the file, but I am not able to consolidate the 9th column data within itself. [inline image: image.png] I tried the below code:

    df = spark.read.option("header", "true").option("multiline", "true") \
        .option("inferSchema", "true").option("quote", '"') \
        .option("delimiter", ",").csv("path")

What else can I do? Thanks, Sid

On Thu, May 26, 2022 at 1:46 AM Apostolos N. Papadopoulos wrote:
Dear Sid, can you please give us more info? Is it true that every line may have a different number of columns? Is there any rule followed by every line of the file? From the information you have sent I cannot fully understand the "schema" of your data. Regards, Apostolos

On 25/5/22 23:06, Sid wrote:
> Hi Experts,
> I have below CSV data that is getting generated automatically. I
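A rough PySpark sketch of the RDD-of-tuples-then-toDF() workaround suggested above. The column count, the padding/merging rules, and the file path are assumptions for illustration; the real fix-up logic has to follow whatever rule the malformed rows actually obey:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    NUM_COLS = 10   # expected number of columns (10 in this thread's example)

    def to_fixed_width(line):
        parts = line.split(",")
        if len(parts) < NUM_COLS:
            # pad short rows with nulls
            parts += [None] * (NUM_COLS - len(parts))
        elif len(parts) > NUM_COLS:
            # merge any spill-over back into the last column
            parts = parts[:NUM_COLS - 1] + [",".join(parts[NUM_COLS - 1:])]
        return tuple(parts)

    rdd = sc.textFile("/path/to/file.csv").map(to_fixed_width)
    df = rdd.toDF(["col{}".format(i) for i in range(1, NUM_COLS + 1)])
    df.show()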
Re: Complexity with the data
Dear Sid,

Can you please give us more info? Is it true that every line may have a different number of columns? Is there any rule followed by every line of the file? From the information you have sent I cannot fully understand the "schema" of your data.

Regards,
Apostolos

On 25/5/22 23:06, Sid wrote:
Hi Experts, I have the below CSV data that is getting generated automatically. I can't change the data manually. The data looks like below:

    2020-12-12,abc,2000,,INR,
    2020-12-09,cde,3000,he is a manager,DOLLARS,nothing
    2020-12-09,fgh,,software_developer,I only manage the development part. Since I don't have much experience with the other domains. It is handled by the other people.,INR
    2020-12-12,abc,2000,,USD,

The third record is the problem, since its value was split across new lines by the user while filling up the form. So, how do I handle this? There are 6 columns and 4 records in total. These are sample records. Should I load it as an RDD and then maybe use a regex to eliminate the new lines? Or how should it be done? With ". /n"? Any suggestions? Thanks, Sid

--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics, Aristotle University of Thessaloniki, Thessaloniki, GREECE
tel: ++0030312310991918, email: papad...@csd.auth.gr, twitter: @papadopoulos_ap, web: http://datalab.csd.auth.gr/~apostol
Re: Running the driver on a laptop but data is on the Spark server
Hi Ryan,

Since the driver is on your laptop, in order to access a remote file you need to specify the URL for it, I guess. For example, when I am using Spark over HDFS I specify the file like hdfs://blablabla, which contains the URL where the namenode can answer. I believe that something similar must be done here.

all the best,
Apostolos

On 25/11/20 16:51, Ryan Victory wrote:
Hello! I have been tearing my hair out trying to solve this problem. Here is my setup:

1. I have Spark running on a server in standalone mode with data on the filesystem of the server itself (/opt/data/).
2. I have an instance of a Hive Metastore server running (backed by MariaDB) on the same server.
3. I have a laptop where I am developing my Spark jobs (Scala).

I have configured Spark to use the metastore and set the warehouse directory to be in /opt/data/warehouse/. What I am trying to accomplish are a couple of things:

1. I am trying to submit Spark jobs (via JARs) using spark-submit, but have the driver run on my local machine (my laptop). I want the jobs to use the data ON THE SERVER and not try to reference it from my local machine. If I do something like this:

    val df = spark.sql("SELECT * FROM parquet.`/opt/data/transactions.parquet`")

I get an error that the path doesn't exist (because it's trying to find it on my laptop). If I run the same thing in a spark-shell on the Spark server itself, there isn't an issue because the driver has access to the data. If I submit the job with submit-mode=cluster then it works too because the driver is on the cluster. I don't want this, I want to get the results on my laptop. How can I force Spark to read the data from the cluster's filesystem and not the driver's?

2. I have set up a Hive Metastore and created a table (in the spark shell on the Spark server itself). The data in the warehouse is in the local filesystem. When I create a Spark application JAR and try to run it from my laptop, I get the same problem as #1, namely that it tries to find the warehouse directory on my laptop itself.

Am I crazy? Perhaps this isn't a supported way to use Spark? Any help or insights are much appreciated!

-Ryan Victory

--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics, Aristotle University of Thessaloniki, Thessaloniki, GREECE
tel: ++0030312310991918, email: papad...@csd.auth.gr, twitter: @papadopoulos_ap, web: http://datalab.csd.auth.gr/~apostol
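To make the hdfs:// analogy concrete, here is a sketch in PySpark (the Scala call is analogous). It assumes the data were actually served by HDFS on the server — a plain local path like /opt/data is not reachable from a remote driver without some distributed or shared filesystem — and the host names and ports are placeholders:

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .master("spark://spark-server:7077")   # placeholder standalone master URL
             .appName("remote-read")
             .getOrCreate())

    # Fully-qualified URI: the driver on the laptop asks the remote namenode,
    # not its own local filesystem.
    df = spark.sql("SELECT * FROM parquet.`hdfs://spark-server:8020/opt/data/transactions.parquet`")
    df.show()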
Re: Serialization error when using scala kernel with Jupyter
collect() returns the contents of the RDD back to the driver in a local variable. Where is the local variable? Try:

    val result = rdd.map(x => x + 1).collect()

regards,
Apostolos

On 21/2/20 21:28, Nikhil Goyal wrote:
Hi all, I am trying to use the almond Scala kernel to run a Spark session on Jupyter. I am using Scala version 2.12.8. I am creating the Spark session with master set to Yarn. This is the code:

    val rdd = spark.sparkContext.parallelize(Seq(1, 2, 4))
    rdd.map(x => x + 1).collect()

Exception:

    java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD

I was wondering if anyone has seen this before. Thanks, Nikhil

--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics, Aristotle University of Thessaloniki, Thessaloniki, GREECE
tel: ++0030312310991918, email: papad...@csd.auth.gr, twitter: @papadopoulos_ap, web: http://datalab.csd.auth.gr/~apostol
Re: writing a small csv to HDFS is super slow
Is it also slow when you do not repartition (i.e., when you let it produce multiple output files)? Also, did you try simply saveAsTextFile? And before the repartition, how many partitions are there?

a.

On 22/3/19 23:34, Lian Jiang wrote:
Hi, writing a CSV to HDFS takes about 1 hour:

    df.repartition(1).write.format('com.databricks.spark.csv').mode('overwrite').options(header='true').save(csv)

The generated CSV file is only about 150 KB. The job uses 3 containers (13 cores, 23 GB mem). Other people have similar issues but I don't see a good explanation and solution. Any clue is highly appreciated! Thanks.

--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics, Aristotle University of Thessaloniki, Thessaloniki, GREECE
tel: ++0030312310991918, email: papad...@csd.auth.gr, twitter: @papadopoulos_ap, web: http://datalab.csd.auth.gr/~apostol
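A small PySpark sketch of the diagnostics suggested above (input and output paths are placeholders):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet("/data/input.parquet")   # placeholder for the real input

    print(df.rdd.getNumPartitions())                 # how many partitions before the write?

    # One part-file per partition, no forced shuffle at all:
    df.write.mode("overwrite").option("header", "true").csv("/tmp/out_many_files")

    # If a single file is really required, coalesce(1) avoids the full shuffle
    # that repartition(1) performs; it funnels the existing partitions into one task.
    df.coalesce(1).write.mode("overwrite").option("header", "true").csv("/tmp/out_single_file")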
Re: Java Heap Space error - Spark ML
What is the size of your data? What is the size of the cluster? Are you using spark-submit or an IDE? Which Spark version are you using? Try spark-submit and increase the memory of the driver or the executors.

a.

On 22/3/19 17:19, KhajaAsmath Mohammed wrote:
Hi, I am getting the below exception when using Spark KMeans. Any solutions from the experts would be really helpful.

    val kMeans = new KMeans().setK(reductionCount).setMaxIter(30)
    val kMeansModel = kMeans.fit(df)

The error occurs when calling kMeans.fit:

    Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
        at org.apache.spark.mllib.linalg.SparseVector.toArray(Vectors.scala:760)
        at org.apache.spark.mllib.clustering.VectorWithNorm.toDense(KMeans.scala:614)
        at org.apache.spark.mllib.clustering.KMeans$$anonfun$initKMeansParallel$3.apply(KMeans.scala:382)
        at org.apache.spark.mllib.clustering.KMeans$$anonfun$initKMeansParallel$3.apply(KMeans.scala:382)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
        at org.apache.spark.mllib.clustering.KMeans.initKMeansParallel(KMeans.scala:382)
        at org.apache.spark.mllib.clustering.KMeans.runAlgorithm(KMeans.scala:256)
        at org.apache.spark.mllib.clustering.KMeans.run(KMeans.scala:227)
        at org.apache.spark.ml.clustering.KMeans.fit(KMeans.scala:319)
        at com.datamantra.spark.DataBalancing$.createBalancedDataframe(DataBalancing.scala:25)
        at com.datamantra.spark.jobs.IftaMLTraining$.trainML$1(IftaMLTraining.scala:182)
        at com.datamantra.spark.jobs.IftaMLTraining$.main(IftaMLTraining.scala:94)
        at com.datamantra.spark.jobs.IftaMLTraining.main(IftaMLTraining.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Thanks,
Asmath

--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics, Aristotle University of Thessaloniki, Thessaloniki, GREECE
tel: ++0030312310991918, email: papad...@csd.auth.gr, twitter: @papadopoulos_ap, web: http://datalab.csd.auth.gr/~apostol
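For reference, a sketch of the spark-submit invocation being suggested. The class name is the one visible in the stack trace; the memory values and the jar path are placeholders to be adjusted to the actual data and cluster:

    spark-submit \
      --class com.datamantra.spark.jobs.IftaMLTraining \
      --driver-memory 8g \
      --executor-memory 8g \
      /path/to/job.jar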
Re: Apply Kmeans in partitions
Hi Dimitri,

What is the error you are getting? Please specify.

Apostolos

On 30/1/19 16:30, dimitris plakas wrote:
Hello everyone, I have a dataframe which has 5040 rows, and these rows are split into 5 groups. So I have a column called "Group_Id" which marks every row with a value from 0-4, depending on which group the row belongs to. I am trying to split my dataframe into 5 partitions and apply KMeans to every partition. I have tried:

    rdd = mydataframe.rdd.mapPartitions(function, True)
    test = Kmeans.train(rdd, num_of_centers, "random")

but I get an error. How can I apply KMeans to every partition? Thank you in advance,

--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics, Aristotle University of Thessaloniki, Thessaloniki, GREECE
tel: ++0030312310991918, email: papad...@csd.auth.gr, twitter: @papadopoulos_ap, web: http://datalab.csd.auth.gr/~apostol
Re: Specifying different version of pyspark.zip and py4j files on worker nodes with Spark pre-installed
Maybe this can help:

https://stackoverflow.com/questions/32959723/set-python-path-for-spark-worker

On 04/10/2018 12:19 PM, Jianshi Huang wrote:
Hi, I have a problem using multiple versions of Pyspark on YARN. The driver and worker nodes are all preinstalled with Spark 2.2.1 for production tasks, and I want to use 2.3.2 for my personal EDA. I've tried both the 'pyFiles=' option and sparkContext.addPyFiles(); however, on the worker node the PYTHONPATH still uses the system SPARK_HOME. Anyone knows how to override the PYTHONPATH on worker nodes?

Here's the error message:

    Py4JJavaError: An error occurred while calling o75.collectToPython.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, emr-worker-8.cluster-68492, executor 2): org.apache.spark.SparkException:
    Error from python worker:
      Traceback (most recent call last):
        File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 183, in _run_module_as_main
          mod_name, mod_spec, code = _get_module_details(mod_name, _Error)
        File "/usr/local/Python-3.6.4/lib/python3.6/runpy.py", line 109, in _get_module_details
          __import__(pkg_name)
        File "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/__init__.py", line 46, in
        File "/usr/lib/spark-current/python/lib/pyspark.zip/pyspark/context.py", line 29, in
      ModuleNotFoundError: No module named 'py4j'
    PYTHONPATH was:
      /usr/lib/spark-current/python/lib/pyspark.zip:/usr/lib/spark-current/python/lib/py4j-0.10.7-src.zip:/mnt/disk1/yarn/usercache/jianshi.huang/filecache/130/__spark_libs__5227988272944669714.zip/spark-core_2.11-2.3.2.jar

And here's how I started the Pyspark session in Jupyter:

    %env SPARK_HOME=/opt/apps/ecm/service/spark/2.3.2-bin-hadoop2.7
    %env PYSPARK_PYTHON=/usr/bin/python3
    import findspark
    findspark.init()
    import pyspark
    sparkConf = pyspark.SparkConf()
    sparkConf.setAll([
        ('spark.cores.max', '96'),
        ('spark.driver.memory', '2g'),
        ('spark.executor.cores', '4'),
        ('spark.executor.instances', '2'),
        ('spark.executor.memory', '4g'),
        ('spark.network.timeout', '800'),
        ('spark.scheduler.mode', 'FAIR'),
        ('spark.shuffle.service.enabled', 'true'),
        ('spark.dynamicAllocation.enabled', 'true')
    ])
    py_files = ['hdfs://emr-header-1.cluster-68492:9000/lib/py4j-0.10.7-src.zip']
    sc = pyspark.SparkContext(appName="Jianshi", master="yarn-client", conf=sparkConf, pyFiles=py_files)

Thanks,
Jianshi Huang

--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics, Aristotle University of Thessaloniki, Thessaloniki, GREECE
tel: ++0030312310991918, email: papad...@csd.auth.gr, twitter: @papadopoulos_ap, web: http://datalab.csd.auth.gr/~apostol
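Along the lines of the linked answer, one possibility is to point the executors' PYTHONPATH at the newer Spark's Python libraries explicitly through Spark's environment configuration. This is only a sketch: the path below is a placeholder for wherever 2.3.2 is unpacked on the worker nodes, and whether these settings actually take precedence over the system SPARK_HOME depends on how the cluster's launch scripts build the environment:

    import pyspark

    new_spark_python = "/opt/apps/ecm/service/spark/2.3.2-bin-hadoop2.7/python"
    py_path = "{0}/lib/pyspark.zip:{0}/lib/py4j-0.10.7-src.zip".format(new_spark_python)

    conf = pyspark.SparkConf()
    conf.set("spark.executorEnv.PYTHONPATH", py_path)        # environment for executors
    conf.set("spark.yarn.appMasterEnv.PYTHONPATH", py_path)  # environment for the YARN AM

    sc = pyspark.SparkContext(appName="Jianshi", master="yarn-client", conf=conf)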
Re: Local vs Cluster
Hi Aakash,

In the cluster you need to consider the total number of executors you are using. Please take a look at the following link for an introduction:

https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application.html

regards,
Apostolos

On 14/09/2018 11:21 AM, Aakash Basu wrote:
Hi, what is the Spark cluster equivalent of standalone's local[N]? I mean, for the value N that we set as a parameter of local, which parameter takes that role in cluster mode? Thanks, Aakash.

--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics, Aristotle University of Thessaloniki, Thessaloniki, GREECE
tel: ++0030312310991918, email: papad...@csd.auth.gr, twitter: @papadopoulos_ap, web: http://datalab.csd.auth.gr/~apostol
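As a sketch of the knobs involved (values are placeholders, and the flags below are the YARN-style ones; on a standalone cluster spark.cores.max together with spark.executor.cores plays the same role):

    # Roughly: total concurrent tasks = number of executors x cores per executor,
    # so 5 executors x 4 cores gives about the same task slots as local[20].
    spark-submit \
      --master yarn \
      --num-executors 5 \
      --executor-cores 4 \
      --executor-memory 8g \
      my_job.py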
Re: Spark job's driver programe consums too much memory
You are mixing everything together, and this does not make sense. Writing data to HDFS does not require that all the data be transferred back to the driver and THEN saved to HDFS. That would be a disaster and it would never scale. I suggest checking the documentation more carefully, because I believe you are a bit confused.

regards,
Apostolos

On 07/09/2018 05:39 PM, James Starks wrote:
Is df.write.mode(...).parquet("hdfs://..") also an action function? Checking the doc shows that my Spark doesn't use those action functions. But the save function looks like the function df.write.mode(overwrite).parquet("hdfs://path/to/parquet-file") used by my Spark job. Therefore I am thinking maybe that's the reason why my Spark job's driver consumes such an amount of memory.

https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#actions

My Spark job's driver program consumes too much memory, so I want to prevent that by writing data to HDFS on the executor side, instead of waiting for that data to be sent back to the driver program (and then written to HDFS). This is because our worker servers have a bigger memory size than the one that runs the driver program. If I can write data to HDFS at the executor, then the driver memory for my Spark job can be reduced. Otherwise, does Spark support streaming reads from a database (i.e., Spark Streaming + Spark SQL)? Thanks for your reply.

‐‐‐ Original Message ‐‐‐
On 7 September 2018 4:15 PM, Apostolos N. Papadopoulos wrote:
Dear James,
- Check the Spark documentation to see the actions that return a lot of data back to the driver. One of these actions is collect(). However, take(x) is an action, and reduce() is also an action. Before executing collect(), find out what the size of your RDD/DF is.
- I cannot understand the phrase "hdfs directly from the executor". You can specify an hdfs file as your input, and also you can use hdfs to store your output.
regards,
Apostolos

On 07/09/2018 05:04 PM, James Starks wrote:
I have a Spark job that reads data from a database. By increasing the submit parameter '--driver-memory 25g' the job can work without a problem locally, but not in the prod env because the prod master does not have enough capacity. So I have a few questions:
- What functions such as collect() would cause the data to be sent back to the driver program? My job so far merely uses `as`, `filter`, `map`, and `filter`.
- Is it possible to write data (in parquet format for instance) to hdfs directly from the executor? If so, how can I do it (any code snippet, doc for reference, or keyword to search for, because I can't find it by e.g. `spark direct executor hdfs write`)?
Thanks

--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics, Aristotle University of Thessaloniki, Thessaloniki, GREECE
tel: ++0030312310991918, email: papad...@csd.auth.gr, twitter: @papadopoulos_ap, web: http://datalab.csd.auth.gr/~apostol
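A small PySpark sketch of the point being made: the write below is executed by the executors, partition by partition, and nothing is collected back to the driver. The JDBC options also show one way to read a database table in parallel instead of through the driver; the connection URL, table, column and path names are all placeholders:

    df = (spark.read.format("jdbc")
          .option("url", "jdbc:postgresql://dbhost:5432/mydb")   # placeholder connection
          .option("dbtable", "transactions")
          .option("partitionColumn", "id")                       # numeric column to split on
          .option("lowerBound", 1)
          .option("upperBound", 1000000)
          .option("numPartitions", 16)                           # 16 parallel reads
          .load())

    (df.filter("amount > 0")
       .write.mode("overwrite")
       .parquet("hdfs:///warehouse/transactions_clean"))         # written directly by the executors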
Re: Spark job's driver programe consums too much memory
Dear James,

- Check the Spark documentation to see the actions that return a lot of data back to the driver. One of these actions is collect(). However, take(x) is an action, and reduce() is also an action. Before executing collect(), find out what the size of your RDD/DF is.
- I cannot understand the phrase "hdfs directly from the executor". You can specify an hdfs file as your input, and also you can use hdfs to store your output.

regards,
Apostolos

On 07/09/2018 05:04 PM, James Starks wrote:
I have a Spark job that reads data from a database. By increasing the submit parameter '--driver-memory 25g' the job can work without a problem locally, but not in the prod env because the prod master does not have enough capacity. So I have a few questions:
- What functions such as collect() would cause the data to be sent back to the driver program? My job so far merely uses `as`, `filter`, `map`, and `filter`.
- Is it possible to write data (in parquet format for instance) to hdfs directly from the executor? If so, how can I do it (any code snippet, doc for reference, or keyword to search for, because I can't find it by e.g. `spark direct executor hdfs write`)?
Thanks

--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics, Aristotle University of Thessaloniki, Thessaloniki, GREECE
tel: ++0030312310991918, email: papad...@csd.auth.gr, twitter: @papadopoulos_ap, web: http://datalab.csd.auth.gr/~apostol
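A minimal sketch of the "check the size before collecting" advice; the input path and the threshold are placeholders to adjust to the driver's actual memory:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet("hdfs:///data/input.parquet")   # placeholder input

    n = df.count()                       # an action, but it returns only one number to the driver
    if n < 100000:                       # arbitrary threshold for what fits in driver memory
        rows = df.collect()              # small result: reasonably safe to bring back
    else:
        df.write.mode("overwrite").parquet("hdfs:///tmp/large_result")   # keep large results distributed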
Re: Error in show()
Can you isolate the row that is causing the problem? I mean, start using show(31) up to show(60). Perhaps this will help you to understand the problem.

regards,
Apostolos

On 07/09/2018 01:11 AM, dimitris plakas wrote:
Hello everyone, I am new to Pyspark and I am facing an issue. Let me explain what exactly the problem is. I have a dataframe and I apply a map() function on it:

    dataframe2 = dataframe1.rdd.map(custom_function)
    dataframe = sqlContext.createDataFrame(dataframe2)

When I use dataframe.show(30, True) it shows the result; when I use dataframe.show(60, True) I get the error. The error is in the attachment Pyspark_Error.txt. Could you please explain to me what this error is and how to get past it?

--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics, Aristotle University of Thessaloniki, Thessaloniki, GREECE
tel: ++0030312310991918, email: papad...@csd.auth.gr, twitter: @papadopoulos_ap, web: http://datalab.csd.auth.gr/~apostol
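A throwaway sketch of the suggestion, as a helper that walks show(n) forward until it breaks (pass in the dataframe from the question):

    def find_failing_row(df, start=31, end=60):
        """Call show(n) for increasing n to locate the first offending record."""
        for n in range(start, end + 1):
            try:
                df.show(n, True)
            except Exception as e:
                print("show({}) failed: {}".format(n, e))
                return n
        print("no failure up to show({})".format(end))
        return None

    # find_failing_row(dataframe)   # 'dataframe' is the DataFrame from the question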
Re: Parallelism: behavioural difference in version 1.2 and 2.1!?
Dear Jeevan,

Spark 1.2 is quite old, and if I were you I would go for a newer version. However, is there a parallelism level (e.g., 20, 30) that works for both installations?

regards,
Apostolos

On 29/08/2018 04:55 PM, jeevan.ks wrote:
Hi, I've two systems. One is built on Spark 1.2 and the other on 2.1. I am benchmarking both with the same benchmarks (wordcount, grep, sort, etc.) with the same data set from an S3 bucket (size ranges from 50 MB to 10 GB). The Spark cluster I made use of is r3.xlarge, 8 instances, 4 cores each, and 28 GB RAM. I observed a strange behaviour while running the benchmarks, as follows:

- When I ran the Spark 1.2 version with the default partition number (sc.defaultParallelism), the jobs would take forever to complete. So I changed it to the number of cores, i.e., 32 times 3 = 96. This did the magic and the jobs completed quickly.
- However, when I tried the above magic number on version 2.1, the jobs take forever. Default parallelism works better, but not that efficiently.

I'm having a problem rationalising this and comparing both systems. My question is: what changes were made from 1.2 to 2.1 with respect to default parallelism for this behaviour to occur? How can I have both versions behave similarly on the same software/hardware configuration so that I can compare? I'd really appreciate your help on this!

Cheers,
Jeevan

-- Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics, Aristotle University of Thessaloniki, Thessaloniki, GREECE
tel: ++0030312310991918, email: papad...@csd.auth.gr, twitter: @papadopoulos_ap, web: http://delab.csd.auth.gr/~apostol
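One way to test a common parallelism level, as asked above, is to pin it explicitly on both clusters instead of relying on each version's default. A sketch, where 96 is simply the figure from the thread and the script name is a placeholder:

    # Same explicit setting on both the 1.2 and the 2.1 installation,
    # so the comparison no longer depends on each version's defaults.
    spark-submit \
      --conf spark.default.parallelism=96 \
      benchmark_job.py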
Re: Create an Empty dataframe
Hi Dimitri,

You can do the following:
1. Create an initial dataframe from an empty CSV.
2. Use "union" to insert new rows.

Do not forget that Spark cannot replace a DBMS. Spark is mainly used for analytics. If you need select/insert/delete/update capabilities, perhaps you should look at a DBMS. Another alternative, in case you need "append only" semantics, is to use streaming or structured streaming.

regards,
Apostolos

On 30/06/2018 05:46 PM, dimitris plakas wrote:
I am new to Pyspark and want to initialize a new empty dataframe with sqlContext() with two columns ("Column1", "Column2"), and I want to append rows dynamically in a for loop. Is there any way to achieve this? Thank you in advance.

--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics, Aristotle University of Thessaloniki, Thessaloniki, GREECE
tel: ++0030312310991918, email: papad...@csd.auth.gr, twitter: @papadopoulos_ap, web: http://delab.csd.auth.gr/~apostol
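A rough PySpark sketch of the union approach; the column names are those from the question, the loop and the values are purely illustrative, and an explicit empty schema is used in place of an empty CSV file:

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType

    spark = SparkSession.builder.getOrCreate()

    schema = StructType([
        StructField("Column1", StringType(), True),
        StructField("Column2", StringType(), True),
    ])

    # Start from an empty DataFrame with the desired schema.
    df = spark.createDataFrame([], schema)

    for i in range(5):
        new_row = spark.createDataFrame([("key{}".format(i), "value{}".format(i))], schema)
        df = df.union(new_row)   # union appends; note the lineage grows with every iteration

    df.show()

Repeated unions in a long loop build up a long lineage, which is one more reason the append-heavy use case fits a DBMS or a streaming sink better, as noted above.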
Re: Increase no of tasks
How many partitions do you have in your data?

On 22/06/2018 09:46 PM, pratik4891 wrote:
Hi Gurus, I am running a Spark job and in one stage it's creating 9 tasks. So even if I have 25 executors, only 9 are getting utilized. The other executors go to dead status. How can I increase the number of tasks so all my executors can be utilized? Any help/guidance is appreciated :)

<http://apache-spark-user-list.1001560.n3.nabble.com/file/t8535/s1.jpg>
<http://apache-spark-user-list.1001560.n3.nabble.com/file/t8535/s2.jpg>

-- Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics, Aristotle University of Thessaloniki, Thessaloniki, GREECE
tel: ++0030312310991918, email: papad...@csd.auth.gr, twitter: @papadopoulos_ap, web: http://delab.csd.auth.gr/~apostol
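The question above gets at the key point: the number of tasks in a stage follows the number of partitions of the data. A quick PySpark sketch for checking and raising it (the input path and the target of 100 partitions are placeholders):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet("/data/input.parquet")   # placeholder for the job's input

    print(df.rdd.getNumPartitions())                 # 9 here would explain the 9 tasks

    # Raise it so that all executors get work; a common rule of thumb is
    # roughly 2-4 tasks per available core.
    df = df.repartition(100)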