Re: Re-create SparkContext of SparkSession inside long-lived Spark app

2024-02-17 Thread Jörn Franke
You can try to shuffle to S3 using the cloud shuffle storage plugin for S3 
(https://aws.amazon.com/blogs/big-data/introducing-the-cloud-shuffle-storage-plugin-for-apache-spark/)
 - its performance is sufficient for many Spark jobs (it also works on EMR). Then you can 
use S3 lifecycle policies to clean up/expire objects older than one day 
(https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lifecycle-mgmt.html)
 - this also cleans up files left behind by crashed Spark jobs.

For shuffle on local disk you do not have many choices, as you mentioned. I would, though, 
avoid a long-living app that loops - that never works so well with Spark (it is designed 
for batch jobs that eventually stop). Maybe you can simply 
trigger a new job when a new file arrives (S3 events?).
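
For illustration, a minimal sketch of the "stop and recreate between iterations" idea from 
the question below (Scala; the key and paths are hypothetical placeholders, and whether 
builder().getOrCreate() really hands back a fresh context after stop() should be verified 
on your Spark/EMR version - the original poster reports it did not behave as expected):

import org.apache.spark.sql.SparkSession

// Hypothetical driver loop: tear the session (and with it the SparkContext and its
// DiskBlockManager directories) down after every iteration, then build a fresh one.
def runIteration(inputPath: String): Unit = {
  val spark = SparkSession.builder()
    .appName(s"iteration-for-$inputPath")
    .getOrCreate()
  try {
    val df = spark.read.parquet(inputPath)          // placeholder input
    df.groupBy("someKey").count()                   // placeholder 100GB+ shuffle
      .write.mode("overwrite").parquet(s"$inputPath-out")
  } finally {
    // Stopping the context should release the executors and their local shuffle/block
    // directories; behavior can differ when the YARN external shuffle service is enabled.
    spark.stop()
  }
}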

> Am 18.02.2024 um 00:39 schrieb Saha, Daniel :
> 
> 
> Hi,
>  
> Background: I am running into executor disk space issues when running a 
> long-lived Spark 3.3 app with YARN on AWS EMR. The app performs back-to-back 
> spark jobs in a sequential loop with each iteration performing 100gb+ 
> shuffles. The files taking up the space are related to shuffle blocks [1]. 
> Disk is only cleared when restarting the YARN app. For all intents and 
> purposes, each job is independent. So once a job/iteration is complete, there 
> is no need to retain these shuffle files. I want to try stopping and 
> recreating the Spark context between loop iterations/jobs to indicate to 
> Spark DiskBlockManager that these intermediate results are no longer needed 
> [2].
>  
> Questions:
> Are there better ways to remove/clean the directory containing these old, no 
> longer used, shuffle results (aside from cron or restarting yarn app)?
> How to recreate the spark context within a single application? I see no 
> methods in Spark Session for doing this, and each new Spark session re-uses 
> the existing spark context. After stopping the SparkContext, SparkSession 
> does not re-create a new one. Further, creating a new SparkSession via 
> constructor and passing in a new SparkContext is not allowed as it is a 
> protected/private method.
>  
> Thanks
> Daniel
>  
> [1] 
> /mnt/yarn/usercache/hadoop/appcache/application_1706835946137_0110/blockmgr-eda47882-56d6-4248-8e30-a959ddb912c5
> [2] https://stackoverflow.com/a/38791921


Re: Cluster-mode job compute-time/cost metrics

2023-12-12 Thread Jörn Franke
It could be simpler and faster to use tagging of resources for billing:

https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-tags-billing.html

That could also include other resources (eg s3).
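
If you also want a number from inside the job itself (rather than from billing tags), one 
option is to aggregate task time with a listener. A minimal sketch, not an exact cost 
model - executorRunTime is per task in milliseconds, and the instance type/price still has 
to come from the EMR/EC2 APIs as Jack notes below:

import java.util.concurrent.atomic.LongAdder
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.SparkSession

// Sums executor run time over all tasks; multiply by your own $/core-hour figure.
class ComputeTimeListener extends SparkListener {
  val totalTaskTimeMs = new LongAdder
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
    Option(taskEnd.taskMetrics).foreach(m => totalTaskTimeMs.add(m.executorRunTime))
}

val spark = SparkSession.builder().appName("cost-probe").getOrCreate()
val listener = new ComputeTimeListener
spark.sparkContext.addSparkListener(listener)
// ... run the job ...
val coreHours = listener.totalTaskTimeMs.sum() / 1000.0 / 3600.0
println(f"Approximate task compute time: $coreHours%.2f core-hours")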

> Am 12.12.2023 um 04:47 schrieb Jack Wells :
> 
> 
> Hello Spark experts - I’m running Spark jobs in cluster mode using a 
> dedicated cluster for each job. Is there a way to see how much compute time 
> each job takes via Spark APIs, metrics, etc.? In case it makes a difference, 
> I’m using AWS EMR - I’d ultimately like to be able to say this job costs $X 
> since it took Y minutes on Z instance types (assuming all of the nodes are 
> the same instance type), but I figure I could probably need to get the Z 
> instance type through EMR APIs.
> 
> Thanks!
> Jack
> 


Re: Spark on Java 17

2023-12-09 Thread Jörn Franke
It is just a goal… however I would not tune the number of regions or the region size yet. 
Simply specify the GC algorithm and the max heap size. Try to tune other options only if 
there is a need, only one at a time (otherwise it is difficult to determine cause/effect), 
and have a performance testing framework in place to be able to measure differences.

Do you need those large heaps in Spark? Why not split the tasks further to have more tasks 
with less memory? I understand that each job is different and there can be reasons for it, 
but I often try to just use the defaults and then tune individual options. I try to also 
avoid certain extreme values (of course there are cases when they are needed). Especially 
when upgrading from one Spark version to another I often find it is better to work with a 
Spark job with default settings, because Spark itself has improved/changed how it works.

To reduce the needed heap you can try to increase the number of tasks (see 
https://spark.apache.org/docs/latest/configuration.html): spark.executor.cores (to a few) 
and spark.sql.shuffle.partitions (default is 200 - you can try how much it brings to 
change it to 400 etc.), and reduce spark.executor.memory.

Am 10.12.2023 um 02:33 schrieb Faiz Halde :

Thanks, I'll check them out.

Curious though, the official G1GC page 
https://www.oracle.com/technical-resources/articles/java/g1gc.html says that there must be 
no more than 2048 regions and region size is limited up to 32mb. That's strange because 
our heaps go up to 100gb and that would require a 64mb region size to be under 2048.

Thanks
Faiz

On Sat, Dec 9, 2023, 10:33 Luca Canali  wrote:

Hi Faiz,
 
We find G1GC works well for some of our workloads that are Parquet-read intensive and we have been using G1GC with Spark on Java 8 already (spark.driver.extraJavaOptions and spark.executor.extraJavaOptions= “-XX:+UseG1GC”),
 while currently we are mostly running Spark (3.3 and higher) on Java 11.  
However, the best is always to refer to measurements of your specific workloads, let me know if you find something different. 

BTW besides the WebUI, I typically measure GC time also with a couple of custom tools:
https://github.com/cerndb/spark-dashboard and  https://github.com/LucaCanali/sparkMeasure 

A few tests of microbenchmarking Spark reading Parquet with a few different JDKs at:
https://db-blog.web.cern.ch/node/192 

 
Best,
Luca
 
 

From: Faiz Halde
Sent: Thursday, December 7, 2023 23:25
To: user@spark.apache.org
Subject: Spark on Java 17

Hello,

We are planning to switch to Java 17 for Spark and were wondering if there's any obvious 
learnings from anybody related to JVM tuning?

We've been running on Java 8 for a while now and used to use the parallel GC as that used 
to be a general recommendation for high-throughput systems. How has the default G1GC 
worked out with Spark?

Thanks
Faiz

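To make the advice in this thread concrete, a small sketch of the conservative starting 
point described above - set only the GC algorithm and heap size, and prefer more/smaller 
tasks over very large heaps (all values are illustrative assumptions, not recommendations):

import org.apache.spark.sql.SparkSession

// Start from defaults, then change one knob at a time and measure.
val spark = SparkSession.builder()
  .appName("java17-gc-baseline")
  .config("spark.executor.memory", "16g")            // illustrative, instead of ~100g heaps
  .config("spark.executor.cores", "4")               // a few cores per executor
  .config("spark.sql.shuffle.partitions", "400")     // default is 200; try 400 etc.
  .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
  .config("spark.driver.extraJavaOptions", "-XX:+UseG1GC")  // must be set before the driver starts
  .getOrCreate()
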

Re: Spark on Java 17

2023-12-09 Thread Jörn Franke
If you do tests with newer Java versions you can also try:

- UseNUMA: -XX:+UseNUMA. See https://openjdk.org/jeps/345

You can also assess the new Java GC algorithms:
- -XX:+UseShenandoahGC - works with terabytes of heap - more memory efficient 
than ZGC with heaps <32 GB. See also: 
https://developers.redhat.com/articles/2021/09/16/shenandoah-openjdk-17-sub-millisecond-gc-pauses
- -XX:+UseZGC - also works with terabytes of heap - see also 
https://www.baeldung.com/jvm-zgc-garbage-collector

Note: in jdk 21 zgc has an additional option that could make sense to activate:

-XX:+ZGenerational

See also 
https://developers.redhat.com/articles/2021/11/02/how-choose-best-java-garbage-collector

Note: it might be worth trying JDK 21 as well - it has optimizations for certain GCs 
(amongst other things - I wonder how much improvement virtual 
threads can bring to Spark)
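
A sketch of how these flags could be passed to Spark (illustrative only: the flags need a 
JDK that ships the respective collector, -XX:+ZGenerational exists only on JDK 21, heap 
size still has to go through spark.executor.memory rather than -Xmx, and driver options 
must be set at submit time, not from a running driver):

import org.apache.spark.SparkConf

// Pick ONE collector per experiment and benchmark it against the G1 baseline.
val zgcFlags        = "-XX:+UseZGC -XX:+UseNUMA"          // add -XX:+ZGenerational on JDK 21
val shenandoahFlags = "-XX:+UseShenandoahGC -XX:+UseNUMA"

val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions", zgcFlags)
  .set("spark.driver.extraJavaOptions", zgcFlags)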

> Am 08.12.2023 um 01:02 schrieb Faiz Halde :
> 
> 
> Hello,
> 
> We are planning to switch to Java 17 for Spark and were wondering if there's 
> any obvious learnings from anybody related to JVM tuning?
> 
> We've been running on Java 8 for a while now and used to use the parallel GC 
> as that used to be a general recommendation for high-throughput systems. How 
> has the default G1GC worked out with Spark?
> 
> Thanks
> Faiz


Re: Spark-submit without access to HDFS

2023-11-16 Thread Jörn Franke
I am not 100% sure, but I do not think this works - the driver would need access to HDFS.

What you could try (have not tested it though in your scenario):
- use Spark Connect: https://spark.apache.org/docs/latest/spark-connect-overview.html
- host the zip file on an https server and use that url (I would recommend against it 
though for various reasons, such as reliability)

> Am 15.11.2023 um 22:33 schrieb Eugene Miretsky :
> 
> Hey All,
> 
> We are running Pyspark spark-submit from a client outside the cluster. The client has 
> network connectivity only to the Yarn Master, not the HDFS Datanodes. How can we submit 
> the jobs? The idea would be to preload all the dependencies (job code, libraries, etc) to 
> HDFS, and just submit the job from the client. We tried something like this:
> 
> 'PYSPARK_ARCHIVES_PATH=hdfs://some-path/pyspark.zip spark-submit --master yarn 
> --deploy-mode cluster --py-files hdfs://yarn-master-url hdfs://foo.py'
> 
> The error we are getting is "org.apache.hadoop.net.ConnectTimeoutException: 6 millis 
> timeout while waiting for channel to be ready for connect. ch : 
> java.nio.channels.SocketChannel[connection-pending remote=/10.117.110.19:9866] 
> org.apache.hadoop.ipc.RemoteException(java.io.IOException): File 
> /user/users/.sparkStaging/application_1698216436656_0104/spark_conf.zip could only be 
> written to 0 of the 1 minReplication nodes. There are 2 datanode(s) running and 2 node(s) 
> are excluded in this operation."
> 
> A few questions:
> 1) What are the spark_conf.zip files? Is it the hive-site/yarn-site conf files? Why would 
> the client send them to the cluster? (the cluster already has all that info - this would 
> make sense in client mode, but not cluster mode)
> 2) Is it possible to use spark-submit without HDFS access?
> 3) How would we fix this?
> 
> Cheers,
> Eugene
> 
> --
> Eugene Miretsky
> Managing Partner | Badal.io | Book a meeting /w me!
> mobile: 416-568-9245
> email: eug...@badal.io
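
For the Spark Connect option mentioned above, only the connect server (running inside the 
cluster) needs HDFS/datanode access; the client just needs a gRPC connection to it. A 
rough sketch, assuming Spark 3.4+ with the spark-connect-client-jvm artifact on the client 
and a placeholder endpoint:

import org.apache.spark.sql.SparkSession

// Client side: no HDFS connectivity required, only the connect endpoint.
val spark = SparkSession.builder()
  .remote("sc://spark-connect.example.internal:15002")   // placeholder host/port
  .getOrCreate()

// DataFrame operations run remotely on the cluster; results stream back over gRPC.
spark.read.parquet("hdfs:///data/foo").groupBy("day").count().show()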


Re: automatically/dinamically renew aws temporary token

2023-10-23 Thread Jörn Franke
Can’t you attach the cross-account permission to the Glue job role? Why the 
detour via AssumeRole?

AssumeRole can make sense if you use an AWS IAM user and STS authentication, but within 
AWS it makes little sense for cross-account access, as attaching the permissions to the 
Glue job role is more secure (no need for static credentials, and permissions are renewed 
automatically on a short schedule without any specific configuration in Spark).

Have you checked with AWS support?
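
If the cross-account AssumeRole really cannot be avoided, one thing to investigate is the 
S3A assumed-role credential provider, which re-issues the STS session itself before it 
expires. A hedged sketch with a placeholder role ARN, assuming a Hadoop 3.x s3a client is 
actually in use (Glue's bundled S3 connector may behave differently, so treat this only as 
a starting point to test):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.hadoop.fs.s3a.aws.credentials.provider",
    "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider")
  .config("spark.hadoop.fs.s3a.assumed.role.arn",
    "arn:aws:iam::111122223333:role/cross-account-read")       // placeholder ARN
  .config("spark.hadoop.fs.s3a.assumed.role.session.duration", "1h")
  .getOrCreate()

// Reads against the other account's bucket will then use the auto-renewed session.
val df = spark.read.parquet("s3a://other-account-bucket/path/")   // placeholder bucket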

> Am 22.10.2023 um 21:14 schrieb Carlos Aguni :
> 
> 
> hi all,
> 
> i've a scenario where I need to assume a cross account role to have S3 bucket 
> access.
> 
> the problem is that this role only allows for 1h time span (no negotiation).
> 
> that said.
> does anyone know a way to tell spark to automatically renew the token
> or to dynamically renew the token on each node?
> i'm currently using spark on AWS glue.
> 
> wonder what options do I have.
> 
> regards,c.


Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-10-01 Thread Jörn Franke
There is nowadays more of a trend to move away from static credentials/certificates that 
are stored in a secret vault. The issue is that rotating them is complex, once they are 
leaked they can be abused, making minimal permissions feasible is cumbersome, etc. That is 
why keyless approaches are used for A2A access (workload identity federation was 
mentioned). E.g. in AWS EKS you would build this on OIDC 
(https://docs.aws.amazon.com/eks/latest/userguide/enable-iam-roles-for-service-accounts.html) 
and configure this instead of using secrets. Similar approaches exist in other clouds and 
even on-premise (eg SPIFFE https://spiffe.io/).

Whether this will become the standard is difficult to say - for sure they seem to be 
easier to manage.

Since you seem to have a Kubernetes setup, which means per cloud/data centre a lot of 
extra work, infrastructure cost and security issues, workload identity federation may ease 
this compared to a secret store.

> Am 01.10.2023 um 08:27 schrieb Jon Rodríguez Aranguren :
> 
> Dear Jörn Franke, Jayabindu Singh and Spark Community members,
> 
> Thank you profoundly for your initial insights. I feel it's necessary to provide more 
> precision on our setup to facilitate a deeper understanding.
> 
> We're interfacing with S3 Compatible storages, but our operational context is somewhat 
> distinct. Our infrastructure doesn't lean on conventional cloud providers like AWS. 
> Instead, we've architected our environment on On-Premise Kubernetes distributions, 
> specifically k0s and Openshift.
> 
> Our objective extends beyond just handling S3 keys. We're orchestrating a solution that 
> integrates Azure SPNs, API Credentials, and other sensitive credentials, intending to 
> make Kubernetes' native secrets our central management hub. The aspiration is to have a 
> universally deployable JAR, one that can function unmodified across different ecosystems 
> like EMR, Databricks (on both AWS and Azure), etc. Platforms like Databricks have already 
> made strides in this direction, allowing secrets to be woven directly into the Spark Conf 
> through mechanisms like {{secret_scope/secret_name}}, which are resolved dynamically.
> 
> The spark-on-k8s-operator's user guide suggests the feasibility of mounting secrets. 
> However, a gap exists in our understanding of how to subsequently access these mounted 
> secret values within the Spark application's context.
> 
> Here lies my inquiry: is the spark-on-k8s-operator currently equipped to support this 
> level of integration? If it does, any elucidation on the method or best practices would 
> be pivotal for our project. Alternatively, if you could point me to resources or 
> community experts who have tackled similar challenges, it would be of immense assistance.
> 
> Thank you for bearing with the intricacies of our query, and I appreciate your continued 
> guidance in this endeavor.
> 
> Warm regards,
> Jon Rodríguez Aranguren.
> 
> El sáb, 30 sept 2023 a las 23:19, Jayabindu Singh (<jayabi...@gmail.com>) escribió:
>> Hi Jon,
>> 
>> Using IAM as suggested by Jorn is the best approach. We recently moved our spark 
>> workload from HDP to Spark on K8 and utilizing IAM. It will save you from secret 
>> management headaches and also allows a lot more flexibility on access control and option 
>> to allow access to multiple S3 buckets in the same pod. We have implemented this across 
>> Azure, Google and AWS. Azure does require some extra work to make it work.
>> 
>> On Sat, Sep 30, 2023 at 12:05 PM Jörn Franke <jornfra...@gmail.com> wrote:
>>> Don’t use static iam (s3) credentials. It is an outdated insecure method - even AWS 
>>> recommend against using this for anything (cf eg 
>>> https://docs.aws.amazon.com/cli/latest/userguide/cli-authentication-user.html). It is 
>>> almost a guarantee to get your data stolen and your account manipulated.
>>> 
>>> If you need to use kubernetes (which has its own very problematic security issues) then 
>>> assign AWS IAM roles with minimal permissions to the pods (for EKS it means using OIDC, 
>>> cf https://docs.aws.amazon.com/eks/latest/userguide/service_IAM_role.html).
>>> 
>>> Am 30.09.2023 um 03:41 schrieb Jon Rodríguez Aranguren <jon.r.arangu...@gmail.com>:
>>> 
>>>> Dear Spark Community Members,
>>>> 
>>>> I trust this message finds you all in good health and spirits.
>>>> 
>>>> I'm reaching out to the collective expertise of this esteemed community with a query 
>>>> regarding Spark on Kubernetes. As a newcomer, I have always admired the depth and 
>>>> breadth of knowledge shared within this forum, and it is my hope that some of you 
>>>> might have insights on a specific challenge I'm facing.
>>>> 
>>>> I am currently trying to configure multiple Kubernetes secrets, notably multiple S3 
>>>> keys, at the SparkConf level for a Spark application. My objective is to understand 
>>>> the best approach or methods to ensure that these secrets can be smoothly accessed by 
>>>> the Spark application.
>>>> 
>>>> If any of you have previously encountered this scenario or possess relevant insights 
>>>> on the matter, your guidance would be highly beneficial.
>>>> 
>>>> Thank you for your time and consideration. I'm eager to learn from the experiences and 
>>>> knowledge present within this community.
>>>> 
>>>> Warm regards,
>>>> Jon




Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-10-01 Thread Jörn Franke
With OIDC something comparable is possible: 
https://docs.aws.amazon.com/eks/latest/userguide/enable-iam-roles-for-service-accounts.html

Am 01.10.2023 um 11:13 schrieb Mich Talebzadeh :

It seems that workload identity is not available on AWS. Workload Identity replaces the 
need to use metadata concealment on exposed storage such as S3 and GCS. The sensitive 
metadata protected by metadata concealment is also protected by Workload Identity.

Both Google Cloud Kubernetes (GKE) and Azure Kubernetes Service support Workload Identity. 
Taking notes from Google Cloud: "Workload Identity is the recommended way for your 
workloads running on Google Kubernetes Engine (GKE) to access Google Cloud services in a 
secure and manageable way."

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom

   view my Linkedin profile https://en.everybodywiki.com/Mich_Talebzadeh

 Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction
of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from such
loss, damage or destruction.  

On Sun, 1 Oct 2023 at 06:36, Jayabindu Singh <jayabi...@gmail.com> wrote:

Hi Jon,

Using IAM as suggested by Jorn is the best approach. We recently moved our spark workload 
from HDP to Spark on K8 and utilizing IAM. It will save you from secret management 
headaches and also allows a lot more flexibility on access control and option to allow 
access to multiple S3 buckets in the same pod. We have implemented this across Azure, 
Google and AWS. Azure does require some extra work to make it work.

On Sat, Sep 30, 2023 at 12:05 PM Jörn Franke <jornfra...@gmail.com> wrote:

Don’t use static iam (s3) credentials. It is an outdated insecure method - even AWS 
recommend against using this for anything (cf eg 
https://docs.aws.amazon.com/cli/latest/userguide/cli-authentication-user.html). It is 
almost a guarantee to get your data stolen and your account manipulated. If you need to 
use kubernetes (which has its own very problematic security issues) then assign AWS IAM 
roles with minimal permissions to the pods (for EKS it means using OIDC, cf 
https://docs.aws.amazon.com/eks/latest/userguide/service_IAM_role.html).

Am 30.09.2023 um 03:41 schrieb Jon Rodríguez Aranguren <jon.r.arangu...@gmail.com>:

Dear Spark Community Members,

I trust this message finds you all in good health and spirits.

I'm reaching out to the collective expertise of this esteemed community with a query 
regarding Spark on Kubernetes. As a newcomer, I have always admired the depth and breadth 
of knowledge shared within this forum, and it is my hope that some of you might have 
insights on a specific challenge I'm facing.

I am currently trying to configure multiple Kubernetes secrets, notably multiple S3 keys, 
at the SparkConf level for a Spark application. My objective is to understand the best 
approach or methods to ensure that these secrets can be smoothly accessed by the Spark 
application.

If any of you have previously encountered this scenario or possess relevant insights on 
the matter, your guidance would be highly beneficial.

Thank you for your time and consideration. I'm eager to learn from the experiences and 
knowledge present within this community.

Warm regards,
Jon




Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-09-30 Thread Jörn Franke
Don’t use static iam (s3) credentials. It is an outdated insecure method - even 
AWS recommend against using this for anything (cf eg 
https://docs.aws.amazon.com/cli/latest/userguide/cli-authentication-user.html).
It is almost a guarantee to get your data stolen and your account manipulated. 

If you need to use kubernetes (which has its own very problematic security 
issues) then assign AWS IAM roles with minimal permissions to the pods (for EKS 
it means using OIDC, cf 
https://docs.aws.amazon.com/eks/latest/userguide/service_IAM_role.html).
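
A sketch of what the keyless, pod-identity approach can look like for Spark on EKS. The 
assumptions here: an IAM role is already mapped to a Kubernetes service account via 
IRSA/OIDC, hadoop-aws is on the classpath, and all names are placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  // Run driver/executor pods under the IRSA-annotated service account.
  .config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark-irsa-sa")
  // Let the AWS SDK pick up the web-identity token the pod receives from EKS,
  // instead of a static access key/secret stored in a Kubernetes secret.
  .config("spark.hadoop.fs.s3a.aws.credentials.provider",
    "com.amazonaws.auth.WebIdentityTokenCredentialsProvider")
  .getOrCreate()

val df = spark.read.parquet("s3a://my-bucket/input/")   // placeholder bucket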

> Am 30.09.2023 um 03:41 schrieb Jon Rodríguez Aranguren 
> :
> 
> 
> Dear Spark Community Members,
> 
> I trust this message finds you all in good health and spirits.
> 
> I'm reaching out to the collective expertise of this esteemed community with 
> a query regarding Spark on Kubernetes. As a newcomer, I have always admired 
> the depth and breadth of knowledge shared within this forum, and it is my 
> hope that some of you might have insights on a specific challenge I'm facing.
> 
> I am currently trying to configure multiple Kubernetes secrets, notably 
> multiple S3 keys, at the SparkConf level for a Spark application. My 
> objective is to understand the best approach or methods to ensure that these 
> secrets can be smoothly accessed by the Spark application.
> 
> If any of you have previously encountered this scenario or possess relevant 
> insights on the matter, your guidance would be highly beneficial.
> 
> Thank you for your time and consideration. I'm eager to learn from the 
> experiences and knowledge present within this community.
> 
> Warm regards,
> Jon


Re: Log4j2 upgrade

2022-01-12 Thread Jörn Franke
You cannot simply replace it - log4j2 has a slightly different API than log4j. 
The Spark source code needs to be changed in a couple of places 
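
On the sbt-assembly warnings quoted below: they come from the multi-release 
(META-INF/versions) entries in the log4j2 jars and are usually handled with a merge 
strategy. A sketch for build.sbt, assuming a reasonably recent sbt-assembly; afterwards it 
is worth verifying that log4j-core classes really ended up in the fat jar (e.g. jar tf on 
the assembly), which is what the SimpleLogger message is complaining about:

// build.sbt (sketch)
assembly / assemblyMergeStrategy := {
  case PathList("META-INF", "versions", xs @ _*) => MergeStrategy.first
  case "module-info.class"                       => MergeStrategy.discard
  case x =>
    val oldStrategy = (assembly / assemblyMergeStrategy).value
    oldStrategy(x)
}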

> Am 12.01.2022 um 20:53 schrieb Amit Sharma :
> 
> 
> Hello, everyone. I am replacing log4j with log4j2 in my spark streaming 
> application. When i deployed my application to spark cluster it is giving me 
> the below error .
> 
> " ERROR StatusLogger Log4j2 could not find a logging implementation. Please 
> add log4j-core to the classpath. Using SimpleLogger to log to the console "
> 
> 
> I am including the core jar in my fat jar and core jar also included in the 
> jar. Although the application is running fine, I am doubtful the logs are 
> generated using log4j not log4j2 .
> I am using sbt assembly jar and also noticed below  messages in the build
> 
> Fully-qualified classname does not match jar entry:
>   jar entry: META-INF/versions/9/module-info.class
> 
> 
>   class name: module-info.class
> Omitting META-INF/versions/9/module-info.class.
> Fully-qualified classname does not match jar entry:
>   jar entry: 
> META-INF/versions/9/org/apache/logging/log4j/util/Base64Util.class
>   class name: org/apache/logging/log4j/util/Base64Util.class
> Omitting META-INF/versions/9/org/apache/logging/log4j/util/Base64Util.class.
> Fully-qualified classname does not match jar entry:
>   jar entry: 
> META-INF/versions/9/org/apache/logging/log4j/util/internal/DefaultObjectInputFilter.class
> 
> 
> any idea how to resolve these.
> 
> 
> Thanks
> Amit




Re: hive table with large column data size

2022-01-09 Thread Jörn Franke
It is not a good practice to do this. Just store a reference to the binary data 
stored on HDFS.

> Am 09.01.2022 um 15:34 schrieb weoccc :
> 
> 
> Hi ,
> 
> I want to store binary data (such as images) into hive table but the binary 
> data column might be much larger than other columns per row.  I'm worried 
> about the query performance. One way I can think of is to separate binary 
> data storage from other columns by creating 2 hive tables and run 2 separate 
> spark query and join them later. 
> 
> Later, I found parquet has supported column split into different files as 
> shown here: 
> https://parquet.apache.org/documentation/latest/
> 
> I'm wondering if spark sql already supports that ? If so, how to use ? 
> 
> Weide 


Re: Log4j 1.2.17 spark CVE

2021-12-13 Thread Jörn Franke
Is it in any case appropriate to use log4j 1.x, which is not maintained anymore 
and has other security vulnerabilities that won’t be fixed anymore?

> Am 13.12.2021 um 06:06 schrieb Sean Owen :
> 
> 
> Check the CVE - the log4j vulnerability appears to affect log4j 2, not 1.x. 
> There was mention that it could affect 1.x when used with JNDI or SMS 
> handlers, but Spark does neither. (unless anyone can think of something I'm 
> missing, but never heard or seen that come up at all in 7 years in Spark)
> 
> The big issue would be applications that themselves configure log4j 2.x, but 
> that's not a Spark issue per se.
> 
>> On Sun, Dec 12, 2021 at 10:46 PM Pralabh Kumar  
>> wrote:
>> Hi developers,  users 
>> 
>> Spark is built using log4j 1.2.17 . Is there a plan to upgrade based on 
>> recent CVE detected ?
>> 
>> 
>> Regards
>> Pralabh kumar


Re: Naming files while saving a Dataframe

2021-07-18 Thread Jörn Franke
Spark heavily depends on Hadoop writing files. You can try to set the Hadoop 
property: mapreduce.output.basename 

https://spark.apache.org/docs/latest/api/java/org/apache/spark/SparkContext.html#hadoopConfiguration--
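
A short sketch of setting that property through the SparkContext's Hadoop configuration. 
Hedged: whether the Parquet output committer actually honors mapreduce.output.basename for 
DataFrame writes should be tested on your Spark version before relying on it; paths are 
placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("named-output").getOrCreate()
val df = spark.read.parquet("/tmp/input")   // placeholder input

// Request a file-name prefix from the Hadoop OutputFormat underneath the writer.
spark.sparkContext.hadoopConfiguration.set("mapreduce.output.basename", "jobA")

df.write.mode("overwrite").parquet("/tmp/output")   // placeholder output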


> Am 18.07.2021 um 01:15 schrieb Eric Beabes :
> 
> 
> Mich - You're suggesting changing the "Path". Problem is that, we've an 
> EXTERNAL table created on top of this path so "Path" CANNOT change. If we 
> could, it would be easy to solve this problem. My question is about changing 
> the "Filename".
> 
> As Ayan pointed out, Spark doesn't seem to allow "prefixes" for the filenames!
> 
>> On Sat, Jul 17, 2021 at 1:58 PM Mich Talebzadeh  
>> wrote:
>> Using this
>> 
>> df.write.mode("overwrite").format("parquet").saveAsTable("test.ABCD")
>> 
>> That will create a parquet table in the database test. which is essentially 
>> a hive partition in the format
>> 
>> /user/hive/warehouse/test.db/abcd/00_0
>> 
>> 
>>view my Linkedin profile
>> 
>>  
>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>> loss, damage or destruction of data or any other property which may arise 
>> from relying on this email's technical content is explicitly disclaimed. The 
>> author will in no case be liable for any monetary damages arising from such 
>> loss, damage or destruction.
>>  
>> 
>> 
>>> On Sat, 17 Jul 2021 at 20:45, Eric Beabes  wrote:
>>> I am not sure if you've understood the question. Here's how we're saving 
>>> the DataFrame:
>>> 
>>> df
>>>   .coalesce(numFiles)
>>>   .write
>>>   .partitionBy(partitionDate)
>>>   .mode("overwrite")
>>>   .format("parquet")
>>>   .save(someDirectory)
>>> 
>>> Now where would I add a 'prefix' in this one?
>>> 
 On Sat, Jul 17, 2021 at 10:54 AM Mich Talebzadeh 
  wrote:
 try it see if it works
 
 fullyQualifiedTableName = appName+'_'+tableName
 
 
 
view my Linkedin profile
 
  
 Disclaimer: Use it at your own risk. Any and all responsibility for any 
 loss, damage or destruction of data or any other property which may arise 
 from relying on this email's technical content is explicitly disclaimed. 
 The author will in no case be liable for any monetary damages arising from 
 such loss, damage or destruction.
  
 
 
> On Sat, 17 Jul 2021 at 18:02, Eric Beabes  
> wrote:
> I don't think Spark allows adding a 'prefix' to the file name, does it? 
> If it does, please tell me how. Thanks.
> 
>> On Sat, Jul 17, 2021 at 9:47 AM Mich Talebzadeh 
>>  wrote:
>> Jobs have names in spark. You can prefix it to the file name when 
>> writing to directory I guess
>> 
>>  val sparkConf = new SparkConf().
>>setAppName(sparkAppName).
>>  
>> 
>> 
>>view my Linkedin profile
>> 
>>  
>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>> loss, damage or destruction of data or any other property which may 
>> arise from relying on this email's technical content is explicitly 
>> disclaimed. The author will in no case be liable for any monetary 
>> damages arising from such loss, damage or destruction.
>>  
>> 
>> 
>>> On Sat, 17 Jul 2021 at 17:40, Eric Beabes  
>>> wrote:
>>> Reason we've two jobs writing to the same directory is that the data is 
>>> partitioned by 'day' (mmdd) but the job runs hourly. Maybe the only 
>>> way to do this is to create an hourly partition (/mmdd/hh). Is that 
>>> the only way to solve this?
>>> 
 On Fri, Jul 16, 2021 at 5:45 PM ayan guha  wrote:
 IMHO - this is a bad idea esp in failure scenarios. 
 
 How about creating a subfolder each for the jobs? 
 
> On Sat, 17 Jul 2021 at 9:11 am, Eric Beabes 
>  wrote:
> We've two (or more) jobs that write data into the same directory via 
> a Dataframe.save method. We need to be able to figure out which job 
> wrote which file. Maybe provide a 'prefix' to the file names. I was 
> wondering if there's any 'option' that allows us to do this. Googling 
> didn't come up with any solution so thought of asking the Spark 
> experts on this mailing list.
> 
> Thanks in advance.
 -- 
 Best Regards,
 Ayan Guha


Re: Scala vs Python for ETL with Spark

2020-10-10 Thread Jörn Franke
It really depends on what language your data scientists are comfortable with. I don’t 
think it makes sense for ad-hoc data science work to impose a language on them, but let 
them choose.
For more complex AI engineering you can, though, apply different standards 
and criteria. And then it really depends on architecture aspects etc.

> Am 09.10.2020 um 22:57 schrieb Mich Talebzadeh :
> 
> 
> I have come across occasions when the teams use Python with Spark for ETL, 
> for example processing data from S3 buckets into Snowflake with Spark.
> 
> The only reason I think they are choosing Python as opposed to Scala is 
> because they are more familiar with Python. Since Spark is written in Scala, 
> itself is an indication of why I think Scala has an edge.
> 
> I have not done one to one comparison of Spark with Scala vs Spark with 
> Python. I understand for data science purposes most libraries like TensorFlow 
> etc. are written in Python but I am at loss to understand the validity of 
> using Python with Spark for ETL purposes.
> 
> These are my understanding but they are not facts so I would like to get some 
> informed views on this if I can?
> 
> Many thanks,
> 
> Mich
> 
> 
> 
> 
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  


Re: Merging Parquet Files

2020-08-31 Thread Jörn Franke
Why only one file?
I would rather go for files of a specific size, e.g. data split into 1 GB files. One 
reason is that if you need to transfer it (e.g. to other clouds etc.), having a single 
file of several terabytes is bad.

It depends on your use case, but you might also look at partitioning etc.
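
A sketch of steering towards roughly fixed-size output files instead of a single file. The 
1 GB target and the input-size estimate are illustrative, paths are placeholders, and 
spark.sql.files.maxRecordsPerFile is an alternative knob:

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("merge-parquet").getOrCreate()

val inputPath  = "s3a://bucket/small-files/"   // placeholder
val outputPath = "s3a://bucket/merged/"        // placeholder
val targetFileBytes = 1L * 1024 * 1024 * 1024  // aim for ~1 GB per output file

// Rough size estimate from the file system, just to pick a sensible file count.
val fs = new Path(inputPath).getFileSystem(spark.sparkContext.hadoopConfiguration)
val inputBytes = fs.getContentSummary(new Path(inputPath)).getLength
val numFiles = math.max(1, (inputBytes / targetFileBytes).toInt)

spark.read.parquet(inputPath)
  .repartition(numFiles)
  .write.mode("overwrite").parquet(outputPath)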

> Am 31.08.2020 um 16:17 schrieb Tzahi File :
> 
> 
> Hi, 
> 
> I would like to develop a process that merges parquet files. 
> My first intention was to develop it with PySpark using coalesce(1) -  to 
> create only 1 file. 
> This process is going to run on a huge amount of files.
> I wanted your advice on what is the best way to implement it (PySpark isn't a 
> must).  
> 
> 
> Thanks,
> Tzahi


Re: Connecting to Oracle Autonomous Data warehouse (ADW) from Spark via JDBC

2020-08-26 Thread Jörn Franke
Is the directory available on all nodes ?

> Am 26.08.2020 um 22:08 schrieb kuassi.men...@oracle.com:
> 
> 
> Mich,
> 
> All looks fine.
> Perhaps some special chars in username or password?
> 
>> it is recommended not to use such characters like '@', '.' in your password.
> Best, Kuassi
> On 8/26/20 12:52 PM, Mich Talebzadeh wrote:
>> Thanks Kuassi.
>> 
>> This is the version of jar file that work OK with JDBC connection via JAVA 
>> to ADW
>> 
>> unzip -p ojdbc8.jar META-INF/MANIFEST.MF
>> Manifest-Version: 1.0
>> Implementation-Title: JDBC
>> Implementation-Version: 18.3.0.0.0
>> sealed: true
>> Specification-Vendor: Sun Microsystems Inc.
>> Specification-Title: JDBC
>> Class-Path: oraclepki.jar
>> Implementation-Vendor: Oracle Corporation
>> Main-Class: oracle.jdbc.OracleDriver
>> Ant-Version: Apache Ant 1.7.1
>> Repository-Id: JAVAVM_18.1.0.0.0_LINUX.X64_180620
>> Created-By: 25.171-b11 (Oracle Corporation)
>> Specification-Version: 4.0
>> 
>> And this the setting for TNS_ADMIN
>> 
>> echo ${TNS_ADMIN}
>> /home/hduser/dba/bin/ADW/DBAccess
>> 
>> hduser@rhes76: /home/hduser/dba/bin/ADW/DBAccess> cat ojdbc.properties
>> # Connection property while using Oracle wallets.
>> oracle.net.wallet_location=(SOURCE=(METHOD=FILE)(METHOD_DATA=(DIRECTORY=${TNS_ADMIN})))
>> # FOLLOW THESE STEPS FOR USING JKS
>> # (1) Uncomment the following properties to use JKS.
>> # (2) Comment out the oracle.net.wallet_location property above
>> # (3) Set the correct password for both trustStorePassword and 
>> keyStorePassword.
>> # It's the password you specified when downloading the wallet from OCI 
>> Console or the Service Console.
>> #javax.net.ssl.trustStore=${TNS_ADMIN}/truststore.jks
>> #javax.net.ssl.trustStorePassword=
>> #javax.net.ssl.keyStore=${TNS_ADMIN}/keystore.jks
>> #javax.net.ssl.keyStorePassword=hduser@rhes76: 
>> /home/hduser/dba/bin/ADW/DBAccess>
>> 
>> Regards,
>> 
>> Mich
>> 
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>  
>> 
>> 
>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>> loss, damage or destruction of data or any other property which may arise 
>> from relying on this email's technical content is explicitly disclaimed. The 
>> author will in no case be liable for any monetary damages arising from such 
>> loss, damage or destruction.
>>  
>> 
>> 
>> On Wed, 26 Aug 2020 at 20:16,  wrote:
>>> Hi,
>>> 
>>> From which release is the ojdbc8.jar from? 12c, 18c or 19c? I'd recommend 
>>> ojdbc8.jar from the latest release.
>>> One more thing to pay attention to is the content of the ojdbc.properties 
>>> file (part of the unzipped wallet)
>>> Make sure that ojdbc.properties file has been configured to use Oracle 
>>> Wallet, as follows (i.e., anything related to JKS commented out)
>>> 
>>> oracle.net.wallet_location=(SOURCE=(METHOD=FILE)(METHOD_DATA=(DIRECTORY=${TNS_ADMIN})))
>>> #javax.net.ssl.trustStore=${TNS_ADMIN}/truststore.jks
>>> #javax.net.ssl.trustStorePassword=
>>> #javax.net.ssl.keyStore=${TNS_ADMIN}/keystore.jks
>>> #javax.net.ssl.keyStorePassword=
>>> 
>>> Alternatively, if you want to use JKS< then you need to comment out the 
>>> firts line and un-comment the other lines and set the values.
>>> 
>>> Kuassi
>>> 
>>> On 8/26/20 11:58 AM, Mich Talebzadeh wrote:
 Hi,
 
 The connection from Spark to Oracle 12c etc are well established using 
 ojdb6.jar.
 
 I am attempting to connect to Oracle Autonomous Data warehouse (ADW) 
 version 
 
 Oracle Database 19c Enterprise Edition Release 19.0.0.0.0
 
 Oracle document suggest using ojdbc8.jar to connect to the database with 
 the following URL format using Oracle Wallet
 
 "jdbc:oracle:thin:@mydb_high?TNS_ADMIN=/home/hduser/dba/bin/ADW/DBAccess"
 
 This works fine through JAVA itself but throws an error with Spark version 
 2.4.3.
 
 The connection string is defined as follows
 
 val url = 
 "jdbc:oracle:thin:@mydb_high?TNS_ADMIN=/home/hduser/dba/bin/ADW/DBAccess"
 
 where DBAcess directory is the unzipped wallet for Wallet_mydb.zip as 
 created by ADW connection.
 
 The thing is that this works through normal connection via java code.using 
 the same URL
 
 So the question is whether there is a dependency in Spark JDBC connection 
 to the ojdbc.
 
 The error I am getting is:
 
 java.sql.SQLRecoverableException: IO Error: Invalid connection string 
 format, a valid format is: "host:port:sid"
 at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:489)
 at 
 oracle.jdbc.driver.PhysicalConnection.(PhysicalConnection.java:553)
 at oracle.jdbc.driver.T4CConnection.(T4CConnection.java:254)
 at 
 oracle.jdbc.driver.T4CDriverExtension.getConnection(T4CDriverExtension.java:32)
 at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:528)
 at 

Re: Are there some pitfalls in my spark structured streaming code which causes slow response after several hours running?

2020-07-18 Thread Jörn Franke
It depends a  bit on the data as well, but have you investigated in SparkUI 
which executor/task becomes slowly?

Could it be also the database from which you load data?

> Am 18.07.2020 um 17:00 schrieb Yong Yuan :
> 
> 
> The spark job has the correct functions and logic. However, after several 
> hours running, it becomes slower and slower. Are there some pitfalls in the 
> below code? Thanks!
> 
> 
> val query = "(select * from meta_table) as meta_data"
> val meta_schema = new StructType() 
>.add("config_id", BooleanType) 
>.add("threshold", LongType) 
> var meta_df = spark.read.jdbc(url, query, connectionProperties) 
> var meta_df_explode=meta_df.select(col("id"), from_json(col("config"), 
> meta_schema).as("config")).select("config_id", "thresold", "config.*")  
> 
> //rules_imsi_df: joining of kafka ingestion with the meta_df_explode 
> 
> //rules_monitoring_df: static dataframe for monitoring purpose   
> 
> val rules_monitoring_stream =
> rules_imsi_df.writeStream   
> .outputMode("append")  
>   .format("memory")
> .trigger(Trigger.ProcessingTime("120  seconds"))
>  .foreachBatch {  
>   (batchDF: DataFrame, batchId: Long) =>
> if(!batchDF.isEmpty)  
>{
> 
> printf("At %d, the microbatch has %d records \n", Instant.now.getEpochSecond, 
> batchDF.count()) 
> batchDF.show()
>  batchDF.persist()
> var batchDF_group = 
> batchDF.groupBy("id").sum("volume").withColumnRenamed("sum(volume)", 
> "total_volume_id")  
> rules_monitoring_df = rules_monitoring_df.join(batchDF_group, 
> rules_monitoring_df("id") === batchDF_group("id"), 
> "left").select(rules_monitoring_df("id"), 
> batchDF_group("total_volume_id")).na.fill(0) 
> rules_monitoring_df = rules_monitoring_df.withColumn("volume", 
> rules_monitoring_df("volume")+rules_monitoring_df("total_volume_id")) 
>   batchDF.unpersist() 
>  }
>   }.start()
> 
> 
>   while(rules_monitoring_stream.isActive){  
> Thread.sleep(24)  
> ... //Periodically load meta data from database  
> meta_df = spark.read.jdbc(url, query, connectionProperties)  
> meta_df_explode=meta_df.select(col("id"), from_json(col("config"), 
> meta_schema).as("config")).select("config_id", "thresold", "config.*")
>  
> } 
> 
> 
> 
> 
> In addition to the code, the yarn-sites.xml is configured as below. 
> 
> yarn.nodemanager.pmem-check-enabled, false
> yarn.nodemanager.localizer.cache.target-size-mb, 5120
> yarn.nodemanager.localizer.cache.cleanup.interval-ms, 40
> yarn.nodemanager.vmem-check-enabled, false
> yarn.nodemanager.disk-health-checker.enable,true
> yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage,95.0
> yarn.log-aggregation.retain-seconds,36000
> 
> 
> 
> The spark-submit command is as below. 
> 
> spark-submit --driver-memory 5G --num-executors 3 --executor-memory 6G 
> --executor-cores 2 --files client_jaas.conf,cacerts,krb5.conf,service.keytab 
> --driver-java-options "-Djava.security.auth.login.config=./client_jaas.conf 
> -Djava.security.krb5.conf=./krb5.conf" --conf 
> "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./client_jaas.conf
>  -Djava.security.krb5.conf=./krb5.conf" --conf 
> "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=./client_jaas.conf
>  -Djava.security.krb5.conf=./krb5.conf"  --packages 
> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 
> sparktest-1.0-SNAPSHOT-jar-with-dependencies.jar
> 
> 
> I am running the job in AWS EMR with 2 m4.xlarge. 
> 
> Thanks!




Re: Mocking pyspark read writes

2020-07-07 Thread Jörn Franke
Write to a local temp directory via file:// ?
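
A sketch of that approach in a unit test: point the function at a file:// temp directory 
instead of mocking the Hadoop client (Scala shown, the same pattern works in PySpark with 
tempfile; readAndWrite is a hypothetical stand-in for the function under test):

import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("io-test").getOrCreate()
import spark.implicits._

// Hypothetical function under test: reads parquet, transforms, writes parquet.
def readAndWrite(spark: SparkSession, in: String, out: String): Unit =
  spark.read.parquet(in).withColumnRenamed("value", "v").write.parquet(out)

val tmpDir = Files.createTempDirectory("spark-test")
val in  = "file://" + tmpDir.resolve("in")
val out = "file://" + tmpDir.resolve("out")

Seq(1, 2, 3).toDF("value").write.parquet(in)
readAndWrite(spark, in, out)
assert(spark.read.parquet(out).count() == 3)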

> Am 07.07.2020 um 20:07 schrieb Dark Crusader :
> 
> 
> Hi everyone,
> 
> I have a function which reads and writes a parquet file from HDFS. When I'm 
> writing a unit test for this function, I want to mock this read & write.
> 
> How do you achieve this? 
> Any help would be appreciated. Thank you.
> 




Re: Getting PySpark Partitions Locations

2020-06-25 Thread Jörn Franke
By doing a select on the df ?
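
I.e. the written partitions are exactly the distinct combinations of the partition 
columns, so something like the following reconstructs them without listing S3 (a Scala 
sketch of the idea, df being the data-frame from the question; the same chain works in 
PySpark):

import org.apache.spark.sql.functions.{col, concat_ws, lit}

// df: the data-frame that was just written with partitionBy("day", "hour", "country")
val partitions = df.select("day", "hour", "country").distinct()

// Render them in the same layout the writer produces under s3_output.
val partitionPaths = partitions.select(
  concat_ws("/",
    concat_ws("=", lit("day"), col("day")),
    concat_ws("=", lit("hour"), col("hour")),
    concat_ws("=", lit("country"), col("country"))
  ).as("partition_path")
)
partitionPaths.show(false)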

> Am 25.06.2020 um 14:52 schrieb Tzahi File :
> 
> 
> Hi,
> 
> I'm using pyspark to write df to s3, using the following command: 
> "df.write.partitionBy("day","hour","country").mode("overwrite").parquet(s3_output)".
> 
> Is there any way to get the partitions created?
> e.g. 
> day=2020-06-20/hour=1/country=US
> day=2020-06-20/hour=2/country=US
> ..
> 
> -- 
> Tzahi File
> Data Engineer
> 
> email tzahi.f...@ironsrc.com
> mobile +972-546864835
> fax +972-77-5448273
> ironSource HQ - 121 Derech Menachem Begin st. Tel Aviv
> ironsrc.com
> 
> This email (including any attachments) is for the sole use of the intended 
> recipient and may contain confidential information which may be protected by 
> legal privilege. If you are not the intended recipient, or the employee or 
> agent responsible for delivering it to the intended recipient, you are hereby 
> notified that any use, dissemination, distribution or copying of this 
> communication and/or its content is strictly prohibited. If you are not the 
> intended recipient, please immediately notify us by reply email or by 
> telephone, delete this email and destroy any copies. Thank you.


Re: Reading TB of JSON file

2020-06-19 Thread Jörn Franke
Make every JSON object a single line and then read it as JSON Lines, not as multiline.
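
A sketch of what that looks like: with JSON Lines (one object per line) the file is 
splittable and read in parallel across executors, and supplying a schema avoids the 
expensive inference pass over the 50 GB (fields and paths below are placeholders):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("read-jsonl").getOrCreate()

// Placeholder schema: declare only the fields you actually need.
val schema = StructType(Seq(
  StructField("id", StringType),
  StructField("amount", DoubleType),
  StructField("ts", TimestampType)
))

val df = spark.read
  .schema(schema)                  // skip schema inference over the full file
  .option("multiLine", "false")    // default; each line is one JSON object
  .json("hdfs:///data/transactions.jsonl")   // placeholder path

df.write.mode("overwrite").parquet("hdfs:///data/transactions_parquet")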

> Am 19.06.2020 um 14:37 schrieb Chetan Khatri :
> 
> 
> All transactions in JSON, It is not a single array. 
> 
>> On Thu, Jun 18, 2020 at 12:55 PM Stephan Wehner  
>> wrote:
>> It's an interesting problem. What is the structure of the file? One big 
>> array? On hash with many key-value pairs?
>> 
>> Stephan
>> 
>>> On Thu, Jun 18, 2020 at 6:12 AM Chetan Khatri  
>>> wrote:
>>> Hi Spark Users,
>>> 
>>> I have a 50GB of JSON file, I would like to read and persist at HDFS so it 
>>> can be taken into next transformation. I am trying to read as 
>>> spark.read.json(path) but this is giving Out of memory error on driver. 
>>> Obviously, I can't afford having 50 GB on driver memory. In general, what 
>>> is the best practice to read large JSON file like 50 GB?
>>> 
>>> Thanks
>> 
>> 
>> -- 
>> Stephan Wehner, Ph.D.
>> The Buckmaster Institute, Inc.
>> 2150 Adanac Street
>> Vancouver BC V5L 2E7
>> Canada
>> Cell (604) 767-7415
>> Fax (888) 808-4655
>> 
>> Sign up for our free email course
>> http://buckmaster.ca/small_business_website_mistakes.html
>> 
>> http://www.buckmaster.ca
>> http://answer4img.com
>> http://loggingit.com
>> http://clocklist.com
>> http://stephansmap.org
>> http://benchology.com
>> http://www.trafficlife.com
>> http://stephan.sugarmotor.org (Personal Blog)
>> @stephanwehner (Personal Account)
>> VA7WSK (Personal call sign)


Re: Reading TB of JSON file

2020-06-18 Thread Jörn Franke
Depends on the data types you use.

Do you have it in JSON Lines format? Then the amount of memory plays much less of a role.

Otherwise, if it is one large object or array, I would not recommend it.

> Am 18.06.2020 um 15:12 schrieb Chetan Khatri :
> 
> 
> Hi Spark Users,
> 
> I have a 50GB of JSON file, I would like to read and persist at HDFS so it 
> can be taken into next transformation. I am trying to read as 
> spark.read.json(path) but this is giving Out of memory error on driver. 
> Obviously, I can't afford having 50 GB on driver memory. In general, what is 
> the best practice to read large JSON file like 50 GB?
> 
> Thanks




Re: Spark dataframe hdfs vs s3

2020-05-29 Thread Jörn Franke
Maybe some aws network optimized instances with higher bandwidth will improve 
the situation.

> Am 27.05.2020 um 19:51 schrieb Dark Crusader :
> 
> 
> Hi Jörn,
> 
> Thanks for the reply. I will try to create a easier example to reproduce the 
> issue.
> 
> I will also try your suggestion to look into the UI. Can you guide on what I 
> should be looking for? 
> 
> I was already using the s3a protocol to compare the times.
> 
> My hunch is that multiple reads from S3 are required because of improper 
> caching of intermediate data. And maybe hdfs is doing a better job at this. 
> Does this make sense?
> 
> I would also like to add that we built an extra layer on S3 which might be 
> adding to even slower times.
> 
> Thanks for your help.
> 
>> On Wed, 27 May, 2020, 11:03 pm Jörn Franke,  wrote:
>> Have you looked in Spark UI why this is the case ? 
>> S3 Reading can take more time - it depends also what s3 url you are using : 
>> s3a vs s3n vs S3.
>> 
>> It could help after some calculation to persist in-memory or on HDFS. You 
>> can also initially load from S3 and store on HDFS and work from there . 
>> 
>> HDFS offers Data locality for the tasks, ie the tasks start on the nodes 
>> where the data is. Depending on what s3 „protocol“ you are using you might 
>> be also more punished with performance.
>> 
>> Try s3a as a protocol (replace all s3n with s3a).
>> 
>> You can also use s3 url but this requires a special bucket configuration, a 
>> dedicated empty bucket and it lacks some ineroperability with other AWS 
>> services.
>> 
>> Nevertheless, it could be also something else with the code. Can you post an 
>> example reproducing the issue?
>> 
>> > Am 27.05.2020 um 18:18 schrieb Dark Crusader 
>> > :
>> > 
>> > 
>> > Hi all,
>> > 
>> > I am reading data from hdfs in the form of parquet files (around 3 GB) and 
>> > running an algorithm from the spark ml library.
>> > 
>> > If I create the same spark dataframe by reading data from S3, the same 
>> > algorithm takes considerably more time.
>> > 
>> > I don't understand why this is happening. Is this a chance occurence or 
>> > are the spark dataframes created different? 
>> > 
>> > I don't understand how the data store would effect the algorithm 
>> > performance.
>> > 
>> > Any help would be appreciated. Thanks a lot.


Re: Spark dataframe hdfs vs s3

2020-05-27 Thread Jörn Franke
Have you looked in Spark UI why this is the case ? 
S3 Reading can take more time - it depends also what s3 url you are using : s3a 
vs s3n vs S3.

It could help after some calculation to persist in-memory or on HDFS. You can 
also initially load from S3 and store on HDFS and work from there . 

HDFS offers data locality for the tasks, i.e. the tasks start on the nodes where 
the data is. Depending on which S3 „protocol“ you are using, you might also take 
more of a performance penalty.

Try s3a as a protocol (replace all s3n with s3a).

You can also use s3 url but this requires a special bucket configuration, a 
dedicated empty bucket and it lacks some ineroperability with other AWS 
services.

Nevertheless, it could be also something else with the code. Can you post an 
example reproducing the issue?
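
A sketch of the two suggestions above: read via s3a and persist after the first 
materialization, so the iterative ML algorithm does not go back to S3 on every pass (paths 
are placeholders):

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().appName("s3a-vs-hdfs").getOrCreate()

// s3a:// instead of s3n:// or s3://
val df = spark.read.parquet("s3a://my-bucket/training-data/")   // placeholder
  .persist(StorageLevel.MEMORY_AND_DISK)

df.count()   // action that materializes the cache before the ML iterations
// ... fit the spark.ml model on df ...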

> Am 27.05.2020 um 18:18 schrieb Dark Crusader :
> 
> 
> Hi all,
> 
> I am reading data from hdfs in the form of parquet files (around 3 GB) and 
> running an algorithm from the spark ml library.
> 
> If I create the same spark dataframe by reading data from S3, the same 
> algorithm takes considerably more time.
> 
> I don't understand why this is happening. Is this a chance occurence or are 
> the spark dataframes created different? 
> 
> I don't understand how the data store would effect the algorithm performance.
> 
> Any help would be appreciated. Thanks a lot.




Re: Spark reading from Hbase throws java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods

2020-02-23 Thread Jörn Franke
Yes I fear you have to shade and create an uberjar 

> Am 17.02.2020 um 23:27 schrieb Mich Talebzadeh :
> 
> 
> I stripped everything from the jar list. This is all I have
> 
> spark-shell --jars shc-core-1.1.1-2.1-s_2.11.jar, \
>   json4s-native_2.11-3.5.3.jar, \
>   json4s-jackson_2.11-3.5.3.jar, \
>   hbase-client-1.2.3.jar, \
>   hbase-common-1.2.3.jar
> 
> Now I still get the same error!
> 
> scala> val df = withCatalog(catalog)
> java.lang.NoSuchMethodError: 
> org.json4s.jackson.JsonMethods$.parse(Lorg/json4s/JsonInput;Z)Lorg/json4s/JsonAST$JValue;
>   at 
> org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:257)
>   at 
> org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.(HBaseRelation.scala:80)
>   at 
> org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:51)
>   at 
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
>   at 
> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
>   at withCatalog(:54)
> 
> Thanks
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> 
>> On Mon, 17 Feb 2020 at 21:37, Mich Talebzadeh  
>> wrote:
>> 
>> Dr Mich Talebzadeh
>>  
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>  
>> http://talebzadehmich.wordpress.com
>> 
>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>> loss, damage or destruction of data or any other property which may arise 
>> from relying on this email's technical content is explicitly disclaimed. The 
>> author will in no case be liable for any monetary damages arising from such 
>> loss, damage or destruction.
>>  
>> Many thanks both.
>> 
>> Let me check and confirm. 
>> 
>> regards,
>> 
>> Mich
>> 
>> 
>>> On Mon, 17 Feb 2020 at 21:33, Jörn Franke  wrote:
>>> Is there a reason why different Scala (it seems at least 2.10/2.11) 
>>> versions are mixed? This never works.
>>> Do you include by accident a dependency to with an old Scala version? Ie 
>>> the Hbase datasource maybe?
>>> 
>>> 
>>>>> Am 17.02.2020 um 22:15 schrieb Mich Talebzadeh 
>>>>> :
>>>>> 
>>>> 
>>>> Thanks Muthu,
>>>> 
>>>> 
>>>> I am using the following jar files for now in local mode i.e.  
>>>> spark-shell_local --jars …..
>>>> 
>>>> json4s-jackson_2.10-3.2.10.jar
>>>> json4s_2.11-3.2.11.jar
>>>> json4s-native_2.10-3.4.0.jar
>>>> 
>>>> Which one is the incorrect one please/
>>>> 
>>>> Regards,
>>>> 
>>>> Mich
>>>> 
>>>> 
>>>> 
>>>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>>>> loss, damage or destruction of data or any other property which may arise 
>>>> from relying on this email's technical content is explicitly disclaimed. 
>>>> The author will in no case be liable for any monetary damages arising from 
>>>> such loss, damage or destruction.
>>>>  
>>>> 
>>>> 
>>>>> On Mon, 17 Feb 2020 at 20:28, Muthu Jayakumar  wrote:
>>>>> I suspect the spark job is somehow having an incorrect (newer) version of 
>>>>> json4s in the classpath. json4s 3.5.3 is the utmost version that can be 
>>>>> used. 
>>>>> 
>>>>> Thanks,
>>>>> Muthu
>>>>> 
>>>>>> On Mon, Feb 17, 2020, 06:43 Mich Talebzadeh  
>>>>>> wrote:
>>>>>> Hi,
>>>>>> 
>>>>>> Spark version 2.4.3
>>>>>> Hbase 1.2.7
>>>>>> 
>>>>>> Data is stored in Hbase as Json. example of a row shown below
>>>>>> 
>>>>>> 
>>>>>> I am trying to read this table i

Re: Spark reading from Hbase throws java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods

2020-02-17 Thread Jörn Franke
Is there a reason why different Scala versions (it seems at least 2.10/2.11) 
are mixed? This never works.
Do you by accident include a dependency with an old Scala version? I.e. the 
HBase datasource maybe?


> Am 17.02.2020 um 22:15 schrieb Mich Talebzadeh :
> 
> 
> Thanks Muthu,
> 
> 
> I am using the following jar files for now in local mode i.e.  
> spark-shell_local --jars …..
> 
> json4s-jackson_2.10-3.2.10.jar
> json4s_2.11-3.2.11.jar
> json4s-native_2.10-3.4.0.jar
> 
> Which one is the incorrect one please/
> 
> Regards,
> 
> Mich
> 
> 
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> 
>> On Mon, 17 Feb 2020 at 20:28, Muthu Jayakumar  wrote:
>> I suspect the spark job is somehow having an incorrect (newer) version of 
>> json4s in the classpath. json4s 3.5.3 is the utmost version that can be 
>> used. 
>> 
>> Thanks,
>> Muthu
>> 
>>> On Mon, Feb 17, 2020, 06:43 Mich Talebzadeh  
>>> wrote:
>>> Hi,
>>> 
>>> Spark version 2.4.3
>>> Hbase 1.2.7
>>> 
>>> Data is stored in Hbase as Json. example of a row shown below
>>> 
>>> 
>>> I am trying to read this table in Spark Scala
>>> 
>>> import org.apache.spark.sql.{SQLContext, _}
>>> import org.apache.spark.sql.execution.datasources.hbase._
>>> import org.apache.spark.{SparkConf, SparkContext}
>>> import spark.sqlContext.implicits._
>>> import org.json4s._
>>> import org.json4s.jackson.JsonMethods._
>>> import org.json4s.jackson.Serialization.{read => JsonRead}
>>> import org.json4s.jackson.Serialization.{read, write}
>>> def catalog = s"""{
>>>  | "table":{"namespace":"trading", "name":"MARKETDATAHBASEBATCH",
>>>  | "rowkey":"key",
>>>  | "columns":{
>>>  | "rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
>>>  | |"ticker":{"cf":"PRICE_INFO", "col":"ticker", "type":"string"},
>>>  | |"timeissued":{"cf":"PRICE_INFO", "col":"timeissued", 
>>> "type":"string"},
>>>  | |"price":{"cf":"PRICE_INFO", "col":"price", "type":"string"}
>>>  | |}
>>>  | |}""".stripMargin
>>> def withCatalog(cat: String): DataFrame = {
>>>spark.sqlContext
>>>.read
>>>.options(Map(HBaseTableCatalog.tableCatalog->cat))
>>>.format("org.apache.spark.sql.execution.datasources.hbase")
>>>.load()
>>> }
>>> val df = withCatalog(catalog)
>>> 
>>> 
>>> However, I am getting this error
>>> 
>>> Spark session available as 'spark'.
>>> java.lang.NoSuchMethodError: 
>>> org.json4s.jackson.JsonMethods$.parse(Lorg/json4s/JsonInput;Z)Lorg/json4s/JsonAST$JValue;
>>>   at 
>>> org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:257)
>>>   at 
>>> org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.(HBaseRelation.scala:80)
>>>   at 
>>> org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:51)
>>>   at 
>>> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
>>>   at 
>>> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
>>>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
>>>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
>>>   at withCatalog(testme.scala:49)
>>>   ... 65 elided
>>> 
>>> I have Googled it but with little luck!
>>> 
>>> Thanks,
>>> Mich
>>> 
>>> http://talebzadehmich.wordpress.com
>>> 
>>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>>> loss, damage or destruction of data or any other property which may arise 
>>> from relying on this email's technical content is explicitly disclaimed. 
>>> The author will in no case be liable for any monetary damages arising from 
>>> such loss, damage or destruction.
>>>  


Re: Does explode lead to more usage of memory

2020-01-18 Thread Jörn Franke
Why not two tables that you can then join? This would be the standard way. 
It depends on what your full use case is, what volumes/orders you expect on 
average, and what the aggregations and filters look like. The example below essentially 
does a select-all on the table.
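
A sketch of the two-table variant: keep a users table and a flat orders table and join 
only where an aggregation actually needs the user attributes, instead of exploding and 
carrying user/language on every order row (column names follow the schema in the question, 
df/spark come from that context):

import spark.implicits._
import org.apache.spark.sql.functions.explode

// df has: user, language, orders: array<struct<amount,id>>
val users = df.select("user", "language")

// One row per order, but only the join key is duplicated, not every user column.
val orders = df.select($"user", explode($"orders").as("order"))
  .select($"user", $"order.id".as("order_id"), $"order.amount".as("amount"))

// Join back only where the user attributes are actually needed.
val perUserRevenue = orders.groupBy("user").sum("amount").join(users, Seq("user"))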

> Am 19.01.2020 um 01:50 schrieb V0lleyBallJunki3 :
> 
> I am using a dataframe and has structure like this :
> 
> root
> |-- orders: array (nullable = true)
> ||-- element: struct (containsNull = true) 
> |||-- amount: double (nullable = true)
> |||-- id: string (nullable = true)
> |-- user: string (nullable = true)
> |-- language: string (nullable = true)
> 
> Each user has multiple orders. Now if I explode orders like this:
> 
> df.select($"user", explode($"orders").as("order")) . Each order element will
> become a row with a duplicated user and language. Was wondering if spark
> actually converts each order element into a single row in memory or it just
> logical. Because if a single user has 1000 orders  then wouldn't it lead to
> a lot more memory consumption since it is duplicating user and language a
> 1000 times (once for each order) in memory?
> 
> 
> 
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> 
> 




Re: GraphX performance feedback

2019-11-25 Thread Jörn Franke
I think it depends on what you want to do. Interactive big-data graph analytics are 
probably better off in JanusGraph or similar.
Batch (one-off) processing can still be fine in GraphX - you have, though, to 
carefully design the process.

> Am 25.11.2019 um 20:04 schrieb mahzad kalantari :
> 
> 
> Hi all
> 
> My question is about GraphX; I'm looking for user feedback on its 
> performance.
> 
> I read this paper written by Facebook team that says Graphx has very poor 
> performance.
> https://engineering.fb.com/core-data/a-comparison-of-state-of-the-art-graph-processing-systems/
>   
> 
> Has anyone already encountered performance problems with Graphx, and is it a 
> good choice if I want to do large scale graph modelling?
> 
> 
> Thanks!
> 
> Mahzad 


Re: Spark Cluster over yarn cluster monitoring

2019-10-27 Thread Jörn Franke
Use yarn queues:

https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/FairScheduler.html
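
On the Spark side it is then mostly a matter of pointing each customer's jobs at a
queue (a minimal sketch; the queue name is hypothetical and the queue itself is
defined in the YARN scheduler configuration):

import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.yarn.queue", "customer_z")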

> Am 27.10.2019 um 06:41 schrieb Chetan Khatri :
> 
> 
> Could someone please help me to understand better..
> 
>> On Thu, Oct 17, 2019 at 7:41 PM Chetan Khatri  
>> wrote:
>> Hi Users,
>> 
>> I submit X number of jobs with Airflow to YARN as part of the workflow for 
>> customer Y. I could potentially run the workflow for customer Z, but I need to 
>> check how many resources are available on the cluster before the jobs for the 
>> next customer start.
>> 
>> Could you please tell me the best way to handle this? Currently, I just 
>> check availableMB > 100 and then trigger the next Airflow DAG over YARN.
>> 
>> GET http://rm-http-address:port/ws/v1/cluster/metrics
>> Thanks.


Re: Conflicting PySpark Storage Level Defaults?

2019-09-16 Thread Jörn Franke
I don't know your full source code, but you may be missing an action, which is 
needed before the DataFrame is actually persisted.
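
A minimal Scala sketch of what I mean (the input path is hypothetical; the same
applies in PySpark):

import org.apache.spark.storage.StorageLevel

val df = spark.read.parquet("/data/events")
val cached = df.persist(StorageLevel.MEMORY_AND_DISK)
cached.count()   // persist() only marks the DataFrame; an action like count() materializes it,
                 // and only then should the Storage tab show the cached data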

> Am 16.09.2019 um 02:07 schrieb grp :
> 
> Hi There Spark Users,
> 
> Curious what is going on here.  Not sure if possible bug or missing 
> something.  Extra eyes are much appreciated.
> 
> Spark UI (Python API 2.4.3) by default is reporting persisted data-frames to 
> be de-serialized MEMORY_AND_DISK however I always thought they were 
> serialized for Python by default according to official documentation.
> However when explicitly changing the storage level to default … ex => 
> df.persist(StorageLevel.MEMORY_AND_DISK) … the Spark UI returns the expected 
> serialized data-frame under Storage Tab, but not when just calling … 
> df.cache().
> 
> Do we have to explicitly set to … StorageLevel.MEMORY_AND_DISK … to get the 
> serialized benefit in Python (which I thought was automatic)?  Or is the 
> Spark UI incorrect?
> 
> SO post with specific example/details => 
> https://stackoverflow.com/questions/56926337/conflicting-pyspark-storage-level-defaults
> 
> Thank you for your time and research!
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Control Sqoop job from Spark job

2019-09-03 Thread Jörn Franke
I would not say that. The only “issue” with Spark is that you need to build 
some functionality on top that Sqoop provides out of the box, 
especially for import processes and when you need to define a lot of them.
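
For reference, a hedged sketch of what such an import looks like with Spark's
JDBC source (connection details, table and column names are hypothetical); the
partitioning options play roughly the role of Sqoop's --split-by / --num-mappers:

val imported = spark.read.format("jdbc")
  .option("url", "jdbc:sqlserver://dbhost;databaseName=sales")
  .option("user", "etl_user")
  .option("password", sys.env("DB_PASSWORD"))
  .option("dbtable", "dbo.orders")
  .option("partitionColumn", "order_id")   // like --split-by
  .option("lowerBound", "1")
  .option("upperBound", "10000000")
  .option("numPartitions", "8")            // like --num-mappers
  .option("fetchsize", "1000")             // like --fetch-size
  .load()

imported.write.mode("overwrite").parquet("/staging/orders")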

> Am 03.09.2019 um 09:30 schrieb Shyam P :
> 
> Hi Mich,
> A lot of people say that Spark does not have a proven record of migrating data 
> from Oracle the way Sqoop has, at least in production.
> 
> Please correct me if I am wrong and suggest how to deal with shuffling when 
> dealing with groupBy ?
> 
> Thanks,
> Shyam
> 
>> On Sat, Aug 31, 2019 at 12:17 PM Mich Talebzadeh  
>> wrote:
>> Spark is an excellent ETL tool to lift data from source and put it in 
>> target. Spark uses JDBC connection similar to Sqoop. I don't see the need 
>> for Sqoop with Spark here.
>> 
>> Where is the source (Oracle MSSQL, etc) and target (Hive?) here
>> 
>> HTH
>> 
>> Dr Mich Talebzadeh
>>  
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>  
>> http://talebzadehmich.wordpress.com
>> 
>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>> loss, damage or destruction of data or any other property which may arise 
>> from relying on this email's technical content is explicitly disclaimed. The 
>> author will in no case be liable for any monetary damages arising from such 
>> loss, damage or destruction.
>>  
>> 
>> 
>>> On Thu, 29 Aug 2019 at 21:01, Chetan Khatri  
>>> wrote:
>>> Hi Users,
>>> I am launching a Sqoop job from Spark job and would like to FAIL Spark job 
>>> if Sqoop job fails.
>>> 
>>> def executeSqoopOriginal(serverName: String, schemaName: String, username: 
>>> String, password: String,
>>>  query: String, splitBy: String, fetchSize: Int, 
>>> numMappers: Int, targetDir: String, jobName: String, dateColumns: String) = 
>>> {
>>> 
>>>   val connectionString = "jdbc:sqlserver://" + serverName + ";" + 
>>> "databaseName=" + schemaName
>>>   var parameters = Array("import")
>>>   parameters = parameters :+ "-Dmapreduce.job.user.classpath.first=true"
>>>   parameters = parameters :+ "--connect"
>>>   parameters = parameters :+ connectionString
>>>   parameters = parameters :+ "--mapreduce-job-name"
>>>   parameters = parameters :+ jobName
>>>   parameters = parameters :+ "--username"
>>>   parameters = parameters :+ username
>>>   parameters = parameters :+ "--password"
>>>   parameters = parameters :+ password
>>>   parameters = parameters :+ "--hadoop-mapred-home"
>>>   parameters = parameters :+ "/usr/hdp/2.6.5.0-292/hadoop-mapreduce/"
>>>   parameters = parameters :+ "--hadoop-home"
>>>   parameters = parameters :+ "/usr/hdp/2.6.5.0-292/hadoop/"
>>>   parameters = parameters :+ "--query"
>>>   parameters = parameters :+ query
>>>   parameters = parameters :+ "--split-by"
>>>   parameters = parameters :+ splitBy
>>>   parameters = parameters :+ "--fetch-size"
>>>   parameters = parameters :+ fetchSize.toString
>>>   parameters = parameters :+ "--num-mappers"
>>>   parameters = parameters :+ numMappers.toString
>>>   if (dateColumns.length() > 0) {
>>> parameters = parameters :+ "--map-column-java"
>>> parameters = parameters :+ dateColumns
>>>   }
>>>   parameters = parameters :+ "--target-dir"
>>>   parameters = parameters :+ targetDir
>>>   parameters = parameters :+ "--delete-target-dir"
>>>   parameters = parameters :+ "--as-avrodatafile"
>>> 
>>> }


Re: Will this use-case can be handled with spark-sql streaming and cassandra?

2019-08-29 Thread Jörn Franke
1) This is not a use case, but a technical solution. Hence nobody can tell you 
if it makes sense or not.
2) Do an upsert in Cassandra. However, keep in mind that the application 
submitting to the Kafka topic and the one consuming from the Kafka topic need 
to ensure that they process messages in the right order. This may not always be 
guaranteed, e.g. in case of errors, and they need to avoid overwriting new data 
with old data. This is also not a Kafka setting; it has to be dealt with at the 
producer and consumer level.
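
A hedged sketch of the upsert, assuming Spark 2.4+ structured streaming with the
Kafka source and the DataStax spark-cassandra-connector (broker, topic, keyspace,
table and column names are hypothetical). In Cassandra a write with an existing
primary key overwrites that row, so a plain append behaves like an upsert:

val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "records")
  .load()

val parsed = kafkaStream.selectExpr("CAST(key AS STRING) AS id", "CAST(value AS STRING) AS payload")

val writeToCassandra = (batch: org.apache.spark.sql.DataFrame, batchId: Long) => {
  batch.write
    .format("org.apache.spark.sql.cassandra")
    .options(Map("keyspace" -> "demo", "table" -> "records"))
    .mode("append")
    .save()
}

val query = parsed.writeStream
  .option("checkpointLocation", "/tmp/records-checkpoint")
  .foreachBatch(writeToCassandra)
  .start()

The ordering caveat above still applies: the upsert only keeps the latest write,
which is not necessarily the latest event.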

> Am 29.08.2019 um 13:21 schrieb Shyam P :
> 
> Hi,
> I need to do a PoC for a business use-case.
> 
> Use case : Need to update a record in Cassandra table if exists.
> 
> Will Spark streaming support comparing each record and updating an existing 
> Cassandra record?
> 
> For each record received from the Kafka topic, I want to check whether it is 
> already there in Cassandra or not; if yes, update the record, else insert a 
> new record.
> 
> How can be this done using spark-structured streaming and cassandra? any 
> snippet or sample if you have.
> 
> Thank you,
> 
> Shyam


Re: Any advice how to do this usecase in spark sql ?

2019-08-13 Thread Jörn Franke
Have you tried joining both datasets, filtering accordingly and then writing the 
full dataset to your filesystem?
Alternatively, work with a NoSQL database that you update by key (e.g. it sounds like a 
key/value store could be useful for you).

However, it could also be that you need to do more, depending on your use case.
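
A hedged sketch of the join-and-rewrite idea for one refresh cycle (paths and the
two-column key/value schema are hypothetical; `updates` stands for the new records
read from Kafka):

import org.apache.spark.sql.functions.coalesce
import spark.implicits._

val existing = spark.read.parquet("/data/lookup")        // columns: key, value
val merged = existing.join(updates, Seq("key"), "full_outer")
  .select($"key", coalesce(updates("value"), existing("value")).as("value"))

// write to a new location and switch readers over, rather than overwriting the
// path you are still reading from
merged.write.mode("overwrite").parquet("/data/lookup_new")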

> Am 14.08.2019 um 05:08 schrieb Shyam P :
> 
> Hi,
> Any advice how to do this in spark sql ?
> 
> I have a scenario as below
> 
> dataframe1   = loaded from an HDFS Parquet file.
> 
> dataframe2 =   read from a Kafka Stream.
> 
> If the column1 value of dataframe1 is in the columnX values of dataframe2, 
> then I need to replace the column1 value of dataframe1. 
> 
> Else add the column1 value of dataframe1 to dataframe2 as a new record.
> 
> 
> 
> In a sense I need to implement a lookup dataframe which is refreshable.
> 
> For more information please check
> 
> https://stackoverflow.com/questions/57479581/how-to-do-this-scenario-in-spark-streaming?noredirect=1#comment101437596_57479581
>  
> 
>  Let me know if u need more info  
> 
> Thanks


Re: Spark scala/Hive scenario

2019-08-07 Thread Jörn Franke
You can use the map datatype on the Hive table for the columns that are 
uncertain:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types#LanguageManualTypes-ComplexTypes

However, maybe you can share more concrete details, because there could also be 
other solutions.
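
A hedged sketch of such a table (assuming a SparkSession with Hive support;
table and column names are hypothetical) - the day-2 extra fields then go into
the map without changing the schema:

spark.sql("""
  CREATE TABLE IF NOT EXISTS feed_data (
    record_id   STRING,
    load_date   DATE,
    extra_attrs MAP<STRING, STRING>
  )
  STORED AS ORC
""")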

> Am 07.08.2019 um 20:40 schrieb anbutech :
> 
> Hi All,
> 
> I have a scenario in (Spark scala/Hive):
> 
> Day 1:
> 
> I have a file with 5 columns which needs to be processed and loaded into
> Hive tables.
> 
> Day 2:
> 
> The next day the same feed (file) has 8 columns (additional fields) which needs
> to be processed and loaded into Hive tables.
> 
> How do we approach this problem without changing the target table schema? Is
> there any way we can achieve this?
> 
> Thanks
> Anbu
> 
> 
> 
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 


Re: Hive external table not working in sparkSQL when subdirectories are present

2019-08-07 Thread Jörn Franke
Do you use the HiveContext in Spark? Do you configure the same options there? 
Can you share some code?

> Am 07.08.2019 um 08:50 schrieb Rishikesh Gawade :
> 
> Hi.
> I am using Spark 2.3.2 and Hive 3.1.0. 
> Even if I use parquet files the result would be the same, because after all 
> sparkSQL isn't able to descend into the subdirectories over which the table 
> is created. Could there be any other way?
> Thanks,
> Rishikesh
> 
>> On Tue, Aug 6, 2019, 1:03 PM Mich Talebzadeh  
>> wrote:
>> which versions of Spark and Hive are you using.
>> 
>> what will happen if you use parquet tables instead?
>> 
>> HTH
>> 
>> Dr Mich Talebzadeh
>>  
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>  
>> http://talebzadehmich.wordpress.com
>> 
>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>> loss, damage or destruction of data or any other property which may arise 
>> from relying on this email's technical content is explicitly disclaimed. The 
>> author will in no case be liable for any monetary damages arising from such 
>> loss, damage or destruction.
>>  
>> 
>> 
>>> On Tue, 6 Aug 2019 at 07:58, Rishikesh Gawade  
>>> wrote:
>>> Hi.
>>> I have built a Hive external table on top of a directory 'A' which has data 
>>> stored in ORC format. This directory has several subdirectories inside it, 
>>> each of which contains the actual ORC files.
>>> These subdirectories are actually created by spark jobs which ingest data 
>>> from other sources and write it into this directory.
>>> I tried creating a table and setting the table properties of the same as 
>>> hive.mapred.supports.subdirectories=TRUE and 
>>> mapred.input.dir.recursive=TRUE.
>>> As a result of this, when I fire the simplest query of select count(*) from 
>>> ExtTable via the Hive CLI, it successfully gives me the expected count of 
>>> records in the table.
>>> However, when I fire the same query via sparkSQL, I get count = 0.
>>> 
>>> I think the sparkSQL isn't able to descend into the subdirectories for 
>>> getting the data while hive is able to do so.
>>> Are there any configurations needed to be set on the spark side so that 
>>> this works as it does via hive cli? 
>>> I am using Spark on YARN.
>>> 
>>> Thanks,
>>> Rishikesh
>>> 
>>> Tags: subdirectories, subdirectory, recursive, recursion, hive external 
>>> table, orc, sparksql, yarn


Re: Logistic Regression Iterations causing High GC in Spark 2.3

2019-07-29 Thread Jörn Franke
I would remove all the GC tuning and add it back later once you have found the underlying 
root cause. Usually more GC means you need to provide more memory, because 
something has changed (your application, Spark version etc.).

We don't have your full code to give exact advice, but you may want to rethink 
the one-core-per-executor approach and have fewer executors with more cores per 
executor. That can sometimes lead to more heap usage (especially if you 
broadcast). Keep in mind that using more cores per executor usually also 
requires more memory per executor, but fewer executors. Similarly, the 
executor instances might be too many and each may not have enough heap.
You can also increase the memory of the executor.
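
Purely as a hedged starting point (the exact numbers depend on your cluster and
have to be validated by measurement), something like fewer but larger executors
instead of 20 x 1 core x ~9 GB:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.instances", "5")
  .set("spark.executor.cores", "4")
  .set("spark.executor.memory", "30g")
  .set("spark.executor.memoryOverhead", "3g")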

> Am 29.07.2019 um 08:22 schrieb Dhrubajyoti Hati :
> 
> Hi,
> 
> We were running Logistic Regression in Spark 2.2.X and then we tried to see 
> how does it do in Spark 2.3.X. Now we are facing an issue while running a 
> Logistic Regression Model in Spark 2.3.X on top of Yarn(GCP-Dataproc). In the 
> TreeAggregate method it takes a huge time due to very High GC Activity. I 
> have tuned the GC, created different sized clusters, higher spark 
> version(2.4.X), smaller data but nothing helps. The GC time is 100 - 1000 
> times of the processing time in avg for iterations. 
> 
> The strange part is in Spark 2.2 this doesn't happen at all. Same code, same 
> cluster sizing, same data in both the cases.
> 
> I was wondering if someone can explain this behaviour and help me to resolve 
> this. How can the same code has so different behaviour in two Spark version, 
> especially the higher ones?
> 
> Here are the config which I used:
> 
> spark.serializer=org.apache.spark.serializer.KryoSerializer
> #GC Tuning
> spark.executor.extraJavaOptions= -XX:+UseG1GC -XX:+PrintFlagsFinal 
> -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 
> -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions 
> -XX:+G1SummarizeConcMark -Xms9000m -XX:ParallelGCThreads=20 
> -XX:ConcGCThreads=5
> 
> spark.executor.instances=20
> spark.executor.cores=1
> spark.executor.memory=9010m
> 
> 
> Regards,
> Dhrub
> 


Re: Spark SaveMode

2019-07-19 Thread Jörn Franke
This is not an issue of Spark, but of the underlying database. The primary key 
constraint has a purpose and ignoring it would defeat that purpose. 
To handle your use case, you would need to make multiple decisions, which 
may imply you don't want to simply "insert if not exists". Maybe you want to do an 
upsert, or how do you want to take deleted data into account?
You could use a MERGE in Oracle to achieve what you have in mind. In Spark you 
would need to fetch the existing data from the Oracle database and then merge it in 
Spark with the new data, depending on your requirements.
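
A hedged sketch of the "insert only keys that do not exist yet" variant done in
Spark (connection details, table name and key column are hypothetical; `newData`
stands for your Cassandra extract). Note that this is not atomic - a concurrent
writer could still cause constraint violations:

val existingKeys = spark.read.format("jdbc")
  .option("url", "jdbc:oracle:thin:@//dbhost:1521/ORCL")
  .option("user", "app").option("password", sys.env("ORA_PW"))
  .option("dbtable", "(SELECT id FROM target_table) t")
  .load()

val toInsert = newData.join(existingKeys, Seq("id"), "left_anti")

toInsert.write.format("jdbc")
  .option("url", "jdbc:oracle:thin:@//dbhost:1521/ORCL")
  .option("user", "app").option("password", sys.env("ORA_PW"))
  .option("dbtable", "target_table")
  .mode("append")
  .save()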

> Am 20.07.2019 um 06:34 schrieb Richard :
> 
> Any reason why Spark's SaveMode doesn't have mode that ignore any Primary 
> Key/Unique constraint violations?
> 
> Let's say I'm using spark to migrate some data from Cassandra to Oracle, I 
> want the insert operation to be "ignore if exist primary keys" instead of 
> failing the whole batch.
> 
> Thanks, 
> Richard 
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Pyspark 2.3+] Timeseries with Spark

2019-06-13 Thread Jörn Franke
Time series can mean a lot of different things and algorithms. Can you describe in more 
detail what you mean by a time series use case, i.e. what is the input, what would you 
like to do with the input, and what is the output?

> Am 14.06.2019 um 06:01 schrieb Rishi Shah :
> 
> Hi All,
> 
> I have a time series use case which I would like to implement in Spark... 
> What would be the best way to do so? Any built in libraries?
> 
> -- 
> Regards,
> 
> Rishi Shah

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Pyspark 2.4] Best way to define activity within different time window

2019-06-09 Thread Jörn Franke
Depending on what accuracy is needed, hyperloglogs can be an interesting 
alternative 
https://en.m.wikipedia.org/wiki/HyperLogLog
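
A hedged sketch with Spark's HLL-based approx_count_distinct (the events
DataFrame and the user_id/event_time columns are hypothetical); the same pattern
works for weekly/monthly/quarterly windows:

import org.apache.spark.sql.functions.{approx_count_distinct, col, window}

val events = spark.read.parquet("/data/events")

val dailyActive = events
  .groupBy(window(col("event_time"), "1 day").as("day"))
  .agg(approx_count_distinct(col("user_id"), 0.02).as("approx_active_users"))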

> Am 09.06.2019 um 15:59 schrieb big data :
> 
> In my opinion, a bitmap is the best solution for active-user calculation. 
> Other solutions are almost all based on a count(distinct) calculation, which 
> is slower.
> 
> If you've implemented a bitmap solution, including how to build and how to 
> load the bitmap, then the bitmap is the best choice.
> 
>> On 2019/6/5 6:49 PM, Rishi Shah wrote:
>> Hi All,
>> 
>> Is there a best practice around calculating daily, weekly, monthly, 
>> quarterly, yearly active users?
>> 
>> One approach is to create a window of daily bitmap and aggregate it based on 
>> period later. However I was wondering if anyone has a better approach to 
>> tackling this problem.. 
>> 
>> -- 
>> Regards,
>> 
>> Rishi Shah


Re: writing into oracle database is very slow

2019-04-18 Thread Jörn Franke
What is the size of the data? How much time does it need on HDFS and how much 
on Oracle? How many partitions do you have on Oracle side?

> Am 06.04.2019 um 16:59 schrieb Lian Jiang :
> 
> Hi,
> 
> My spark job writes into oracle db using:
> df.coalesce(10).write.format("jdbc").option("url", url)
>   .option("driver", driver).option("user", user)
>   .option("batchsize", 2000)
>   .option("password", password).option("dbtable", 
> tableName).mode("append").save()
> It is much slower than writing into HDFS. The data to write is small.
> Is this expected? Thanks for any clue.
> 


Re: Spark SQL API taking longer time than DF API.

2019-03-31 Thread Jörn Franke
Is the select taking longer, or the saving to a file? You seem to save to a file 
only in the second case.

> Am 29.03.2019 um 15:10 schrieb neeraj bhadani :
> 
> Hi Team,
> I am executing the same Spark code using the Spark SQL API and the DataFrame API; 
> however, Spark SQL is taking longer than expected.
> 
> PFB pseudo code.
> ---
> Case 1 : Spark SQL
> ---
> %sql
> CREATE TABLE <table_name>
> AS
> 
> WITH <cte_1> AS (
>   <query_1>
> )
> , <cte_2> AS (
>   <query_2>
> )
> 
> SELECT * FROM <cte_1>
> UNION ALL
> SELECT * FROM <cte_2>
> 
> ---
> Case 2 : DataFrame API
> ---
> 
> df1 = spark.sql(<query_1>)
> df2 = spark.sql(<query_2>)
> df3 = df1.union(df2)
> df3.write.saveAsTable(<table_name>)
> ---
> 
> As per my understanding, both Spark SQL and the DataFrame API generate the same 
> code under the hood, so the execution time should be similar.
> 
> Regards,
> Neeraj
> 


Re: Spark does not load all classes in fat jar

2019-03-18 Thread Jörn Franke
Fat jar with shading, as the application jar, not as an additional jar package 

> Am 18.03.2019 um 14:08 schrieb Jörn Franke :
> 
> Maybe that class is already loaded as part of a core library of Spark?
> 
> Do you have concrete class names?
> 
> In doubt create a fat jar and shade the dependencies in question
> 
>> Am 18.03.2019 um 12:34 schrieb Federico D'Ambrosio :
>> 
>> Hello everyone,
>> 
>> We're having a serious issue, where we get ClassNotFoundException because, 
>> apparently the class is not found within the classpath of Spark, in both the 
>> Driver and Executors.
>> 
>> First, I checked whether the class was actually within the jar with jar tf, 
>> and there actually is. Then, I activated the following options to see which 
>> classes are actually loaded:
>> 
>> --conf 'spark.driver.extraJavaOptions=-verbose:class' --conf 
>> 'spark.executor.extraJavaOptions=-verbose:class' 
>> 
>> and I can see from the YARN stdout logs that some classes, just like the one 
>> throwing the exception, are not actually being loaded while other classes 
>> are.
>> I tried, then, using --jars to pass the jar containing the missing classes, 
>> and also using addJar() from the spark context, to no avail.
>> 
>> This looks like an issue with Spark class loader.
>> 
>> Any idea about what's happening here? I'm using Spark 2.3.1.3.0.0.0-1634 (HDP 
>> 3.0). 
>> 
>> Thank you for your help,
>> Federico


Re: Spark does not load all classes in fat jar

2019-03-18 Thread Jörn Franke
Maybe that class is already loaded as part of a core library of Spark?

Do you have concrete class names?

In doubt create a fat jar and shade the dependencies in question

> Am 18.03.2019 um 12:34 schrieb Federico D'Ambrosio :
> 
> Hello everyone,
> 
> We're having a serious issue, where we get ClassNotFoundException because, 
> apparently the class is not found within the classpath of Spark, in both the 
> Driver and Executors.
> 
> First, I checked whether the class was actually within the jar with jar tf, 
> and there actually is. Then, I activated the following options to see which 
> classes are actually loaded:
> 
> --conf 'spark.driver.extraJavaOptions=-verbose:class' --conf 
> 'spark.executor.extraJavaOptions=-verbose:class' 
> 
> and I can see from the YARN stdout logs that some classes, just like the one 
> throwing the exception, are not actually being loaded while other classes are.
> I tried, then, using --jars to pass the jar containing the missing classes, 
> and also using addJar() from the spark context, to no avail.
> 
> This looks like an issue with Spark class loader.
> 
> Any idea about what's happening here? I'm using Spark 2.3.1.3.0.0.0-1634 (HDP 
> 3.0). 
> 
> Thank you for your help,
> Federico


Re: Masking username in Spark with regexp_replace and reverse functions

2019-03-17 Thread Jörn Franke
For the approach below you have to check for collisions, i.e. different names 
leading to the same masked value.

You could hash it. However, to avoid that one can simply try hashes of candidate 
names, you need to include a different random factor (a salt) with each name. 

However, the anonymization problem is bigger, because based on other fields and 
correlation the individual might still be identifiable.
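
A hedged sketch of the salted-hash alternative (the salt value is hypothetical
and should come from a secret store rather than being hard-coded): the same name
still gets the same mask, while the salt makes simply hashing candidate names
useless. Truncating the digest shortens the mask but slightly raises the
collision risk:

import org.apache.spark.sql.functions._

val salt = lit("replace-with-a-secret-salt")

val masked = f1.withColumn("Masked",
  substring(sha2(concat(salt, upper(trim(col("Description")))), 256), 1, 12))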

> Am 16.03.2019 um 18:39 schrieb Mich Talebzadeh :
> 
> Hi,
> 
> I am looking at Description column of a bank statement (CSV download) that 
> has the following format
> 
> scala> account_table.printSchema
> root
>  |-- TransactionDate: date (nullable = true)
>  |-- TransactionType: string (nullable = true)
>  |-- Description: string (nullable = true)
>  |-- Value: double (nullable = true)
>  |-- Balance: double (nullable = true)
>  |-- AccountName: string (nullable = true)
>  |-- AccountNumber: string (nullable = true)
> 
> The column description for BACS payments contains the name of the individual 
> who paid into the third party account. I need to mask the name but cannot 
> simply use a literal as below for all contents of descriptions column!
> 
> f1.withColumn("Description", lit("*** Masked 
> ***")).select('Description.as("Who paid")
> 
> So I try the following combination
> 
> f1.select(trim(substring(substring_index('Description, ",", 
> 1),2,50)).as("name in clear"),
> reverse(regexp_replace(regexp_replace(regexp_replace(substring(regexp_replace('Description,
>  "^['A-Z]", "XX"),2,6),"[A-F]","X")," ","X"),"[,]","R")).as("Masked")).show
> +--+--+
> |  in clear|Masked|
> +--+--+
> |   FATAH SABAH|HXTXXX|
> |   C HIGGINSON|GIHXXX|
> |   SOLTA A|XTLOSX|
> +--+--+
> 
> This seems to work as it not only masks the name but also makes it consistent 
> for all names (in other words, the same username gets the same mask).
> 
> Are there any better alternatives?
> 
> Thanks
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> http://talebzadehmich.wordpress.com
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  


Re: Spark on YARN, HowTo kill executor or individual task?

2019-02-10 Thread Jörn Franke
yarn application -kill applicationid ?

> Am 10.02.2019 um 13:30 schrieb Serega Sheypak :
> 
> Hi there!
> I have a weird issue that appears only when tasks fail at a specific stage. I 
> would like to imitate the failure on my own. 
> The plan is to run the problematic app and then kill an entire executor or some 
> tasks when execution reaches a certain stage.
> 
> Is it do-able? 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark on Yarn, is it possible to manually blacklist nodes before running spark job?

2019-01-22 Thread Jörn Franke
You can try with Yarn node labels:
https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/NodeLabel.html

Then you can whitelist nodes.

> Am 19.01.2019 um 00:20 schrieb Serega Sheypak :
> 
> Hi, is there any possibility to tell Scheduler to blacklist specific nodes in 
> advance?


Re: cache table vs. parquet table performance

2019-01-16 Thread Jörn Franke
I believe the in-memory solution misses the storage indexes that parquet / orc 
have.

The in-memory solution is more suitable if you iterate over the whole set of data 
frequently.
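
A hedged sketch of combining both (paths and the filter follow the example below):
keep the full history in Parquet so min/max statistics and partition pruning can
skip cold data, and cache only the hot slice that is queried repeatedly:

import org.apache.spark.sql.functions.col

val hot = spark.read.parquet("/tmp/events/jan/02")
  .where(col("day_registered") === 20190102)

hot.cache()
hot.count()   // an action materializes the cache; later queries hit memory/disk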

> Am 15.01.2019 um 19:20 schrieb Tomas Bartalos :
> 
> Hello,
> 
> I'm using spark-thrift server and I'm searching for best performing solution 
> to query hot set of data. I'm processing records with nested structure, 
> containing subtypes and arrays. 1 record takes up several KB.
> 
> I tried to make some improvement with cache table:
> cache table event_jan_01 as select * from events where day_registered = 
> 20190102;
> 
> If I understood correctly, the data should be stored in in-memory columnar 
> format with storage level MEMORY_AND_DISK. So data which doesn't fit into 
> memory will be spilled to disk (I assume also in columnar format (?))
> I cached 1 day of data (1 M records) and according to spark UI storage tab 
> none of the data was cached to memory and everything was spilled to disk. The 
> size of the data was 5.7 GB.
> Typical queries took ~ 20 sec.
> 
> Then I tried to store the data to parquet format:
> CREATE TABLE event_jan_01_par USING parquet location "/tmp/events/jan/02" as 
> select * from event_jan_01;
> 
> The whole parquet took up only 178MB.
> And typical queries took 5-10 sec.
> 
> Is it possible to tune spark to spill the cached data in parquet format ?
> Why the whole cached table was spilled to disk and nothing stayed in memory ?
> 
> Spark version: 2.4.0
> 
> Best regards,
> Tomas
> 


Re: spark application takes significant some time to succeed even after all jobs are completed

2018-12-25 Thread Jörn Franke
It could be that Spark checks each output file after the job, and with 10K files 
on HDFS that can take some time. I think this is also format specific (e.g. for 
parquet it does some checks) and does not occur with all formats. This time is 
not really highlighted in the UI (maybe worth raising an enhancement issue).

It could also be that you have stragglers (skewed partitions) somewhere, but 
I assume you checked that already. 

The only thing that you can do is to have fewer files (for the final output but 
also in between) or live with it. There are some other tuning methods as well 
(a different output committer etc.), but that would require more in-depth knowledge 
of your application.
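
A hedged sketch of the compaction idea (the target of 200 files and the output
path are hypothetical; `result` stands for your final DataFrame). coalesce(n)
does the same without a full shuffle if you only want to reduce the file count:

result.repartition(200)
  .write.mode("overwrite")
  .parquet("hdfs:///data/output/compacted")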

> Am 25.12.2018 um 13:08 schrieb Akshay Mendole :
> 
> Yes. We have a lot of small files (10K files of around 100 MB each) that we 
> read from and write to HDFS. But the timeline shows, the jobs has completed 
> quite some time ago and the output directory is also updated at that time.
> Thanks,
> Akshay
> 
> 
>> On Tue, Dec 25, 2018 at 5:30 PM Jörn Franke  wrote:
>> Do you have a lot of small files? Do you use S3 or similar? It could be that 
>> Spark does some IO related tasks.
>> 
>> > Am 25.12.2018 um 12:51 schrieb Akshay Mendole :
>> > 
>> > Hi, 
>> >   As you can see in the picture below, the application last job 
>> > finished at around 13:45 and I could see the output directory updated with 
>> > the results. Yet, the application took a total of 20 min more to change 
>> > the status. What could be the reason for this? Is this a known fact? The 
>> > application has 3 jobs with many stages inside each having around 10K 
>> > tasks. Could the scale be reason for this? What is it exactly spark 
>> > framework doing during this time?
>> > 
>> > 
>> > 
>> > Thanks,
>> > Akshay
>> > 


Re: spark application takes significant some time to succeed even after all jobs are completed

2018-12-25 Thread Jörn Franke
Do you have a lot of small files? Do you use S3 or similar? It could be that 
Spark does some IO related tasks.

> Am 25.12.2018 um 12:51 schrieb Akshay Mendole :
> 
> Hi, 
>   As you can see in the picture below, the application last job finished 
> at around 13:45 and I could see the output directory updated with the 
> results. Yet, the application took a total of 20 min more to change the 
> status. What could be the reason for this? Is this a known fact? The 
> application has 3 jobs with many stages inside each having around 10K tasks. 
> Could the scale be reason for this? What is it exactly spark framework doing 
> during this time?
> 
> 
> 
> Thanks,
> Akshay
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [SPARK SQL] Difference between 'Hive on spark' and Spark SQL

2018-12-20 Thread Jörn Franke
If you already have a lot of queries then it makes sense to look at Hive (in a 
recent version) + TEZ + LLAP with all tables in ORC format, partitioned and sorted on 
filter columns. That would be the easiest way and can improve performance 
significantly.

If you want to use Spark, e.g. because you want to use additional features and it 
could become part of your strategy justifying the investment:
* Hive on Spark - I don't think it is used as much as the above combination. I 
am also not sure if it supports recent Spark versions and all Hive features. It 
would also not really allow you to use Spark features beyond Hive features. 
Basically you just set a different engine in Hive and execute the queries as 
you do now. 
* spark.sql: you would have to write all your Hive queries as Spark queries 
and potentially integrate or rewrite Hive UDFs. Given that you can use the 
HiveContext to execute queries, it may not require so much effort to rewrite 
them. The pushdown possibilities are available in Spark. You have to write 
Spark programs to execute queries. There are some servers that you can connect 
to using SQL queries, but their maturity varies.

In the end you have to make an assessment of all your queries and investigate 
whether they can be executed using either of the options.
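
For illustration, a hedged sketch of the spark.sql route (table, column and
filter values are hypothetical): with Hive support enabled, existing Hive tables
and many HiveQL constructs can be queried directly:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hive-queries-on-spark")
  .enableHiveSupport()
  .getOrCreate()

val result = spark.sql(
  """SELECT customer_id, SUM(amount) AS total
     FROM sales_orc
     WHERE dt = '2018-12-19'
     GROUP BY customer_id""")

result.write.mode("overwrite").saveAsTable("daily_totals")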

> Am 20.12.2018 um 08:17 schrieb l...@china-inv.cn:
> 
> Hi, All, 
> 
> We are starting to migrate our data to the Hadoop platform, hoping to use 'Big 
> Data' technologies to 
> improve our business. 
> 
> We are new in the area and want to get some help from you. 
> 
> Currently all our data is put into Hive and some complicated SQL query 
> statements are run daily. 
> 
> We want to improve the performance of these queries and have two options at 
> hand: 
> a. Turn on 'Hive on spark' feature and run HQLs and 
> b. Run those query statements with spark SQL 
> 
> What is the difference between these options? 
> 
> Another question is: 
> There is a Hive setting 'hive.optimize.ppd' to enable the 'predicate pushdown' 
> query optimization. 
> Is there an equivalent option in Spark SQL, or does the same setting also work for 
> Spark SQL? 
> 
> Thanks in advance 
> 
> Boying 
> 
> 
>
> 
>   
> This email message may contain confidential and/or privileged information. If 
> you are not the intended recipient, please do not read, save, forward, 
> disclose or copy the contents of this email or open any file attached to this 
> email. We will be grateful if you could advise the sender immediately by 
> replying this email, and delete this email and any attachment or links to 
> this email completely and immediately from your computer system.
> 
> 
> 


Re: Spark Scala reading from Google Cloud BigQuery table throws error

2018-12-18 Thread Jörn Franke
Maybe the guava version in your Spark lib folder is not compatible (if your 
Spark version ships a guava library)? In this case I propose creating a fat/uber 
jar, potentially with a shaded guava dependency.
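
If you build with sbt, a hedged build.sbt sketch (assuming the sbt-assembly
plugin; the shade prefix is arbitrary) - relocating guava inside the uber jar
avoids the clash with the guava classes already on Spark's classpath. The
maven-shade-plugin has an equivalent relocation feature:

assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.common.**" -> "shadedguava.com.google.common.@1").inAll
)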

> Am 18.12.2018 um 11:26 schrieb Mich Talebzadeh :
> 
> Hi,
> 
> I am writing a small test code in spark-shell with attached jar dependencies
> 
> spark-shell --jars 
> /home/hduser/jars/bigquery-connector-0.13.4-hadoop3.jar,/home/hduser/jars/gcs-connector-1.9.4-hadoop3.jar,/home/hduser/jars/other/guava-19.0.jar,/home/hduser/jars/google-api-client-1.4.1-beta.jar,/home/hduser/jars/google-api-client-json-1.2.3-alpha.jar,/home/hduser/jars/google-api-services-bigquery-v2-rev20181202-1.27.0.jar
> 
>  to read an already existing table in Google BigQuery as follows:
> 
> import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration
> import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat
> import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat
> import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration
> import com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat
> import com.google.gson.JsonObject
> import org.apache.hadoop.io.LongWritable
> import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
> // Assumes you have a spark context (sc) -- running from spark-shell REPL.
> // Marked as transient since configuration is not Serializable. This should
> // only be necessary in spark-shell REPL.
> @transient
> val conf = sc.hadoopConfiguration
> // Input parameters.
> val fullyQualifiedInputTableId = "axial-glow-224522.accounts.ll_18740868"
> val projectId = conf.get("fs.gs.project.id")
> val bucket = conf.get("fs.gs.system.bucket")
> // Input configuration.
> conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId)
> conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, bucket)
> BigQueryConfiguration.configureBigQueryInput(conf, fullyQualifiedInputTableId)
> 
> The problem I have is that even after loading jars with spark-shell --jar 
> 
> I am getting the following error at the last line
> 
> scala> BigQueryConfiguration.configureBigQueryInput(conf, 
> fullyQualifiedInputTableId)
> 
> java.lang.NoSuchMethodError: 
> com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;Ljava/lang/Object;)V
>   at 
> com.google.cloud.hadoop.io.bigquery.BigQueryStrings.parseTableReference(BigQueryStrings.java:68)
>   at 
> com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration.configureBigQueryInput(BigQueryConfiguration.java:260)
>   ... 49 elided
> 
> It says it cannot find method
> 
> java.lang.NoSuchMethodError: 
> com.google.common.base.Preconditions.checkArgument
> 
> but I checked it and it is in the following jar file
> 
> jar tvf guava-19.0.jar| grep common.base.Preconditions
>   5249 Wed Dec 09 15:58:14 UTC 2015 com/google/common/base/Preconditions.class
> 
> I have used different version of guava jar files but none works!
> 
> The code is based on the following:
> 
> https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
> 
> Thanks
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> http://talebzadehmich.wordpress.com
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  


Re: Zookeeper and Spark deployment for standby master

2018-11-26 Thread Jörn Franke
I guess it is the usual things - if the non-ZooKeeper processes take too much 
memory, disk space etc., they will negatively affect ZooKeeper and thus your whole 
running cluster. You will have to make a risk assessment for your specific 
architectural setting to decide whether this is acceptable. 

> Am 26.11.2018 um 07:25 schrieb Akila Wajirasena :
> 
> Hi,
> 
> Is it necessary to deploy Apache Zookeeper and Spark in separate set of hosts 
> when using standby masters? What will be the consequences of running 
> Zookeeper quorum in the same physical machines that runs the Spark cluster.
> 
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: streaming pdf

2018-11-19 Thread Jörn Franke
And you have to write your own input format, but this is not so complicated 
(and is probably recommended anyway for the PDF case)

> Am 20.11.2018 um 08:06 schrieb Jörn Franke :
> 
> Well, I am not so sure about the use cases, but what about using 
> StreamingContext.fileStream?
> https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream-java.lang.String-scala.Function1-boolean-org.apache.hadoop.conf.Configuration-scala.reflect.ClassTag-scala.reflect.ClassTag-scala.reflect.ClassTag-
> 
> 
>> Am 19.11.2018 um 09:22 schrieb Nicolas Paris :
>> 
>>> On Mon, Nov 19, 2018 at 07:23:10AM +0100, Jörn Franke wrote:
>>> Why does it have to be a stream?
>>> 
>> 
>> Right now I manage the pipelines as spark batch processing. Moving to
>> streaming would add some improvements such as:
>> - simplification of the pipeline
>> - more frequent data ingestion
>> - better resource management (?)
>> 
>> 
>>> On Mon, Nov 19, 2018 at 07:23:10AM +0100, Jörn Franke wrote:
>>> Why does it have to be a stream?
>>> 
>>>> Am 18.11.2018 um 23:29 schrieb Nicolas Paris :
>>>> 
>>>> Hi
>>>> 
>>>> I have pdf to load into spark with at least 
>>>> format. I have considered some options:
>>>> 
>>>> - spark streaming does not provide a native file stream for binary with
>>>> variable size (binaryRecordStream specifies a constant size) and I
>>>> would have to write my own receiver.
>>>> 
>>>> - Structured streaming allows to process avro/parquet/orc files
>>>> containing pdfs, but this makes things more complicated than
>>>> monitoring a simple folder  containing pdfs
>>>> 
>>>> - Kafka is not designed to handle messages > 100KB, and for this reason
>>>> it is not a good option to use in the stream pipeline.
>>>> 
>>>> Somebody has a suggestion ?
>>>> 
>>>> Thanks,
>>>> 
>>>> -- 
>>>> nicolas
>>>> 
>>>> -
>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>> 
>>> 
>> 
>> -- 
>> nicolas
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 


Re: streaming pdf

2018-11-19 Thread Jörn Franke
Well, I am not so sure about the use cases, but what about using 
StreamingContext.fileStream?
https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream-java.lang.String-scala.Function1-boolean-org.apache.hadoop.conf.Configuration-scala.reflect.ClassTag-scala.reflect.ClassTag-scala.reflect.ClassTag-
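
A hedged sketch of the API shape only (TextInputFormat is just a placeholder -
for PDFs you would plug in your own whole-file InputFormat; directory and batch
interval are hypothetical):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(spark.sparkContext, Seconds(60))

val pdfStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "hdfs:///incoming/pdfs",
  (path: Path) => path.getName.endsWith(".pdf"),
  newFilesOnly = true)

pdfStream.foreachRDD(rdd => println(s"records in this batch: ${rdd.count()}"))

ssc.start()
ssc.awaitTermination()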


> Am 19.11.2018 um 09:22 schrieb Nicolas Paris :
> 
>> On Mon, Nov 19, 2018 at 07:23:10AM +0100, Jörn Franke wrote:
>> Why does it have to be a stream?
>> 
> 
> Right now I manage the pipelines as spark batch processing. Moving to
> streaming would add some improvements such as:
> - simplification of the pipeline
> - more frequent data ingestion
> - better resource management (?)
> 
> 
>> On Mon, Nov 19, 2018 at 07:23:10AM +0100, Jörn Franke wrote:
>> Why does it have to be a stream?
>> 
>>> Am 18.11.2018 um 23:29 schrieb Nicolas Paris :
>>> 
>>> Hi
>>> 
>>> I have pdf to load into spark with at least 
>>> format. I have considered some options:
>>> 
>>> - spark streaming does not provide a native file stream for binary with
>>> variable size (binaryRecordStream specifies a constant size) and I
>>> would have to write my own receiver.
>>> 
>>> - Structured streaming allows to process avro/parquet/orc files
>>> containing pdfs, but this makes things more complicated than
>>> monitoring a simple folder  containing pdfs
>>> 
>>> - Kafka is not designed to handle messages > 100KB, and for this reason
>>> it is not a good option to use in the stream pipeline.
>>> 
>>> Somebody has a suggestion ?
>>> 
>>> Thanks,
>>> 
>>> -- 
>>> nicolas
>>> 
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>> 
>> 
> 
> -- 
> nicolas
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 


Re: streaming pdf

2018-11-18 Thread Jörn Franke
Why does it have to be a stream?

> Am 18.11.2018 um 23:29 schrieb Nicolas Paris :
> 
> Hi
> 
> I have pdf to load into spark with at least 
> format. I have considered some options:
> 
> - spark streaming does not provide a native file stream for binary with
>  variable size (binaryRecordStream specifies a constant size) and I
>  would have to write my own receiver.
> 
> - Structured streaming allows to process avro/parquet/orc files
>  containing pdfs, but this makes things more complicated than
>  monitoring a simple folder  containing pdfs
> 
> - Kafka is not designed to handle messages > 100KB, and for this reason
>  it is not a good option to use in the stream pipeline.
> 
> Somebody has a suggestion ?
> 
> Thanks,
> 
> -- 
> nicolas
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: writing to local files on a worker

2018-11-11 Thread Jörn Franke
Can you use JNI to call the C++ functionality directly from Java? 

Or you could wrap this into an MR step outside Spark and use Hadoop Streaming (it 
allows you to use shell scripts as mapper and reducer).

You can also write temporary files for each partition and execute the software 
within a map step.

Generally you should not call external applications from Spark.
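
If you do go the temporary-file route, a hedged sketch of the per-partition
variant (the external binary /opt/tool/convert and the line-oriented input/output
are hypothetical; `data` is an RDD[String]):

import java.nio.charset.StandardCharsets
import java.nio.file.Files
import scala.sys.process._

val processed = data.mapPartitions { records =>
  val in  = Files.createTempFile("ext-", ".in")
  val out = Files.createTempFile("ext-", ".out")
  try {
    Files.write(in, records.mkString("\n").getBytes(StandardCharsets.UTF_8))
    val exitCode = Seq("/opt/tool/convert", in.toString, out.toString).!
    if (exitCode != 0) sys.error(s"external tool failed with exit code $exitCode")
    // read everything back into memory before the temp files are removed
    new String(Files.readAllBytes(out), StandardCharsets.UTF_8)
      .split("\n").iterator.filter(_.nonEmpty)
  } finally {
    Files.deleteIfExists(in)
    Files.deleteIfExists(out)
  }
}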

> Am 11.11.2018 um 23:13 schrieb Steve Lewis :
> 
> I have a problem where a critical step needs to be performed by  a third 
> party c++ application. I can send or install this program on the worker 
> nodes. I can construct  a function holding all the data this program needs to 
> process. The problem is that the program is designed to read and write from 
> the local file system. I can call the program from Java and read its output 
> as  a  local file - then deleting all temporary files but I doubt that it is 
> possible to get the program to read from hdfs or any shared file system. 
> My question is can a function running on a worker node create temporary files 
> and pass the names of these to a local process assuming everything is cleaned 
> up after the call?
> 
> -- 
> Steven M. Lewis PhD
> 4221 105th Ave NE
> Kirkland, WA 98033
> 206-384-1340 (cell)
> Skype lordjoe_com
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Spark SQL] INSERT OVERWRITE to a hive partitioned table (pointing to s3) from spark is too slow.

2018-11-04 Thread Jörn Franke
Can you share some relevant source code?


> Am 05.11.2018 um 07:58 schrieb ehbhaskar :
> 
> I have a pyspark job that inserts data into hive partitioned table using
> `Insert Overwrite` statement.
> 
> Spark job loads data quickly (in 15 mins) to temp directory (~/.hive-***) in
> S3. But, it's very slow in moving data from temp directory to the target
> path, it takes more than 40 mins to move data from temp to target path.
> 
> I set the option mapreduce.fileoutputcommitter.algorithm.version=2 (default
> is 1) but still I see no change.
> 
> *Are there any ways to improve the performance of hive INSERT OVERWRITE
> query from spark?*
> 
> Also, I noticed that this behavior is even worse (i.e. job takes even more
> time) with hive table that has too many existing partitions. i.e. The data
> loads relatively fast into table that have less existing partitions.
> 
> *Some additional details:*
> * Table is a dynamic partitioned table. 
> * Spark version - 2.3.0
> * Hive version - 2.3.2-amzn-2
> * Hadoop version - 2.8.3-amzn-0
> 
> PS: Other config options I have tried that didn't have much effect on the
> job performance.
> * "hive.load.dynamic.partitions.thread - "10"
> * "hive.mv.files.thread" - "30"
> * "fs.trash.interval" - "0".
> 
> 
> 
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How to avoid long-running jobs blocking short-running jobs

2018-11-03 Thread Jörn Franke
Hi,

What does your Spark deployment architecture look like? Standalone? YARN? 
Mesos? Kubernetes? Those have resource managers (not middleware) that allow you to 
implement scenarios like the one you want to achieve.

In any case you can try the FairScheduler of any of those solutions.
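
A hedged sketch for the single-shared-SparkContext case (pool names and the
allocation file path are hypothetical): enable the fair scheduler and put the
long ETL jobs and the short analysis jobs into different pools, so the short
ones are not starved:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/etc/spark/fairscheduler.xml")

// per thread, before triggering actions:
sc.setLocalProperty("spark.scheduler.pool", "etl")        // long-running ETL jobs
// ... run ETL actions ...
sc.setLocalProperty("spark.scheduler.pool", "analytics")  // short analysis jobs
// ... run analysis actions ...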

Best regards

> Am 03.11.2018 um 10:04 schrieb conner :
> 
> Hi,
> 
> I use spark cluster to run ETL jobs and analysis computation about the data
> after elt stage.
> The elt jobs can keep running for several hours, but analysis computation is
> a short-running job which can finish in a few seconds.
> The dilemma I am trapped in is that my application runs in a single JVM and
> can't be a cluster application, so currently there is just one spark context in
> my application. But when the elt jobs are running,
> they occupy all resources, including worker executors, for too long and
> block all my analysis computation jobs. 
> 
> My solution is to find a good way to divide the spark cluster resources into
> two parts: one part for analysis computation jobs, another for
> elt jobs. If the part for elt jobs is free, I can allocate analysis
> computation jobs to it.
> So I want to find a middleware that can support two spark contexts and it
> must be embedded in my application. I did some research on the third-party
> project Spark Job Server. It can divide spark resources by launching another
> JVM to run a spark context with specific resources.
> These operations are invisible to the upper layer, so it's a good solution
> for me. But this project runs in a single JVM and only supports a REST
> API; I can't endure the data transfer over TCP again,
> which is too slow for me. I want to get a result from the spark cluster by TCP and
> give this result to the view layer to show.
> Can anybody give me some good suggestion? I shall be so grateful.
> 
> 
> 
> 
> 
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Apache Spark orc read performance when reading large number of small files

2018-11-01 Thread Jörn Franke
A lot of small files is very inefficient in itself, and predicate pushdown will 
not help you much there unless you merge them into one large file (one large 
file can be processed much more efficiently).

How did you validate that predicate pushdown did not work via Hive? Your Hive 
version is also very old - consider upgrading to at least Hive 2.x.

> Am 31.10.2018 um 20:35 schrieb gpatcham :
> 
> spark version 2.2.0
> Hive version 1.1.0
> 
> There are lot of small files
> 
> Spark code :
> 
> "spark.sql.orc.enabled": "true",
> "spark.sql.orc.filterPushdown": "true 
> 
> val logs
> =spark.read.schema(schema).orc("hdfs://test/date=201810").filter("date >
> 20181003")
> 
> Hive:
> 
> "spark.sql.orc.enabled": "true",
> "spark.sql.orc.filterPushdown": "true 
> 
> test  table in Hive is pointing to hdfs://test/  and partitioned on date
> 
> val sqlStr = s"select * from test where date > 20181001"
> val logs = spark.sql(sqlStr)
> 
> With Hive query I don't see filter pushdown is  happening. I tried setting
> these configs in both hive-site.xml and also spark.sqlContext.setConf
> 
> "hive.optimize.ppd":"true",
> "hive.optimize.ppd.storage":"true" 
> 
> 
> 
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Apache Spark orc read performance when reading large number of small files

2018-10-31 Thread Jörn Franke
How large are they? A lot of (small) files will cause a significant delay in 
processing - try to merge as much as possible into one file.

Can you please share full source code in Hive and Spark as well as the versions 
you are using?

> Am 31.10.2018 um 18:23 schrieb gpatcham :
> 
> 
> 
> When reading a large number of ORC files from HDFS under a directory, spark
> doesn't launch any tasks for some amount of time and I don't see any tasks
> running during that time. I'm using the below command to read the ORC files and
> the spark.sql configs.
> 
> What is spark doing under the hood when spark.read.orc is issued?
> 
> spark.read.schema(schame1).orc("hdfs://test1").filter("date >= 20181001")
> "spark.sql.orc.enabled": "true",
> "spark.sql.orc.filterPushdown": "true
> 
> Also instead of directly reading orc files I tried running Hive query on
> same dataset. But I was not able to push filter predicate. Where should I
> set the below config's "hive.optimize.ppd":"true",
> "hive.optimize.ppd.storage":"true"
> 
> Please suggest the best way to read ORC files from HDFS and which
> parameters to tune.
> 
> 
> 
> 
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: dremel paper example schema

2018-10-31 Thread Jörn Franke
I would first try with the same parquet version that Spark uses. I don't have the 
changelog of parquet in my head (but you can find it on the Internet), but it 
could be the cause of your issues.

> Am 31.10.2018 um 12:26 schrieb lchorbadjiev :
> 
> Hi Jorn,
> 
> I am using Apache Spark 2.3.1.
> 
> For creating the parquet file I have used Apache Parquet (parquet-mr) 1.10.
> This does not match the version of parquet used in Apache Spark 2.3.1 and if
> you think that this could be the problem I could try to use Apache Parquet
> version 1.8.3.
> 
> I created a parquet file using Apache Spark SQL types, but can not make the
> resulting schema to match the schema described in the paper.
> 
> What I do is to use Spark SQL array type for repeated values. For example,
> where papers says
> 
>repeated int64 Backward;
> 
> I use array type:
> 
>StructField("Backward", ArrayType(IntegerType(), containsNull=False),
> nullable=False)
> 
> The resulting schema, reported by parquet-tools is:
> 
>optional group backward (LIST) {
>  repeated group list {
>required int32 element;
>  }
>}
> 
> Thanks,
> Lubomir Chorbadjiev
> 
> 
> 
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: java vs scala for Apache Spark - is there a performance difference ?

2018-10-30 Thread Jörn Franke
Older versions of Spark indeed had lower performance with Python and R due to a 
conversion needed between JVM datatypes and Python/R datatypes. This changed in 
Spark 2.2, I think, with the integration of Apache Arrow. However, what you do 
after the conversion in those languages can still be slower than, for instance, 
in Java if you do not use Spark-only functions. It could also be faster (e.g. if you 
use a Python module implemented natively in C and no translation into C datatypes 
is needed). 
Scala has in certain cases a more elegant syntax than Java (if you do not use 
lambdas). Sometimes this elegant syntax can lead to (unintentionally) inefficient 
constructs for which there is a better way to express them (e.g. implicit 
conversions, use of collection methods etc.). However, there are better ways and 
you just have to spot these issues in the source code and address them, if 
needed. 
So a comparison between those languages does not really make sense - it always 
depends.

> Am 30.10.2018 um 07:00 schrieb akshay naidu :
> 
> how about Python. 
> java vs scala vs python vs R
> which is better.
> 
>> On Sat, Oct 27, 2018 at 3:34 AM karan alang  wrote:
>> Hello 
>> - is there a "performance" difference when using Java or Scala for Apache 
>> Spark ?
>> 
>> I understand, there are other obvious differences (less code with scala, 
>> easier to focus on logic etc), 
>> but wrt performance - i think there would not be much of a difference since 
>> both of them are JVM based, 
>> pls. let me know if this is not the case.
>> 
>> thanks!


Re: dremel paper example schema

2018-10-30 Thread Jörn Franke
Are you using the same parquet version as Spark uses? Are you using a recent 
version of Spark? Why don’t you create the file in Spark?

> Am 30.10.2018 um 07:34 schrieb lchorbadjiev :
> 
> Hi Gourav,
> 
> the question in fact is are there any the limitations of Apache Spark
> support for Parquet file format.
> 
> The example schema from the dremel paper is something that is supported in
> Apache Parquet (using Apache Parquet Java API).
> 
> Now I am trying to implement the same schema using Apache Spark SQL types,
> but I have not been very successful. And probably this is not unexpected. 
> 
> What was unexpected is that Apache Spark can't read a parquet file with the
> dremel example schema.
> 
> Probably there are some limitations in what Apache Spark can support from the
> Apache Parquet file format, but for me it is not obvious what these
> limitations are.
> 
> Thanks,
> Lubomir Chorbadjiev
> 
> 
> 
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Is spark not good for ingesting into updatable databases?

2018-10-27 Thread Jörn Franke
Do you have some code that you can share?

Maybe it is something in your code that unintentionally duplicates them?

Maybe your source (e.g. the application putting them on Kafka?) duplicates them 
already?
Once-and-only-once processing needs to be ensured end to end.

> Am 27.10.2018 um 02:10 schrieb ravidspark :
> 
> Hi All,
> 
> My problem is as explained,
> 
> Environment: Spark 2.2.0 installed on CDH
> Use-Case: Reading from Kafka, cleansing the data and ingesting into a non
> updatable database.
> 
> Problem: My streaming batch duration is 1 minute and I am receiving 3000
> messages/min. I am observing a weird case where, in the map transformations
> some of the messages are being reprocessed more than once to the downstream
> transformations. Because of this I have been seeing duplicates in the
> downstream insert only database.
> 
> It would have made sense if the reprocessing of the message happens for the
> entire task in which case I would have assumed the problem is because of the
> task failure. But, in my case I don't see any task failures and only one or
> two particular messages in the task will be reprocessed. 
> 
> Every time I relaunch the spark job to process Kafka messages from the
> starting offset, it dups the exact same messages,
> irrespective of the number of relaunches.
> 
> I added the messages that are getting duped back to kafka at a different
> offset to see if I would observe the same problem, but this time it won't
> dup.
> 
> Workaround for now: 
> As a workaround for now, I added a cache at the end before ingestion into the DB,
> which gets updated with each processed event, thus making sure it won't be
> reprocessed again.
> 
> 
> My question here is, why am I seeing this weird behavior(only one particular
> message in the entire batch getting reprocessed again)? Is there some
> configuration that would help me fix this problem or is this a bug? 
> 
> Any solution apart from maintaining a cache would be of great help.
> 
> 
> 
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Triggering sql on Was S3 via Apache Spark

2018-10-23 Thread Jörn Franke
Why not directly access the S3 file from Spark?


You need to configure the IAM roles so that the machine running the S3 code is 
allowed to access the bucket.
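
A hedged sketch of the direct access (bucket names are hypothetical; it assumes
the hadoop-aws / s3a connector on the classpath and IAM roles or access keys
configured for the machine running Spark):

val daily = spark.read
  .option("sep", "\t")
  .option("header", "true")
  .csv("s3a://customer-daily-data/daily_customer.tsv")

val history = spark.read
  .option("sep", "\t")
  .option("header", "true")
  .csv("s3a://customer-history-data/customer.tsv")

The merged, deduplicated result can then be written back to another bucket with
the same s3a scheme.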

> Am 24.10.2018 um 06:40 schrieb Divya Gehlot :
> 
> Hi Omer ,
> Here are couple of the solutions which you can implement for your use case : 
> Option 1 : 
> you can mount the S3 bucket as local file system 
> Here are the details : 
> https://cloud.netapp.com/blog/amazon-s3-as-a-file-system
> Option 2 :
>  You can use Amazon Glue for your use case 
> here are the details : 
> https://aws.amazon.com/blogs/big-data/how-to-access-and-analyze-on-premises-data-stores-using-aws-glue/
> 
> Option 3 :
> Store the file in the local file system and later push it s3 bucket 
> here are the details 
> https://stackoverflow.com/questions/48067979/simplest-way-to-fetch-the-file-from-ftp-server-on-prem-put-into-s3-bucket
> 
> Thanks,
> Divya 
> 
>> On Tue, 23 Oct 2018 at 15:53,  wrote:
>> Hi guys,
>> 
>>  
>> 
>> We are using Apache Spark on a local machine.
>> 
>>  
>> 
>> I need to implement the scenario below.
>> 
>>  
>> 
>> In the initial load:
>> 
>> CRM application will send a file to a folder. This file contains customer 
>> information of all customers. This file is in a folder in the local server. 
>> File name is: customer.tsv
>> Customer.tsv contains customerid, country, birty_month, activation_date etc
>> I need to read the contents of customer.tsv.
>> I will add current timestamp info to the file.
>> I will transfer customer.tsv to the S3 bucket: customer.history.data
>>  
>> 
>> In the daily loads:
>> 
>>  CRM application will send a new file which contains the 
>> updated/deleted/inserted customer information.
>>   File name is daily_customer.tsv
>> 
>> Daily_customer.tsv contains customerid, cdc_field, country, 
>> birty_month, activation_date etc
>> Cdc field can be New-Customer, Customer-is-Updated, Customer-is-Deleted.
>> 
>> I need to read the contents of daily_customer.tsv.
>> I will add current timestamp info to the file.
>> I will transfer daily_customer.tsv to the S3 bucket: customer.daily.data
>> I need to merge two buckets customer.history.data and customer.daily.data.
>> Two buckets have timestamp fields. So I need to query all records whose 
>> timestamp is the last timestamp.
>> I can use row_number() over(partition by customer_id order by 
>> timestamp_field desc) as version_number
>> Then I can put the records whose version is one, to the final bucket: 
>> customer.dimension.data
>>  
>> 
>> I am running Spark on premise.
>> 
>> Can I query on AWS S3 buckets by using Spark Sql / Dataframe or RDD on a 
>> local Spark cluster?
>> Is this approach efficient? Will the queries transfer all historical data 
>> from AWS S3 to the local cluster?
>> How can I implement this scenario in a more effective way? Like just 
>> transferring daily data to AWS S3 and then running queries on AWS.
>> For instance Athena can query on AWS. But it is just a query engine. As I 
>> know I can not call it by using an sdk and I can not write the results to a 
>> bucket/folder.
>>  
>> 
>> Thanks in advance,
>> 
>> Ömer
>> 
>>  
>> 
>>
>> 
>>  
>> 
>>  


Re: Process Million Binary Files

2018-10-11 Thread Jörn Franke
I believe your use case would be better covered by a custom data source that reads PDF files.

On big data platforms in general you have the issue that individual PDF files are very small and there are a lot of them - this is not very efficient for those platforms. That could also be one source of your performance problems (not necessarily the parallelism). You would need to make 1 million requests to the namenode (which could almost be interpreted as a denial-of-service attack). Historically, Hadoop Archives were introduced to address this problem: 
https://hadoop.apache.org/docs/r1.2.1/hadoop_archives.html

You can also try to store them first in HBase or, in the future, on Hadoop Ozone. That could make higher parallelism possible „out of the box“. 
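
To make the packing idea concrete - a sketch only, with assumed paths and partition counts: pack the PDFs once into SequenceFiles (this still touches every small file once), then run the actual processing on the packed form with explicitly chosen parallelism.

// one-off packing job: (path, bytes) pairs written into a SequenceFile
val pdfs = spark.sparkContext.binaryFiles("hdfs:///pdfs/*.pdf", minPartitions = 200)
pdfs.mapValues(_.toArray()).saveAsSequenceFile("hdfs:///pdfs-packed")

// processing job: far fewer, larger files and an explicit number of partitions
val packed = spark.sparkContext.sequenceFile[String, Array[Byte]]("hdfs:///pdfs-packed", minPartitions = 400)
val parsed = packed.map { case (path, bytes) => myFunc(path, bytes) }  // myFunc: the parsing function from the question, assumed adapted to (path, bytes)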

> Am 10.10.2018 um 23:56 schrieb Joel D :
> 
> Hi,
> 
> I need to process millions of PDFs in hdfs using spark. First I’m trying with 
> some 40k files. I’m using binaryFiles api with which I’m facing couple of 
> issues:
> 
> 1. It creates only 4 tasks and I can’t seem to increase the parallelism 
> there. 
> 2. It took 2276 seconds and that means for millions of files it will take 
> ages to complete. I’m also expecting it to fail for million records with some 
> timeout or gc overhead exception.
> 
> val files = sparkSession.sparkContext.binaryFiles(filePath, 200).cache()
> 
> val fileContentRdd = files.map(file => myFunc(file))
> 
> 
> 
> Do you have any guidance on how I can process millions of files using 
> binaryFiles api?
> 
> How can I increase the number of tasks/parallelism during the creation of 
> files rdd?
> 
> Thanks
> 


Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

2018-10-09 Thread Jörn Franke
Generally, please avoid System.out.println and use a logger instead - even for examples. People may take these examples from here and put them into their production code.

> Am 09.10.2018 um 15:39 schrieb Shubham Chaurasia :
> 
> Alright, so it is a big project which uses a SQL store underneath.
> I extracted out the minimal code and made a smaller project out of it and 
> still it is creating multiple instances. 
> 
> Here is my project:
> 
> ├── my-datasource.iml
> ├── pom.xml
> ├── src
> │   ├── main
> │   │   ├── java
> │   │   │   └── com
> │   │   │   └── shubham
> │   │   │   ├── MyDataSource.java
> │   │   │   └── reader
> │   │   │   └── MyDataSourceReader.java
> 
> 
> MyDataSource.java
> -
> package com.shubham;
> 
> import com.shubham.reader.MyDataSourceReader;
> import org.apache.spark.sql.SaveMode;
> import org.apache.spark.sql.sources.v2.DataSourceOptions;
> import org.apache.spark.sql.sources.v2.DataSourceV2;
> import org.apache.spark.sql.sources.v2.ReadSupport;
> import org.apache.spark.sql.sources.v2.WriteSupport;
> import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
> import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
> import org.apache.spark.sql.types.StructType;
> 
> import java.util.Optional;
> 
> public class MyDataSource implements DataSourceV2, ReadSupport, WriteSupport {
> 
>   public DataSourceReader createReader(DataSourceOptions options) {
> System.out.println("MyDataSource.createReader: Going to create a new 
> MyDataSourceReader");
> return new MyDataSourceReader(options.asMap());
>   }
> 
>   public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options) {
> return Optional.empty();
>   }
> }
> 
> MyDataSourceReader.java
> -
> package com.shubham.reader;
> 
> import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
> import org.apache.spark.sql.sources.v2.reader.InputPartition;
> import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
> import org.apache.spark.sql.types.StructType;
> import org.apache.spark.sql.vectorized.ColumnarBatch;
> 
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Map;
> 
> public class MyDataSourceReader implements DataSourceReader, 
> SupportsScanColumnarBatch {
> 
>   private Map<String, String> options;
>   private StructType schema;
> 
>   public MyDataSourceReader(Map<String, String> options) {
> System.out.println("MyDataSourceReader.MyDataSourceReader: 
> Instantiated" + this);
> this.options = options;
>   }
> 
>   @Override
>   public StructType readSchema() {
> this.schema = (new StructType())
> .add("col1", "int")
> .add("col2", "string");
> System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " 
> + this.schema);
> return this.schema;
>   }
> 
>   @Override
>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
> System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this 
> + " schema: " + this.schema);
> return new ArrayList<>();
>   }
> }
> 
> 
> spark-shell output
> 
> scala> spark.read.format("com.shubham.MyDataSource").option("query", "select 
> * from some_table").load.show
> 
> MyDataSource.createReader: Going to create a new MyDataSourceReader
> MyDataSourceReader.MyDataSourceReader: 
> Instantiatedcom.shubham.reader.MyDataSourceReader@69fa5536
> MyDataSourceReader.readSchema: com.shubham.reader.MyDataSourceReader@69fa5536 
> schema: StructType(StructField(col1,IntegerType,true), 
> StructField(col2,StringType,true))
> MyDataSource.createReader: Going to create a new MyDataSourceReader
> MyDataSourceReader.MyDataSourceReader: 
> Instantiatedcom.shubham.reader.MyDataSourceReader@3095c449
> MyDataSourceReader.planBatchInputPartitions: 
> com.shubham.reader.MyDataSourceReader@3095c449 schema: null
> +++
> |col1|col2|
> +++
> +++
> 
> 
> Here 2 instances of reader, MyDataSourceReader@69fa5536 and 
> MyDataSourceReader@3095c449 are being created. Consequently schema is null in 
> MyDataSourceReader@3095c449.
> 
> Am I not doing it the correct way?
> 
> Thanks,
> Shubham
> 
>> On Tue, Oct 9, 2018 at 4:43 PM Mendelson, Assaf  
>> wrote:
>> I am using v2.4.0-RC2
>> 
>>  
>> 
>> The code as is wouldn’t run (e.g. planBatchInputPartitions returns null). 
>> How are you calling it?
>> 
>>  
>> 
>> When I do:
>> 
>> Val df = spark.read.format(mypackage).load().show()
>> 
>> I am getting a single creation, how are you creating the reader?
>> 
>>  
>> 
>> Thanks,
>> 
>> Assaf
>> 
>>  
>> 
>> From: Shubham Chaurasia [mailto:shubh.chaura...@gmail.com] 
>> Sent: Tuesday, October 9, 2018 2:02 PM
>> To: Mendelson, Assaf; user@spark.apache.org
>> Subject: Re: DataSourceV2 APIs creating multiple instances of 
>> DataSourceReader and hence not preserving the state

Re: Use SparkContext in Web Application

2018-10-04 Thread Jörn Franke
Depending on your model size you can store it as PFA or PMML and run the prediction in Java. For larger models you will need a custom solution, potentially using a Spark Thrift Server / Spark Job Server / Livy and a cache to store predictions that have already been calculated (e.g. based on previous prediction requests). You then also have to think about tying cached prediction results to the model version that produced them, evicting predictions that are no longer relevant, etc.
Making a model available as a service is currently an area where a lot of custom „plumbing“ is required, especially if models are a little bit larger.
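
To make that split concrete - a sketch only, with an assumed spark.ml pipeline and an assumed model path. Note that PipelineModel.load/transform still needs a SparkSession in the serving process, which is exactly the overhead the PFA/PMML route avoids for smaller models:

import org.apache.spark.ml.PipelineModel

// nightly training job: fit and persist the model
val model = pipeline.fit(trainingData)             // pipeline / trainingData: assumed to exist
model.write.overwrite().save("s3a://models/recommender/latest")

// serving side: load once at startup, reuse for every request
val served = PipelineModel.load("s3a://models/recommender/latest")
val recommendations = served.transform(requestDf)  // requestDf: features of the incoming request (assumed)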

> Am 04.10.2018 um 06:55 schrieb Girish Vasmatkar 
> :
> 
> 
> 
>> On Mon, Oct 1, 2018 at 12:18 PM Girish Vasmatkar 
>>  wrote:
>> Hi All
>> 
>> We are very early into our Spark days so the following may sound like a 
>> novice question :) I will try to keep this as short as possible.
>> 
>> We are trying to use Spark to introduce a recommendation engine that can be 
>> used to provide product recommendations and need help on some design 
>> decisions before moving forward. Ours is a web application running on 
>> Tomcat. So far, I have created a simple POC (standalone java program) that 
>> reads in a CSV file and feeds to FPGrowth and then fits the data and runs 
>> transformations. I would like to be able to do the following -
>> 
>> Scheduler runs nightly in Tomcat (which it does currently) and reads 
>> everything from the DB to train/fit the system. This can grow into really 
>> some large data and everyday we will have new data. Should I just use 
>> SparkContext here, within my scheduler, to FIT the system? Is this correct 
>> way to go about this? I am also planning to save the model on S3 which 
>> should be okay. We also thought on using HDFS. The scheduler's job will be 
>> just to create model and save the same and be done with it.
>> On the product page, we can then use the saved model to display the product 
>> recommendations for a particular product.
>> My understanding is that I should be able to use SparkContext here in my web 
>> application to just load the saved model and use it to derive the 
>> recommendations. Is this a good design? The problem I see using this 
>> approach is that the SparkContext does take time to initialize and this may 
>> cost dearly. Or should we keep SparkContext per web application to use a 
>> single instance of the same? We can initialize a SparkContext during 
>> application context initializaion phase. 
>> 
>> Since I am fairly new to using Spark properly, please help me take decision 
>> on whether the way I plan to use Spark is the recommended way? I have also 
>> seen use cases involving kafka tha does communication with Spark, but can we 
>> not do it directly using Spark Context? I am sure a lot of my understanding 
>> is wrong, so please feel free to correct me.
>> 
>> Thanks and Regards,
>> Girish Vasmatkar
>> HotWax Systems
>> 
>> 
>> 


Re: How to read remote HDFS from Spark using username?

2018-10-03 Thread Jörn Franke
Looks like a firewall issue

> Am 03.10.2018 um 09:34 schrieb Aakash Basu :
> 
> The stacktrace is below -
> 
>> ---
>> Py4JJavaError Traceback (most recent call last)
>>  in ()
>> > 1 df = 
>> spark.read.load("hdfs://35.154.242.76:9000/auto-ml/projects/auto-ml-test__8503cdc4-21fc-4fae-87c1-5b879cafff71/data/breast-cancer-wisconsin.csv")
>> /opt/spark/python/pyspark/sql/readwriter.py in load(self, path, format, 
>> schema, **options)
>>  164 self.options(**options)
>>  165 if isinstance(path, basestring):
>> --> 166 return self._df(self._jreader.load(path))
>>  167 elif path is not None:
>>  168 if type(path) != list:
>> /opt/spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py in 
>> __call__(self, *args)
>>  1158 answer = self.gateway_client.send_command(command)
>>  1159 return_value = get_return_value(
>> -> 1160 answer, self.gateway_client, self.target_id, self.name)
>>  1161 
>>  1162 for temp_arg in temp_args:
>> /opt/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
>>  61 def deco(*a, **kw):
>>  62 try:
>> ---> 63 return f(*a, **kw)
>>  64 except py4j.protocol.Py4JJavaError as e:
>>  65 s = e.java_exception.toString()
>> /opt/spark/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py in 
>> get_return_value(answer, gateway_client, target_id, name)
>>  318 raise Py4JJavaError(
>>  319 "An error occurred while calling {0}{1}{2}.\n".
>> --> 320 format(target_id, ".", name), value)
>>  321 else:
>>  322 raise Py4JError(
>> Py4JJavaError: An error occurred while calling o244.load.
>> : java.net.ConnectException: Call From 
>> Sandeeps-MacBook-Pro.local/192.168.50.188 to 
>> ec2-35-154-242-76.ap-south-1.compute.amazonaws.com:9000 failed on connection 
>> exception: java.net.ConnectException: Connection refused; For more details 
>> see: http://wiki.apache.org/hadoop/ConnectionRefused
>>  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>  at 
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>  at 
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>  at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:792)
>>  at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732)
>>  at org.apache.hadoop.ipc.Client.call(Client.java:1479)
>>  at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>>  at 
>> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>>  at com.sun.proxy.$Proxy17.getFileInfo(Unknown Source)
>>  at 
>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
>>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>  at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>  at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>  at java.lang.reflect.Method.invoke(Method.java:498)
>>  at 
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>>  at 
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>  at com.sun.proxy.$Proxy18.getFileInfo(Unknown Source)
>>  at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108)
>>  at 
>> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
>>  at 
>> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
>>  at 
>> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>>  at 
>> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
>>  at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
>>  at 
>> org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:714)
>>  at 
>> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:389)
>>  at 
>> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:389)
>>  at 
>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>>  at 
>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>>  at scala.collection.immutable.List.foreach(List.scala:381)
>>  at 
>> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>>  at scala.collection.immutable.List.flatMap(List.scala:344)
>>  at 
>> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:388)
>>  at 
>> 

Re: How to access line fileName in loading file using the textFile method

2018-09-24 Thread Jörn Franke
You can create your own data source that does exactly this. 

Why is the file name important if the file content is the same?
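
If a full custom data source is more than you need, two built-in ways to keep the file name are sketched below ("path/*" is the directory from the question; note that wholeTextFiles loads each file completely into memory):

// RDD API: one (fileName, fileContent) pair per file
val perFile = spark.sparkContext.wholeTextFiles("path/*")
val wordFilePairs = perFile.flatMap { case (file, text) => text.split("\\s+").map(word => (word, file)) }

// DataFrame API: input_file_name() tags every row with its source file
import org.apache.spark.sql.functions.input_file_name
val withFile = spark.read.text("path/*").withColumn("file", input_file_name())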

> On 24. Sep 2018, at 13:53, Soheil Pourbafrani  wrote:
> 
> Hi, My text data are in the form of text file. In the processing logic, I 
> need to know each word is from which file. Actually, I need to tokenize the 
> words and create the pair of . The naive solution is to call 
> sc.textFile for each file and having the fileName in a variable, create the 
> pairs, but it's not efficient and I got the StackOverflow error as dataset 
> grew.
> 
> So my question is supposing all files are in a directory and I read then 
> using sc.textFile("path/*"), how can I understand each data is for which file?
> 
> Is it possible (and needed) to customize the textFile method?

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Use Shared Variable in PySpark Executors

2018-09-22 Thread Jörn Franke
Do you want to calculate it once and share it with all executors? Then a broadcast variable may be interesting for you.
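
A minimal sketch of the broadcast idea (Scala shown here; PySpark exposes the same concept via sc.broadcast). The vocabulary map, its helper and the documents RDD are assumptions:

val wordVectors: Map[String, Array[Double]] = buildWordVectors()  // computed once on the driver (assumed helper)
val bcVectors = spark.sparkContext.broadcast(wordVectors)

// documents: RDD[Seq[String]] of tokenised documents (assumed); executors read bcVectors.value
val docVectors = documents.map(tokens => tokens.flatMap(t => bcVectors.value.get(t)))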

> On 22. Sep 2018, at 16:33, Soheil Pourbafrani  wrote:
> 
> Hi, I want to do some processing with PySpark and save the results in a 
> variable of type tuple that should be shared among the executors for further 
> processing.
> Actually, it's a Text Mining Processing and I want to use the Vector Space 
> Model. So I want to calculate the Vector of all Words (that should be 
> reachable for all executors) and save it in a tuple. Is it possible in Spark 
> or I should use external storage like database or files?
> 
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Time-Series Forecasting

2018-09-19 Thread Jörn Franke
What functionality do you need? I.e. which methods?

> On 19. Sep 2018, at 18:01, Mina Aslani  wrote:
> 
> Hi,
> I have a question for you. Do we have any Time-Series Forecasting library in 
> Spark? 
> 
> Best regards,
> Mina

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Drawing Big Data tech diagrams using Pen Tablets

2018-09-12 Thread Jörn Franke
You can try cloud services such as draw.io or  similar.

> On 12. Sep 2018, at 20:31, Mich Talebzadeh  wrote:
> 
> Hi Gourav,
> 
> I have an IPAD that my son uses it and not me (for games). I don't see much 
> value in spending $$$ on Surface. Then I had montblanc augmented paper that 
> kinf of impressive but not really that practical.
> 
> I have Visio 2010 and an upgrade to 2016 will cost good money. So I decided 
> to get this pen tablet after seeing these diagrams. They are pretty cool 
> 
> 
> 
> In UK it retails at £495 so not cheap but much cheaper than surface pro which 
> is a premium brand.
> 
> Cheers,
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> http://talebzadehmich.wordpress.com
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> 
>> On Wed, 12 Sep 2018 at 18:34, Gourav Sengupta  
>> wrote:
>> Hi Mich,
>> 
>> does Apple iPad and Surface not have their own tools for this? 
>> 
>> Regards,
>> Gourav Sengupta
>> 
>>> On Tue, Sep 11, 2018 at 8:21 PM Mich Talebzadeh  
>>> wrote:
>>> Hi,
>>> 
>>> Sorry this question may not be related to Spark but sure many people use MS 
>>> Visio or other graphics tools for architecture type diagrams with big data 
>>> including Spark.
>>> 
>>> Visio is time consuming and cumbersome although that is what I use for tech 
>>> diagrams. I tried Montblanc augmented paper but that is clumsy and 
>>> expensive.
>>> 
>>> I was wondering if anyone has tried some tools like Wacom Intuos Pro Paper 
>>> Edition Pen Tablet
>>> or any similar tools for easier drawing and their recommendation?
>>> 
>>> Thanks,
>>> 
>>> Dr Mich Talebzadeh
>>>  
>>> LinkedIn  
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>  
>>> http://talebzadehmich.wordpress.com
>>> 
>>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>>> loss, damage or destruction of data or any other property which may arise 
>>> from relying on this email's technical content is explicitly disclaimed. 
>>> The author will in no case be liable for any monetary damages arising from 
>>> such loss, damage or destruction.
>>>  


Re: How to parallelize zip file processing?

2018-08-10 Thread Jörn Franke
Does the zip file contain only one file? I fear in that case you can only use one core. 

Do you mean gzip, by the way? In that case you cannot decompress it in parallel...

How is the zip file created? Can’t you create several smaller ones?
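
If it has to stay a single zip, the usual pattern is to read it with binaryFiles, unpack the entries in a flatMap and repartition before the expensive downstream work - a sketch with assumed paths; reading one archive is still a single task, only everything after the repartition is spread across executors:

import java.util.zip.ZipInputStream
import scala.io.Source

val zips = spark.sparkContext.binaryFiles("hdfs:///data/archives/*.zip")  // path is an assumption

val lines = zips.flatMap { case (_, stream) =>
  val zis = new ZipInputStream(stream.open())
  Iterator.continually(zis.getNextEntry)
    .takeWhile(_ != null)
    .flatMap(_ => Source.fromInputStream(zis).getLines().toList)  // materialise one entry at a time
}.repartition(64)                                                 // spread the downstream processing

lines.map(_.length).sum()  // any subsequent transformation now runs in many tasks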

> On 10. Aug 2018, at 22:54, mytramesh  wrote:
> 
> I know, spark doesn’t support zip file directly since it not distributable.
> Any techniques to process this file quickly?
> 
> I am trying to process an around 4 GB zip file. All data is moving to one executor,
> and only one task is getting assigned to process all the data. 
> 
> Even when I run the repartition method, the data is getting partitioned, but on the same
> executor. 
> 
> 
> How to distribute data to other executors? 
> How to get more tasks/threads assigned when the data got partitioned on the same
> executor? 
> 
> 
> 
> 
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Sparser library

2018-08-10 Thread Jörn Franke
You need to include the library in your dependencies. Furthermore, the * at the end does not make sense.

> On 10. Aug 2018, at 07:48, umargeek  wrote:
> 
> Hi Team,
> 
> Please let me know the spark Sparser library to use while submitting the
> spark application to use below mentioned format,
> 
> val df = spark.read.format("*edu.stanford.sparser.json*")
> 
> When I used above format it throwed error class not found exception.
> 
> Thanks,
> Umar
> 
> 
> 
> 
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Broadcast variable size limit?

2018-08-05 Thread Jörn Franke
I think if you need more than that, you should think about something other than a broadcast variable anyway ...
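
To make the alternative concrete: if one side of a join no longer fits comfortably under that limit, let Spark shuffle-join it instead of broadcasting it - a sketch with assumed table and column names:

import org.apache.spark.sql.functions.broadcast

val facts = spark.table("facts")
val small = spark.table("small_dim")     // small enough: broadcast hash join
val large = spark.table("large_dim")     // too large to broadcast

facts.join(broadcast(small), "key")      // explicit broadcast hint
facts.join(large, "key")                 // plain shuffle join: no ~2 GB ceiling, at the cost of a shuffle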

> On 5. Aug 2018, at 16:51, klrmowse  wrote:
> 
> is it currently still ~2GB (Integer.MAX_VALUE) ??
> 
> or am i misinformed, since that's what google-search and scouring this
> mailing list seem to say... ?
> 
> 
> 
> Thanks
> 
> 
> 
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Do GraphFrames support streaming?

2018-07-14 Thread Jörn Franke
No, the streaming dataframe needs to be written to disk or similar (or an in-memory backend); then, when the next stream arrives, join them, create the graph, and store the new stream together with the existing stream on disk, etc.
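
A sketch of that store-union-rebuild cycle, assuming the graphframes package is on the classpath, Parquet paths of our choosing, and newV / newE already extracted from the latest micro-batch:

import org.graphframes.GraphFrame

// state persisted by the previous batch
val oldV = spark.read.parquet("/graph/vertices")
val oldE = spark.read.parquet("/graph/edges")

val allV = oldV.union(newV).dropDuplicates("id")
val allE = oldE.union(newE).dropDuplicates("src", "dst")

allV.write.mode("overwrite").parquet("/graph/vertices_next")
allE.write.mode("overwrite").parquet("/graph/edges_next")

val g = GraphFrame(allV, allE)    // a full rebuild each time, not a true incremental update
g.find("(a)-[e]->(b)").show()     // motif query, close to the Cypher-style use case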

> On 14. Jul 2018, at 17:19, kant kodali  wrote:
> 
> The question now would be can it be done in streaming fashion? Are you 
> talking about the union of two streaming dataframes and then constructing a 
> graphframe (also during streaming) ?
> 
>> On Sat, Jul 14, 2018 at 8:07 AM, Jörn Franke  wrote:
>> For your use case one might indeed be able to work simply with incremental 
>> graph updates. However they are not straight forward in Spark. You can union 
>> the new Data with the existing dataframes that represent your graph and 
>> create from that a new graph frame.
>> 
>> However I am not sure if this will fully fulfill your requirement for 
>> incremental graph updates.
>> 
>>> On 14. Jul 2018, at 15:59, kant kodali  wrote:
>>> 
>>> "You want to update incrementally an existing graph and run incrementally a 
>>> graph algorithm suitable for this - you have to implement yourself as far 
>>> as I am aware"
>>> 
>>> I want to update the graph incrementally and want to run some graph queries 
>>> similar to Cypher like give me all the vertices that are connected by a 
>>> specific set of edges and so on. Don't really intend to run graph 
>>> algorithms like ConnectedComponents or anything else at this point but of 
>>> course, it's great to have.
>>> 
>>> If we were to do this myself should I extend the GraphFrame? any 
>>> suggestions?
>>> 
>>> 
>>>> On Sun, Apr 29, 2018 at 3:24 AM, Jörn Franke  wrote:
>>>> What is the use case you are trying to solve?
>>>> You want to load graph data from a streaming window in separate graphs - 
>>>> possible but requires probably a lot of memory. 
>>>> You want to update an existing graph with new streaming data and then 
>>>> fully rerun an algorithms -> look at Janusgraph
>>>> You want to update incrementally an existing graph and run incrementally a 
>>>> graph algorithm suitable for this - you have to implement yourself as far 
>>>> as I am aware
>>>> 
>>>> > On 29. Apr 2018, at 11:43, kant kodali  wrote:
>>>> > 
>>>> > Do GraphFrames support streaming?
>>> 
> 


Re: Do GraphFrames support streaming?

2018-07-14 Thread Jörn Franke
For your use case one might indeed be able to work simply with incremental graph updates. However, they are not straightforward in Spark. You can union the new data with the existing dataframes that represent your graph and create a new graph frame from that.

However I am not sure if this will fully fulfill your requirement for 
incremental graph updates.

> On 14. Jul 2018, at 15:59, kant kodali  wrote:
> 
> "You want to update incrementally an existing graph and run incrementally a 
> graph algorithm suitable for this - you have to implement yourself as far as 
> I am aware"
> 
> I want to update the graph incrementally and want to run some graph queries 
> similar to Cypher like give me all the vertices that are connected by a 
> specific set of edges and so on. Don't really intend to run graph algorithms 
> like ConnectedComponents or anything else at this point but of course, it's 
> great to have.
> 
> If we were to do this myself should I extend the GraphFrame? any suggestions?
> 
> 
>> On Sun, Apr 29, 2018 at 3:24 AM, Jörn Franke  wrote:
>> What is the use case you are trying to solve?
>> You want to load graph data from a streaming window in separate graphs - 
>> possible but requires probably a lot of memory. 
>> You want to update an existing graph with new streaming data and then fully 
>> rerun an algorithms -> look at Janusgraph
>> You want to update incrementally an existing graph and run incrementally a 
>> graph algorithm suitable for this - you have to implement yourself as far as 
>> I am aware
>> 
>> > On 29. Apr 2018, at 11:43, kant kodali  wrote:
>> > 
>> > Do GraphFrames support streaming?
> 


Re: Inferring Data driven Spark parameters

2018-07-03 Thread Jörn Franke
Don’t do this inside your job. Create separate jobs for the different types of work and orchestrate them using Oozie or similar.

> On 3. Jul 2018, at 09:34, Aakash Basu  wrote:
> 
> Hi,
> 
> Cluster - 5 node (1 Driver and 4 workers)
> Driver Config: 16 cores, 32 GB RAM
> Worker Config: 8 cores, 16 GB RAM
> 
> I'm using the below parameters from which I know the first chunk is cluster 
> dependent and the second chunk is data/code dependent.
> 
> --num-executors 4 
> --executor-cores 5
> --executor-memory 10G 
> --driver-cores 5 
> --driver-memory 25G 
> 
> 
> --conf spark.sql.shuffle.partitions=100 
> --conf spark.driver.maxResultSize=2G 
> --conf "spark.executor.extraJavaOptions=-XX:+UseParallelGC" 
> --conf spark.scheduler.listenerbus.eventqueue.capacity=2
> 
> I've come upto these values depending on my R on the properties and the 
> issues I faced and hence the handles.
> 
> My ask here is -
> 
> 1) How can I infer, using some formula or a code, to calculate the below 
> chunk dependent on the data/code?
> 2) What are the other usable properties/configurations which I can use to 
> shorten my job runtime?
> 
> Thanks,
> Aakash.


Re: Dataframe reader does not read microseconds, but TimestampType supports microseconds

2018-07-02 Thread Jörn Franke
How do you read the files? Do you have some source code? It could be related to the JSON data source.

What Spark version do you use?
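
One workaround that is independent of the reader internals - a sketch; path, schema and column names are assumptions - is to read the column as a string and convert it through java.sql.Timestamp, which (as the question below notes) parses the fractional part and is stored by Spark with microsecond precision:

import java.sql.Timestamp
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types._

val toMicros = udf { (s: String) => if (s == null) null else Timestamp.valueOf(s.replace('T', ' ')) }

val schema = StructType(Seq(StructField("id", LongType), StructField("event_time", StringType)))
val raw = spark.read.schema(schema).json("/data/events.json")       // assumed path and schema

val withTs = raw.withColumn("event_time", toMicros(col("event_time")))
withTs.select("event_time").show(false)                             // e.g. 2018-05-13 20:25:34.153712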

> On 2. Jul 2018, at 09:03, Colin Williams  
> wrote:
> 
> I'm confused as to why Sparks Dataframe reader does not support reading json 
> or similar with microsecond timestamps to microseconds, but instead reads 
> into millis.
> 
> This seems strange when the TimestampType supports microseconds.
> 
> For example create a schema for a json object with a column of TimestampType. 
> Then read data from that column with timestamps with microseconds like 
> 
> 2018-05-13 20:25:34.153712
> 
> 2018-05-13T20:25:37.348006
> 
> You will end up with timestamps with millisecond precision. 
> 
> E.G. 2018-05-13 20:25:34.153
> 
> 
> 
> When reading about TimestampType: The data type representing 
> java.sql.Timestamp values. Please use the singleton DataTypes.TimestampType. 
> 
> java.sql.timestamp provides a method that reads timestamps like 
> Timestamp.valueOf("2018-05-13 20:25:37.348006") including milliseconds.
> 
> So why does Spark's DataFrame reader drop the ball on this?


Re: How to validate orc vectorization is working within spark application?

2018-06-19 Thread Jörn Franke
Full code? What is the expected performance, and what is the actual one?
What is the use case?

> On 20. Jun 2018, at 05:33, umargeek  wrote:
> 
> Hi Folks,
> 
> I would just require few pointers on the above query w.r.t vectorization
> looking forward for support from the community.
> 
> Thanks,
> Umar
> 
> 
> 
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Spark Optimization] Why is one node getting all the pressure?

2018-06-11 Thread Jörn Franke
If the data is only in the KB range then Spark will always schedule it to one node. As soon as it gets bigger you will see more nodes being used.

Hence, increase your test dataset.

> On 11. Jun 2018, at 12:22, Aakash Basu  wrote:
> 
> Jorn - The code is a series of feature engineering and model tuning 
> operations. Too big to show. Yes, data volume is too low, it is in KBs, just 
> tried to experiment with a small dataset before going for a large one.
> 
> Akshay - I ran with your suggested spark configurations, I get this (the node 
> changed, but the problem persists) -
> 
> 
> 
> 
> 
>> On Mon, Jun 11, 2018 at 3:16 PM, akshay naidu  
>> wrote:
>> try
>>  --num-executors 3 --executor-cores 4 --executor-memory 2G --conf 
>> spark.scheduler.mode=FAIR
>> 
>>> On Mon, Jun 11, 2018 at 2:43 PM, Aakash Basu  
>>> wrote:
>>> Hi,
>>> 
>>> I have submitted a job on 4 node cluster, where I see, most of the 
>>> operations happening at one of the worker nodes and other two are simply 
>>> chilling out.
>>> 
>>> Picture below puts light on that -
>>> 
>>> How to properly distribute the load?
>>> 
>>> My cluster conf (4 node cluster [1 driver; 3 slaves]) -
>>> 
>>> Cores - 6
>>> RAM - 12 GB
>>> HDD - 60 GB
>>> 
>>> My Spark Submit command is as follows -
>>> 
>>> spark-submit --master spark://192.168.49.37:7077 --num-executors 3 
>>> --executor-cores 5 --executor-memory 4G 
>>> /appdata/bblite-codebase/prima_diabetes_indians.py
>>> 
>>> What to do?
>>> 
>>> Thanks,
>>> Aakash.
>> 
> 


Re: [Spark Optimization] Why is one node getting all the pressure?

2018-06-11 Thread Jörn Franke
What is your code? Maybe it does an operation which is bound to a single host, or your data volume is too small for multiple hosts.

> On 11. Jun 2018, at 11:13, Aakash Basu  wrote:
> 
> Hi,
> 
> I have submitted a job on 4 node cluster, where I see, most of the operations 
> happening at one of the worker nodes and other two are simply chilling out.
> 
> Picture below puts light on that -
> 
> How to properly distribute the load?
> 
> My cluster conf (4 node cluster [1 driver; 3 slaves]) -
> 
> Cores - 6
> RAM - 12 GB
> HDD - 60 GB
> 
> My Spark Submit command is as follows -
> 
> spark-submit --master spark://192.168.49.37:7077 --num-executors 3 
> --executor-cores 5 --executor-memory 4G 
> /appdata/bblite-codebase/prima_diabetes_indians.py
> 
> What to do?
> 
> Thanks,
> Aakash.


Re: Spark / Scala code not recognising the path?

2018-06-09 Thread Jörn Franke
Why don’t you write the final name from the start?
I.e. save it directly under the name the file should have.
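
With DataFrame.write the output is always a directory of part files, so writing the final name in practice means renaming the single part file right after the write, via the same FileSystem handle - a sketch using the paths from the question (the target name is an assumption):

import org.apache.hadoop.fs.{FileSystem, Path}

val fs  = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val dir = new Path("hdfs://localhost:8020/data/temp_insight")
val part = fs.globStatus(new Path(dir, "part-*.csv")).head.getPath        // the single file written by coalesce(1)
fs.rename(part, new Path("hdfs://localhost:8020/data/insight.csv"))       // the final name
fs.delete(dir, true)                                                      // drop _SUCCESS and the temporary directory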

> On 9. Jun 2018, at 09:44, Abhijeet Kumar  wrote:
> 
> I need to rename the file. I can write a separate program for this, I think.
> 
> Thanks,
> Abhijeet Kumar 
>> On 09-Jun-2018, at 1:10 PM, Jörn Franke  wrote:
>> 
>> That would be an anti pattern and would lead to bad software.
>> Please don’t do it for the sake of the people that use your software.
>> What do you exactly want to achieve with the information if the file exists 
>> or not?
>> 
>>> On 9. Jun 2018, at 08:34, Abhijeet Kumar  
>>> wrote:
>>> 
>>> Can you please tell the estimated time. So, that my program will wait for 
>>> that time period.
>>> 
>>> Thanks,
>>> Abhijeet Kumar
>>>> On 09-Jun-2018, at 12:01 PM, Jörn Franke  wrote:
>>>> 
>>>> You need some time until the information of the file creation is 
>>>> propagated.
>>>> 
>>>>> On 9. Jun 2018, at 08:07, Abhijeet Kumar  
>>>>> wrote:
>>>>> 
>>>>> I'm modifying a CSV file which is inside HDFS and finally putting it back 
>>>>> to HDFS in Spark.
>>>>> val fs=FileSystem.get(spark.sparkContext.hadoopConfiguration)
>>>>> csv_file.coalesce(1).write
>>>>>   .format("csv")
>>>>>   .mode("overwrite")
>>>>>   .save("hdfs://localhost:8020/data/temp_insight")
>>>>> Thread.sleep(15000)
>>>>> println(fs.exists(new Path("/data/temp_insight")))
>>>>> Output:
>>>>> 
>>>>> false
>>>>> while I have stopped the thread for 15 sec, I have checked my hdfs using 
>>>>> command
>>>>> 
>>>>> hdfs dfs -ls /data/temp_insight
>>>>> Output:
>>>>> 
>>>>> 18/06/08 17:48:18 WARN util.NativeCodeLoader: Unable to load 
>>>>> native-hadoop library for your platform... using builtin-java classes 
>>>>> where applicable
>>>>> -rw-r--r--   3 abhijeet supergroup  0 2018-06-08 17:48 
>>>>> /data/temp_insight/_SUCCESS
>>>>> -rw-r--r--   3 abhijeet supergroup201 2018-06-08 17:48 
>>>>> /data/temp_insight/part-0-7bffb826-f18d-4022-b089-da85565525b7-c000.csv
>>>>> To cross verify whether it is taking the path of hdfs or not I have added 
>>>>> one more println statement in my code, providing the path which is 
>>>>> already there in HDFS. It's showing true in that case.
>>>>> 
>>>>> So, what could be the reason?
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> Abhijeet Kumar
>>> 
> 


Re: Spark / Scala code not recognising the path?

2018-06-09 Thread Jörn Franke
That would be an anti-pattern and would lead to bad software.
Please don’t do it, for the sake of the people that use your software.
What exactly do you want to achieve with the information whether the file exists 
or not?

> On 9. Jun 2018, at 08:34, Abhijeet Kumar  wrote:
> 
> Can you please tell the estimated time. So, that my program will wait for 
> that time period.
> 
> Thanks,
> Abhijeet Kumar
>> On 09-Jun-2018, at 12:01 PM, Jörn Franke  wrote:
>> 
>> You need some time until the information of the file creation is propagated.
>> 
>>> On 9. Jun 2018, at 08:07, Abhijeet Kumar  
>>> wrote:
>>> 
>>> I'm modifying a CSV file which is inside HDFS and finally putting it back 
>>> to HDFS in Spark.
>>> val fs=FileSystem.get(spark.sparkContext.hadoopConfiguration)
>>> csv_file.coalesce(1).write
>>>   .format("csv")
>>>   .mode("overwrite")
>>>   .save("hdfs://localhost:8020/data/temp_insight")
>>> Thread.sleep(15000)
>>> println(fs.exists(new Path("/data/temp_insight")))
>>> Output:
>>> 
>>> false
>>> while I have stopped the thread for 15 sec, I have checked my hdfs using 
>>> command
>>> 
>>> hdfs dfs -ls /data/temp_insight
>>> Output:
>>> 
>>> 18/06/08 17:48:18 WARN util.NativeCodeLoader: Unable to load native-hadoop 
>>> library for your platform... using builtin-java classes where applicable
>>> -rw-r--r--   3 abhijeet supergroup  0 2018-06-08 17:48 
>>> /data/temp_insight/_SUCCESS
>>> -rw-r--r--   3 abhijeet supergroup201 2018-06-08 17:48 
>>> /data/temp_insight/part-0-7bffb826-f18d-4022-b089-da85565525b7-c000.csv
>>> To cross verify whether it is taking the path of hdfs or not I have added 
>>> one more println statement in my code, providing the path which is already 
>>> there in HDFS. It's showing true in that case.
>>> 
>>> So, what could be the reason?
>>> 
>>> Thanks,
>>> 
>>> Abhijeet Kumar
> 


Re: Spark / Scala code not recognising the path?

2018-06-09 Thread Jörn Franke
It can take some time until the information about the file creation is propagated.

> On 9. Jun 2018, at 08:07, Abhijeet Kumar  wrote:
> 
> I'm modifying a CSV file which is inside HDFS and finally putting it back to 
> HDFS in Spark.
> val fs=FileSystem.get(spark.sparkContext.hadoopConfiguration)
> csv_file.coalesce(1).write
>   .format("csv")
>   .mode("overwrite")
>   .save("hdfs://localhost:8020/data/temp_insight")
> Thread.sleep(15000)
> println(fs.exists(new Path("/data/temp_insight")))
> Output:
> 
> false
> while I have stopped the thread for 15 sec, I have checked my hdfs using 
> command
> 
> hdfs dfs -ls /data/temp_insight
> Output:
> 
> 18/06/08 17:48:18 WARN util.NativeCodeLoader: Unable to load native-hadoop 
> library for your platform... using builtin-java classes where applicable
> -rw-r--r--   3 abhijeet supergroup  0 2018-06-08 17:48 
> /data/temp_insight/_SUCCESS
> -rw-r--r--   3 abhijeet supergroup201 2018-06-08 17:48 
> /data/temp_insight/part-0-7bffb826-f18d-4022-b089-da85565525b7-c000.csv
> To cross verify whether it is taking the path of hdfs or not I have added one 
> more println statement in my code, providing the path which is already there 
> in HDFS. It's showing true in that case.
> 
> So, what could be the reason?
> 
> Thanks,
> 
> Abhijeet Kumar


Re: [PySpark] Releasing memory after a spark job is finished

2018-06-04 Thread Jörn Franke
Additionally, what I meant by modularization is that jobs that have really nothing to do with each other should be in separate Python programs.

> On 5. Jun 2018, at 04:50, Thakrar, Jayesh  
> wrote:
> 
> Disclaimer - I use Spark with Scala and not Python.
>  
> But I am guessing that Jorn's reference to modularization is to ensure that 
> you do the processing inside methods/functions and call those methods 
> sequentially.
> I believe that as long as an RDD/dataset variable is in scope, its memory may 
> not be getting released.
> By having functions, they will get out of scope and their memory can be 
> released.
>  
> Also, assuming that the variables are not daisy-chained/inter-related as that 
> too will not make it easy.
>  
>  
> From: Jay 
> Date: Monday, June 4, 2018 at 9:41 PM
> To: Shuporno Choudhury 
> Cc: "Jörn Franke [via Apache Spark User List]" 
> , 
> Subject: Re: [PySpark] Releasing memory after a spark job is finished
>  
> Can you tell us what version of Spark you are using and if Dynamic Allocation 
> is enabled ? 
>  
> Also, how are the files being read ? Is it a single read of all files using a 
> file matching regex or are you running different threads in the same pyspark 
> job?
>  
>  
> 
> On Mon 4 Jun, 2018, 1:27 PM Shuporno Choudhury, 
>  wrote:
> Thanks a lot for the insight.
> Actually I have the exact same transformations for all the datasets, hence 
> only 1 python code.
> Now, do you suggest that I run different spark-submit for all the different 
> datasets given that I have the exact same transformations?
>  
> On Tue 5 Jun, 2018, 1:48 AM Jörn Franke [via Apache Spark User List], 
>  wrote:
> Yes if they are independent with different transformations then I would 
> create a separate python program. Especially for big data processing 
>> frameworks one should avoid to put everything in one big monolithic 
> applications.
>  
> 
> On 4. Jun 2018, at 22:02, Shuporno Choudhury <[hidden email]> wrote:
> 
> Hi,
>  
> Thanks for the input.
> I was trying to get the functionality first, hence I was using local mode. I 
> will be running on a cluster definitely but later.
>  
> Sorry for my naivety, but can you please elaborate on the modularity concept 
> that you mentioned and how it will affect whatever I am already doing?
> Do you mean running a different spark-submit for each different dataset when 
> you say 'an independent python program for each process '?
>  
> On Tue, 5 Jun 2018 at 01:12, Jörn Franke [via Apache Spark User List] 
> <[hidden email]> wrote:
> Why don’t you modularize your code and write for each process an independent 
> python program that is submitted via Spark?
>  
> Not sure though if Spark local make sense. If you don’t have a cluster then a 
> normal python program can be much better.
> 
> On 4. Jun 2018, at 21:37, Shuporno Choudhury <[hidden email]> wrote:
> 
> Hi everyone,
> I am trying to run a pyspark code on some data sets sequentially [basically 
> 1. Read data into a dataframe 2.Perform some join/filter/aggregation 3. Write 
> modified data in parquet format to a target location]
> Now, while running this pyspark code across multiple independent data sets 
> sequentially, the memory usage from the previous data set doesn't seem to get 
> released/cleared and hence spark's memory consumption (JVM memory consumption 
> from Task Manager) keeps on increasing till it fails at some data set.
> So, is there a way to clear/remove dataframes that I know are not going to be 
> used later? 
> Basically, can I clear out some memory programmatically (in the pyspark code) 
> when processing for a particular data set ends?
> At no point, I am caching any dataframe (so unpersist() is also not a 
> solution).
>  
> I am running spark using local[*] as master. There is a single SparkSession 
> that is doing all the processing.
> If it is not possible to clear out memory, what can be a better approach for 
> this problem?
>  
> Can someone please help me with this and tell me if I am going wrong anywhere?
>  
> --Thanks,
> Shuporno Choudhury
>  
> 
> If you reply to this email, your message will be added to the discussion 
> below:
> http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-Releasing-memory-after-a-spark-job-is-finished-tp32454p32455.html
> To start a new topic under Apache Spark User List, email [hidden email]
> To unsubscribe from Apache Spark User List, click here.
> NAML
> 
>  
> --
> --Thanks,
> Shuporno Choudhury
>  
> 
> If you reply to this email, your message will be added to the discussion 
> below:
> http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-Releasing-memory-after-a-spark-job-is-finished-tp32454p32458.html
> To start a new topic under Apache Spark User List, email 
> ml+s1001560n1...@n3.nabble.com 
> To unsubscribe from Apache Spark User List, click here.
> NAML


Re: [PySpark] Releasing memory after a spark job is finished

2018-06-04 Thread Jörn Franke
Yes, if they are independent with different transformations then I would create a separate Python program for each. Especially with big data processing frameworks one should avoid putting everything into one big monolithic application.


> On 4. Jun 2018, at 22:02, Shuporno Choudhury  
> wrote:
> 
> Hi,
> 
> Thanks for the input.
> I was trying to get the functionality first, hence I was using local mode. I 
> will be running on a cluster definitely but later.
> 
> Sorry for my naivety, but can you please elaborate on the modularity concept 
> that you mentioned and how it will affect whatever I am already doing?
> Do you mean running a different spark-submit for each different dataset when 
> you say 'an independent python program for each process '?
> 
>> On Tue, 5 Jun 2018 at 01:12, Jörn Franke [via Apache Spark User List] 
>>  wrote:
>> Why don’t you modularize your code and write for each process an independent 
>> python program that is submitted via Spark?
>> 
>> Not sure though if Spark local make sense. If you don’t have a cluster then 
>> a normal python program can be much better.
>> 
>>> On 4. Jun 2018, at 21:37, Shuporno Choudhury <[hidden email]> wrote:
>>> 
>>> Hi everyone,
>>> I am trying to run a pyspark code on some data sets sequentially [basically 
>>> 1. Read data into a dataframe 2.Perform some join/filter/aggregation 3. 
>>> Write modified data in parquet format to a target location]
>>> Now, while running this pyspark code across multiple independent data sets 
>>> sequentially, the memory usage from the previous data set doesn't seem to 
>>> get released/cleared and hence spark's memory consumption (JVM memory 
>>> consumption from Task Manager) keeps on increasing till it fails at some 
>>> data set.
>>> So, is there a way to clear/remove dataframes that I know are not going to 
>>> be used later? 
>>> Basically, can I clear out some memory programmatically (in the pyspark 
>>> code) when processing for a particular data set ends?
>>> At no point, I am caching any dataframe (so unpersist() is also not a 
>>> solution).
>>> 
>>> I am running spark using local[*] as master. There is a single SparkSession 
>>> that is doing all the processing.
>>> If it is not possible to clear out memory, what can be a better approach 
>>> for this problem?
>>> 
>>> Can someone please help me with this and tell me if I am going wrong 
>>> anywhere?
>>> 
>>> --Thanks,
>>> Shuporno Choudhury
>> 
>> 
>> If you reply to this email, your message will be added to the discussion 
>> below:
>> http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-Releasing-memory-after-a-spark-job-is-finished-tp32454p32455.html
>> To start a new topic under Apache Spark User List, email 
>> ml+s1001560n1...@n3.nabble.com 
>> To unsubscribe from Apache Spark User List, click here.
>> NAML
> 
> 
> -- 
> --Thanks,
> Shuporno Choudhury


Re: [PySpark] Releasing memory after a spark job is finished

2018-06-04 Thread Jörn Franke
Why don’t you modularize your code and write an independent Python program for each process that is submitted via Spark?

Not sure though if Spark local mode makes sense. If you don’t have a cluster then a normal Python program can be much better.

> On 4. Jun 2018, at 21:37, Shuporno Choudhury  
> wrote:
> 
> Hi everyone,
> I am trying to run a pyspark code on some data sets sequentially [basically 
> 1. Read data into a dataframe 2.Perform some join/filter/aggregation 3. Write 
> modified data in parquet format to a target location]
> Now, while running this pyspark code across multiple independent data sets 
> sequentially, the memory usage from the previous data set doesn't seem to get 
> released/cleared and hence spark's memory consumption (JVM memory consumption 
> from Task Manager) keeps on increasing till it fails at some data set.
> So, is there a way to clear/remove dataframes that I know are not going to be 
> used later? 
> Basically, can I clear out some memory programmatically (in the pyspark code) 
> when processing for a particular data set ends?
> At no point, I am caching any dataframe (so unpersist() is also not a 
> solution).
> 
> I am running spark using local[*] as master. There is a single SparkSession 
> that is doing all the processing.
> If it is not possible to clear out memory, what can be a better approach for 
> this problem?
> 
> Can someone please help me with this and tell me if I am going wrong anywhere?
> 
> --Thanks,
> Shuporno Choudhury


Re: [External] Re: Sorting in Spark on multiple partitions

2018-06-04 Thread Jörn Franke
I think there is also a misunderstanding of how repartition works. It keeps the existing number of partitions, but hash-partitions according to userid; this means each partition is likely to contain several different user ids.

That would also explain the behaviour you observed. However, without the full source code these are just assumptions.

> On 4. Jun 2018, at 17:33, Jain, Neha T.  wrote:
> 
> Hi Jorn,
>  
> I tried removing userid from my sort clause but still the same issue- data 
> not sorted.
>  
> var newDf = data.repartition(col(userid)).sortWithinPartitions(sid,time)
>  
> I am checking the sorting results  by temporary writing this file to Hive as 
> well as HDFS. Now, when I see the user wise data it is not sorted.
> Attaching the output file for your reference.
>  
> On the basis of sorting within userid partitions, I want to add a flag which 
> marks first item in the partition as true other items in that partition as 
> false.
> If my sorting order is disturbed, the flag is wrongly set.
>  
> Please suggest what else could be done to fix this very basic scenario of 
> sorting in Spark across multiple partitions across multiple nodes.
>  
> Thanks & Regards,
> Neha Jain
>  
> From: Jörn Franke [mailto:jornfra...@gmail.com] 
> Sent: Monday, June 4, 2018 10:48 AM
> To: Sing, Jasbir 
> Cc: user@spark.apache.org; Patel, Payal ; Jain, 
> Neha T. 
> Subject: [External] Re: Sorting in Spark on multiple partitions
>  
> You partition by userid, why do you then sort again by userid in the 
> partition? Can you try to remove userid from the sort? 
>  
> How do you check if the sort is correct or not?
>  
> What is the underlying objective of the sort? Do you have more information on 
> schema and data?
> 
> On 4. Jun 2018, at 05:47, Sing, Jasbir  wrote:
> 
> Hi Team,
>  
> We are currently using Spark 2.2.0 and facing some challenges in sorting of 
> data on multiple partitions.
> We have tried below approaches:
>  
> Spark SQL approach:
> a.  var query = "select * from data distribute by " + userid + " sort by 
> " + userid + ", " + time “
>  
> This query returns correct results in Hive but not in Spark SQL.  
> var newDf = data.repartition(col(userid)).orderBy(userid, time)
> var newDf = data.repartition(col(userid)).sortWithinPartitions(userid,time)
>  
>  
> But none of the above approach is giving correct results for sorting of data.
> Please suggest what could be done for the same.
>  
> Thanks & Regards,
> Neha Jain
>  
> 
> This message is for the designated recipient only and may contain privileged, 
> proprietary, or otherwise confidential information. If you have received it 
> in error, please notify the sender immediately and delete the original. Any 
> other use of the e-mail by you is prohibited. Where allowed by local law, 
> electronic communications with Accenture and its affiliates, including e-mail 
> and instant messaging (including content), may be scanned by our systems for 
> the purposes of information security and assessment of internal compliance 
> with Accenture policy. Your privacy is important to us. Accenture uses your 
> personal data only in compliance with data protection laws. For further 
> information on how Accenture processes your personal data, please see our 
> privacy statement at https://www.accenture.com/us-en/privacy-policy. 
> __
> 
> www.accenture.com
> 


Re: [External] Re: Sorting in Spark on multiple partitions

2018-06-04 Thread Jörn Franke
How do you load the data? How do you write it?
I fear without a full source code it will be difficult to troubleshoot the 
issue.

Which Spark version?

Use case is not yet 100% clear to me. You want to set the row with the 
oldest/newest date to true? I would just use top or something similar when 
processing the data.
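
One concrete way to get the "first item per user" flag without relying on the physical sort order at all is a window function - a sketch using the column names from this thread (userid, time); data is the DataFrame from the original mail:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val w = Window.partitionBy("userid").orderBy(col("time").asc)

val flagged = data
  .withColumn("rn", row_number().over(w))
  .withColumn("is_first", col("rn") === 1)   // true for the earliest row of each user
  .drop("rn")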


> On 4. Jun 2018, at 17:33, Jain, Neha T.  wrote:
> 
> Hi Jorn,
>  
> I tried removing userid from my sort clause but still the same issue- data 
> not sorted.
>  
> var newDf = data.repartition(col(userid)).sortWithinPartitions(sid,time)
>  
> I am checking the sorting results  by temporary writing this file to Hive as 
> well as HDFS. Now, when I see the user wise data it is not sorted.
> Attaching the output file for your reference.
>  
> On the basis of sorting within userid partitions, I want to add a flag which 
> marks first item in the partition as true other items in that partition as 
> false.
> If my sorting order is disturbed, the flag is wrongly set.
>  
> Please suggest what else could be done to fix this very basic scenario of 
> sorting in Spark across multiple partitions across multiple nodes.
>  
> Thanks & Regards,
> Neha Jain
>  
> From: Jörn Franke [mailto:jornfra...@gmail.com] 
> Sent: Monday, June 4, 2018 10:48 AM
> To: Sing, Jasbir 
> Cc: user@spark.apache.org; Patel, Payal ; Jain, 
> Neha T. 
> Subject: [External] Re: Sorting in Spark on multiple partitions
>  
> You partition by userid, why do you then sort again by userid in the 
> partition? Can you try to remove userid from the sort? 
>  
> How do you check if the sort is correct or not?
>  
> What is the underlying objective of the sort? Do you have more information on 
> schema and data?
> 
> On 4. Jun 2018, at 05:47, Sing, Jasbir  wrote:
> 
> Hi Team,
>  
> We are currently using Spark 2.2.0 and facing some challenges in sorting of 
> data on multiple partitions.
> We have tried below approaches:
>  
> Spark SQL approach:
> a.  var query = "select * from data distribute by " + userid + " sort by 
> " + userid + ", " + time “
>  
> This query returns correct results in Hive but not in Spark SQL.  
> var newDf = data.repartition(col(userid)).orderBy(userid, time)
> var newDf = data.repartition(col(userid)).sortWithinPartitions(userid,time)
>  
>  
> But none of the above approach is giving correct results for sorting of data.
> Please suggest what could be done for the same.
>  
> Thanks & Regards,
> Neha Jain
>  
> 
> This message is for the designated recipient only and may contain privileged, 
> proprietary, or otherwise confidential information. If you have received it 
> in error, please notify the sender immediately and delete the original. Any 
> other use of the e-mail by you is prohibited. Where allowed by local law, 
> electronic communications with Accenture and its affiliates, including e-mail 
> and instant messaging (including content), may be scanned by our systems for 
> the purposes of information security and assessment of internal compliance 
> with Accenture policy. Your privacy is important to us. Accenture uses your 
> personal data only in compliance with data protection laws. For further 
> information on how Accenture processes your personal data, please see our 
> privacy statement at https://www.accenture.com/us-en/privacy-policy. 
> __
> 
> www.accenture.com
> 


Re: Sorting in Spark on multiple partitions

2018-06-03 Thread Jörn Franke
You partition by userid, why do you then sort again by userid in the partition? 
Can you try to remove userid from the sort? 

How do you check if the sort is correct or not?

What is the underlying objective of the sort? Do you have more information on 
schema and data?

> On 4. Jun 2018, at 05:47, Sing, Jasbir  wrote:
> 
> Hi Team,
>  
> We are currently using Spark 2.2.0 and facing some challenges in sorting of 
> data on multiple partitions.
> We have tried below approaches:
>  
> Spark SQL approach:
> var query = "select * from data distribute by " + userid + " sort by " + 
> userid + ", " + time “
>  
> This query returns correct results in Hive but not in Spark SQL.  
> var newDf = data.repartition(col(userid)).orderBy(userid, time)
> var newDf = data.repartition(col(userid)).sortWithinPartitions(userid,time)
>  
>  
> But none of the above approach is giving correct results for sorting of data.
> Please suggest what could be done for the same.
>  
> Thanks & Regards,
> Neha Jain
> 
> 
> This message is for the designated recipient only and may contain privileged, 
> proprietary, or otherwise confidential information. If you have received it 
> in error, please notify the sender immediately and delete the original. Any 
> other use of the e-mail by you is prohibited. Where allowed by local law, 
> electronic communications with Accenture and its affiliates, including e-mail 
> and instant messaging (including content), may be scanned by our systems for 
> the purposes of information security and assessment of internal compliance 
> with Accenture policy. Your privacy is important to us. Accenture uses your 
> personal data only in compliance with data protection laws. For further 
> information on how Accenture processes your personal data, please see our 
> privacy statement at https://www.accenture.com/us-en/privacy-policy. 
> __
> 
> www.accenture.com


Re: Why Spark JDBC Writing in a sequential order

2018-05-25 Thread Jörn Franke
Can your database receive the writes concurrently? I.e., do you make sure that 
each executor writes into a different partition on the database side?
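For context, Spark's JDBC writer opens one connection per DataFrame partition and writes the partitions as independent tasks. A hedged Scala sketch of the call described in the post below; the URL, table name and credentials are placeholders:

import java.util.Properties

val props = new Properties()
props.setProperty("user", "dbuser")        // placeholder credentials
props.setProperty("password", "secret")

dataframe                                  // the DataFrame from the post below
  .repartition(12)                         // 12 partitions -> up to 12 concurrent JDBC connections
  .write
  .mode("overwrite")
  .option("batchsize", "5000")             // rows per JDBC batch insert
  .option("numPartitions", "12")           // upper bound; Spark coalesces first if the DF has more partitions
  .jdbc("jdbc:redshift://host:5439/db", "tableName", props)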

> On 25. May 2018, at 16:42, Yong Zhang  wrote:
> 
> Spark version 2.2.0
> 
> 
> We are trying to write a DataFrame to a remote relational database (AWS 
> Redshift). Based on the Spark JDBC documentation, we already repartition our DF 
> into 12 partitions and set the "numPartitions" JDBC parameter to 12 for 
> concurrent writing.
> 
> 
> We run the command as following:
> 
> dataframe.repartition(12).write.mode("overwrite").option("batchsize", 
> 5000).option("numPartitions", 12).jdbc(url=jdbcurl, table="tableName", 
> connectionProperties=connectionProps)
> 
> Here is the Spark UI (screenshot not preserved in the archive):
> 
> We found out that the 12 tasks are obviously running in sequential order. 
> They are all in "Running" status at the same time from the beginning, but if we 
> check their "Duration" and "Shuffle Read Size/Records", it is clear 
> that they run one by one.
> For example, task 8 finished first in about 2 hours and wrote 34732 records 
> to the remote DB (I know the speed looks terrible, but that's not the question of 
> this post), and task 0 started after task 8 and took 4 hours (the first 2 hours 
> waiting for task 8). 
> In this picture, only tasks 2 and 4 are in the running stage, but task 4 is 
> obviously waiting for task 2 to finish, and only then starts writing.
> 
> My question is: with the above Spark command, my understanding is that 12 
> executors should open JDBC connections to the remote DB concurrently, all 
> 12 tasks should also start writing concurrently, and the whole job should 
> finish in around 2 hours overall.
> 
> Why are the 12 tasks indeed in the "RUNNING" stage, yet apparently waiting for 
> something, and can ONLY write to the remote DB sequentially? The 12 executors are 
> in different JVMs on different physical nodes. Why is this happening? What 
> stops Spark from pushing the data truly concurrently?
> 
> Thanks
> 
> Yong 
> 


Re: Time series data

2018-05-24 Thread Jörn Franke
There is not one answer to this. 

It really depends on what kind of time series analysis you do with the data and 
what time series database you are using. It also depends on what ETL you need 
to do.
You also seem to need to join data - is it with existing data of the same type, 
or do you join completely different data? If so, where does this data come from?

360 GB per day uncompressed does not sound terribly much.
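As a rough scale reference only, a minimal Spark sketch of the kind of ETL and pre-aggregation step discussed here; the file layout, column names and output path are assumptions, not part of the original thread:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("meter-etl").getOrCreate()

// Hypothetical flat-file layout: meter_id, reading_ts, value
val readings = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/landing/meter-readings/*.csv")

// Daily aggregate per meter - the kind of pre-aggregation a BI tool would query
val daily = readings
  .groupBy(col("meter_id"), to_date(col("reading_ts")).as("day"))
  .agg(sum("value").as("daily_total"), count(lit(1)).as("num_readings"))

daily.write.mode("append").partitionBy("day").parquet("/curated/daily-meter-totals")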

> On 24. May 2018, at 08:49, amin mohebbi  wrote:
> 
> Could you please help me to understand  the performance that we get from 
> using spark with any nosql or TSDB ? We receive 1 mil meters x 288 readings = 
> 288 mil rows (Approx. 360 GB per day) – Therefore, we will end up with 10's 
> or 100's of TBs of data and I feel that NoSQL will be much quicker than 
> Hadoop/Spark. This is time series data that are coming from many devices in 
> form of flat files and it is currently extracted / transformed /loaded  into 
> another database which is connected to BI tools. We might use azure data 
> factory to collect the flat files and then use spark to do the ETL(not sure 
> if it is correct way) and then use spark to join table or do the aggregations 
> and save them into a db (preferably nosql not sure). Finally, connect deploy 
> Power BI to get visualize the data from nosql db. My questions are :
> 
> 1- Is the above mentioned correct architecture? using spark with nosql as I 
> think combination of these two could help to have random access and run many 
> queries by different users. 
> 2- do we really need to use a time series db? 
> 
> 
> Best Regards ... Amin 
> Mohebbi PhD candidate in Software Engineering   at university of Malaysia   
> Tel : +60 18 2040 017 E-Mail : tp025...@ex.apiit.edu.my   
> amin_...@me.com


Re:

2018-05-16 Thread Jörn Franke
How many rows do you have in total?
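One note on the snippet quoted below: coalesce returns a new DataFrame rather than modifying df in place, so a bare df.coalesce(20) line has no effect on the subsequent write. A hedged Scala sketch of the presumably intended call, with url, table, mode and properties taken as given from the post:

val coalesced = df.coalesce(20)    // capture the result: at most 20 write tasks / JDBC connections

coalesced.write
  .mode(mode)                      // e.g. "append" or SaveMode.Append
  .jdbc(url, table, properties)    // properties: java.util.Properties with user/password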

> On 16. May 2018, at 11:36, Davide Brambilla  
> wrote:
> 
> Hi all,
>we have a dataframe with 1000 partitions and we need to write the 
> dataframe into a MySQL using this command:
> 
> df.coalesce(20)
> df.write.jdbc(url=url,
>   table=table,
>   mode=mode,
>   properties=properties)
> 
> and we get this errors randomly
> 
> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>   at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
>   at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.apply(DiskStore.scala:125)
>   at 
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.apply(DiskStore.scala:124)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
>   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:126)
>   at 
> org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:520)
>   at org.apache.spark.storage.BlockManager.get(BlockManager.scala:693)
>   at 
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:753)
>   at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>   at org.apache.spark.scheduler.Task.run(Task.scala:108)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> 
> Driver stacktrace:
>   at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1678)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1677)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
>   at scala.Option.foreach(Option.scala:257)
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>   at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
>   at 

Re: [Java] impact of java 10 on spark dev

2018-05-16 Thread Jörn Franke
The first thing would be for Scala to support them. Beyond that, someone 
would need to redesign the Spark source code to leverage modules - this could 
be a rather handy feature: a small but very well designed core (core, 
ml, graph etc.) around which others write useful modules.

> On 16. May 2018, at 08:20, xmehaut  wrote:
> 
> Hello,
> 
> I would like to know what the impacts of Java 10+ on Spark could be. I know
> that Spark is written in Scala, but the latest versions of Java include many
> improvements, especially in the JVM or in the delivery process (modules,
> JIT, memory management, ...) which could benefit Spark.
> 
> regards
> 
> 
> 
> 
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: spark sql StackOverflow

2018-05-15 Thread Jörn Franke
3000 filters don’t look reasonable. This is very difficult to 
test and verify, as well as nearly impossible to maintain.
Could it be that your filters are really another table that you should join with?
The example is a little bit too artificial to understand the underlying business 
case. Can you provide a more realistic example?

Maybe a Bloom filter or something similar can make sense for you? Basically 
you want to know whether the key pair is in a given set of pairs?
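A minimal Scala sketch of the join alternative suggested above; the names events and filters are assumptions, with filters holding the roughly 3000 (key1, key2) pairs loaded from the database:

import org.apache.spark.sql.functions.broadcast

// Keep only the records whose (key1, key2) pair appears in the filter table.
// The filter set is tiny (a few thousand rows), so broadcasting it avoids a shuffle
// and replaces the 3000-term OR expression entirely.
val matching = events.join(
  broadcast(filters),
  Seq("key1", "key2"),
  "left_semi"
)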

> On 15. May 2018, at 11:48, Alessandro Solimando 
>  wrote:
> 
> From the information you provided I would tackle this as a batch problem, 
> because this way you have access to more sophisticated techniques and you 
> have more flexibility (maybe HDFS and a SparkJob, but also think about a 
> datastore offering good indexes for the kind of data types and values you 
> have for your keys, and benefit from filter push-downs).
> 
> I personally use streaming only when real-time ingestion is needed.
> 
> Hth,
> Alessandro
> 
>> On 15 May 2018 at 09:11, onmstester onmstester  wrote:
>> 
>> How many distinct key1 (resp. key2) values do you have? Are these values 
>> reasonably stable over time?
>> 
>> fewer than 10 thousand, and these filters would change every 2-3 days. They 
>> would be written to and loaded from a database
>> 
>> Are these records ingested in real-time or are they loaded from a datastore?
>> 
>> records would be loaded from some text files that would be copied in some 
>> directory over and over
>> 
>> Are you suggesting that I don't need to use spark-streaming?
>> Sent using Zoho Mail
>> 
>> 
>> 
>>  On Tue, 15 May 2018 11:26:42 +0430 Alessandro Solimando 
>>  wrote 
>> 
>> Hi,
>> I am not familiar with ATNConfigSet, but some thoughts that might help.
>> 
>> How many distinct key1 (resp. key2) values do you have? Are these values 
>> reasonably stable over time?
>> 
>> Are these records ingested in real-time or are they loaded from a datastore?
>> 
>> In the latter case the DB might be able to efficiently perform the 
>> filtering, especially if equipped with a proper index over key1/key2 (or a 
>> composite one).
>> 
>> In such case the filter push-down could be very effective (I didn't get if 
>> you just need to count or do something more with the matching record).
>> 
>> Alternatively, you could try to group by (key1,key2), and then filter (it 
>> again depends on the kind of output you have in mind).
>> 
>> If the datastore/stream is distributed and supports partitioning, you could 
>> partition your records by either key1 or key2 (or key1+key2), so they are 
>> already "separated" and can be consumed more efficiently (e.g., the groupby 
>> could then be local to a single partition).
>> 
>> Best regards,
>> Alessandro
>> 
>> On 15 May 2018 at 08:32, onmstester onmstester  wrote:
>> 
>> 
>> Hi, 
>> 
>> I need to run some queries on a huge amount of input records. The input rate 
>> is 100K records/second.
>> A record is like (key1, key2, value) and the application should report 
>> occurrences of key1 = something && key2 == somethingElse.
>> The problem is there are too many filters in my query: more than 3 thousand 
>> pairs of key1 and key2 should be filtered.
>> I was simply putting 1 million records in a temp table each time and 
>> running a SQL query using spark-sql on the temp table:
>> select * from mytemptable where (key1 = something && key2 == somethingElse) 
>> or (key1 = someOtherthing && key2 == someAnotherThing) or ... (3 thousand 
>> ors!!!)
>> And I encounter a StackOverflowError at ATNConfigSet.java line 178.
>> 
>> So i have two options IMHO:
>> 1. Either put all key1 and key2 filter pairs in another temp table and do a 
>> join between the two temp tables
>> 2. Or use spark-streaming, which I'm not familiar with, and I don't know if it 
>> could handle 3K filters.
>> 
>> Which way do you suggest? what is the best solution for my problem 
>> 'performance-wise'?
>> 
>> Thanks in advance
>> 
>> 
> 


Re: Measure performance time in some spark transformations.

2018-05-13 Thread Jörn Franke
Can’t you find this in the Spark UI or timeline server?
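One caveat worth adding here: transformations are lazy, so timing a bare map or join on the driver only measures plan construction. A sketch using the time helper from the post quoted below, with df and other as placeholder DataFrames, forcing an action inside the timed block:

// Only an action (count, collect, write, ...) makes the executors run the
// map/join, so the action must happen inside the timed block:
val rowCount = time {
  df.join(other, "id").count()
}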

> On 13. May 2018, at 00:31, Guillermo Ortiz Fernández 
>  wrote:
> 
> I want to measure how long different transformations take in Spark, 
> such as map, joinWithCassandraTable and so on. Which is the best approximation 
> for doing it? 
> 
> def time[R](block: => R): R = {
> val t0 = System.nanoTime()
> val result = block   
> val t1 = System.nanoTime()
> println("Elapsed time: " + (t1 - t0) + "ns")
> result
> }
> 
> Could I use something like this? I guess that the System.nanoTime will be 
> executed in the driver before and after the workers execute the maps/joins 
> and so on. Is that right? Any other ideas?


Re: ordered ingestion not guaranteed

2018-05-11 Thread Jörn Franke
What DB do you have? 

You have some options, such as
1) use a key-value store (they can be accessed very efficiently) to see if 
a newer record for that key has already been processed - if yes, ignore the 
value; if no, insert it into the database
2) redesign the key to include the timestamp and find the latest one when 
querying the database 
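A hedged Scala sketch combining the in-batch reduce mentioned in the post below with a timestamp-guarded write; the record shape, the DStream and the upsert are assumptions:

case class Reading(key: String, ts: Long, value: String)

// stream: DStream[Reading] coming from the Kafka source
stream.foreachRDD { rdd =>
  val latestPerKey = rdd
    .map(r => (r.key, r))
    .reduceByKey((a, b) => if (a.ts >= b.ts) a else b)   // newest record per key within this batch
    .values

  latestPerKey.foreachPartition { records =>
    // One DB connection per partition; each write should still be guarded on the
    // DB side against cross-batch reordering, e.g.:
    //   UPDATE readings SET value = ?, ts = ? WHERE key = ? AND ts < ?
    records.foreach(r => println(s"upsert ${r.key} -> ${r.value} @ ${r.ts}"))   // stand-in for the real upsert
  }
}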

> On 11. May 2018, at 23:25, ravidspark  wrote:
> 
> Hi All,
> 
> I am using Spark 2.2.0 & I have below use case:
> 
> *Reading from Kafka using Spark Streaming and updating(not just inserting)
> the records into downstream database*
> 
> I understand that the order in which Spark reads messages from Kafka is not the
> order of the timestamps stored in the Kafka partitions but rather the order of
> the partition offsets. So suppose there are two messages in
> Kafka with the same key, but the message with the latest timestamp
> is placed at the smaller offset and the message with the oldest timestamp
> is placed at a later offset. In this case, as Spark reads offsets from smallest to
> largest, the latest timestamp will be processed first and then the
> oldest timestamp, resulting in unordered ingestion into the DB.
> 
> If both these messages fall into the same RDD, then by applying a reduce
> function we can ignore the message with the oldest timestamp and process the
> message with the latest timestamp. But I am not quite sure how to handle the case
> where these messages fall into different RDDs in the stream. An approach I was
> trying is to hit the DB, retrieve the timestamp stored for that key, compare,
> and ignore the message if its timestamp is older. But this is not efficient when
> handling millions of messages, as the DB round trips are expensive.
> 
> Is there a better way of solving this problem?
> 
> 
> 
> 
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: A naive ML question

2018-04-29 Thread Jörn Franke
The transactions probably describe from which counterparty assets are 
transferred to which other counterparty at the different stages of the transaction. 
You could use GraphX for that if the algorithms there are suitable for your 
needs.
I am still trying to understand what you mean by "evolve over time". E.g., a 
counterparty has cancelled a lot of transactions, or something like this? Normally 
it looks like you have a rather straightforward state machine for your transactions. 
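A hedged Scala sketch of the transition-matrix idea that comes up further down in this thread; the DataFrame txns and its columns (txn_id, state, event_ts) are assumptions:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// One row per state change: (txn_id, state, event_ts)
val w = Window.partitionBy("txn_id").orderBy("event_ts")

val transitions = txns
  .withColumn("next_state", lead(col("state"), 1).over(w))   // the state that follows within the same transaction
  .where(col("next_state").isNotNull)
  .groupBy("state", "next_state")
  .count()                                                    // raw counts; normalize per row for probabilities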

> On 29. Apr 2018, at 12:18, kant kodali <kanth...@gmail.com> wrote:
> 
> Hi Nick,
> 
> Thanks for that idea!! Just to be more clear: the problem I am trying to 
> solve is that when a bunch of financial transaction data is thrown at me, I 
> am trying to identify all possible relationships and lineage among the 
> transactions without explicitly specifying what those relationships are.
> 
>> On Sun, Apr 29, 2018 at 2:22 AM, Nick Pentreath <nick.pentre...@gmail.com> 
>> wrote:
>> One potential approach could be to construct a transition matrix showing the 
>> probability of moving from each state to another state. This can be 
>> visualized with a “heat map” encoding (I think matshow in numpy/matplotlib 
>> does this).
>> 
>>> On Sat, 28 Apr 2018 at 21:34, kant kodali <kanth...@gmail.com> wrote:
>>> Hi,
>>> 
>>> I mean a transaction typically goes through different states like 
>>> STARTED, PENDING, CANCELLED, COMPLETED, SETTLED etc...
>>> 
>>> Thanks,
>>> kant
>>> 
>>>> On Sat, Apr 28, 2018 at 4:11 AM, Jörn Franke <jornfra...@gmail.com> wrote:
>>>> What do you mean by “how it evolved over time”? A transaction basically 
>>>> describes an action at a certain point in time. Do you mean how a 
>>>> financial product evolved over time given a set of transactions?
>>>> 
>>>> > On 28. Apr 2018, at 12:46, kant kodali <kanth...@gmail.com> wrote:
>>>> > 
>>>> > Hi All,
>>>> > 
>>>> > I have a bunch of financial transactional data and I was wondering if 
>>>> > there is any ML model that can give me a graph structure for this data? 
>>>> > other words, show how a transaction had evolved over time? 
>>>> > 
>>>> > Any suggestions or references would help.
>>>> > 
>>>> > Thanks!
>>>> > 
>>> 
> 


Re: Do GraphFrames support streaming?

2018-04-29 Thread Jörn Franke
What is the use case you are trying to solve?
You want to load graph data from a streaming window into separate graphs - 
possible, but it probably requires a lot of memory. 
You want to update an existing graph with new streaming data and then fully 
rerun the algorithms -> look at JanusGraph.
You want to incrementally update an existing graph and incrementally run a 
graph algorithm suitable for this - you have to implement that yourself, as far as I 
am aware.
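A hedged sketch of the first option (one graph per micro-batch), assuming the GraphFrames package is on the classpath and each batch arrives as an edge DataFrame with src and dst columns:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.graphframes.GraphFrame

def graphForBatch(edges: DataFrame): GraphFrame = {
  // Derive the vertex set from this batch's edges
  val vertices = edges.select(col("src").as("id"))
    .union(edges.select(col("dst").as("id")))
    .distinct()
  GraphFrame(vertices, edges)   // any algorithm is rerun from scratch on every batch
}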

> On 29. Apr 2018, at 11:43, kant kodali  wrote:
> 
> Do GraphFrames support streaming?

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


