Re: Data Duplication Bug Found - Structured Streaming Versions 3.4.1, 3.2.4, and 3.3.2

2023-09-14 Thread russell . spitzer
Exactly-once should be output-sink dependent; what sink was being used?

Sent from my iPhone

On Sep 14, 2023, at 4:52 PM, Jerry Peng wrote:

Craig,
Thanks! Please let us know the result!
Best,
Jerry

On Thu, Sep 14, 2023 at 12:22 PM Mich Talebzadeh wrote:

Hi Craig,
Can you please clarify what this bug is and provide sample code causing this issue?
HTH
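
To make the first reply above concrete, here is a minimal Structured Streaming sketch (the `events` streaming DataFrame and the paths are placeholders, not taken from the bug report): end-to-end exactly-once depends on the sink, e.g. a file sink replaying from a checkpoint is idempotent, while a custom foreach/foreachBatch sink is only exactly-once if its writes are idempotent or transactional.

val query = events.writeStream
  .format("parquet")                           // file sinks plus checkpointing are idempotent on replay
  .option("path", "/data/out")                 // placeholder output path
  .option("checkpointLocation", "/data/chk")   // placeholder checkpoint path
  .start()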

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London, United Kingdom

   view my Linkedin profile: https://en.everybodywiki.com/Mich_Talebzadeh

 Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction
of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from such
loss, damage or destruction.  

On Thu, 14 Sept 2023 at 17:48, Craig Alfieri  wrote:







Hello Spark Community-
 
As part of a research effort, our team here at Antithesis tests for correctness/fault tolerance of major OSS projects.
Our team was recently testing Spark’s Structured Streaming, and we came across a data duplication bug we’d like to work with the team to resolve.
 
Our intention is to use this as a future case study for our platform, but prior to doing so we’d like to have a resolution in place so that an announcement isn’t alarming to the user base.
 
Attached is a high-level PDF that reviews the high-availability set-up that was put under test.
This was also tested across the three latest versions, and the same behavior was observed.
 
We can reproduce this error readily since our environment is fully deterministic; however, we are not Spark experts and would like to work with someone in the community to resolve it.
 
Please let us know at your earliest convenience.
 
Best

Craig Alfieri
c: 917.841.1652
craig.alfi...@antithesis.com
New York, NY.
Antithesis.com

We can't talk about most of the bugs that we've found for our customers,
but some customers like to speak about their work with us:
https://github.com/mongodb/mongo/wiki/Testing-MongoDB-with-Antithesis

-This email and any files transmitted with it are confidential and intended solely for the use of the individual or entity for whom they are addressed. If you received this message in error, please notify the sender and remove it from your system.
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Doubts

2022-06-25 Thread russell . spitzer
Code is always distributed for any operations on a DataFrame or RDD. The size 
of your code is irrelevant except for JVM memory limits. For most jobs the 
entire application jar and all dependencies are put on the classpath of every 
executor. 

There are some exceptions but generally you should think about all data 
processing occurring on executor JVMs.
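
A minimal sketch of what that means in practice, assuming a spark-shell session with `sc` in scope (the class name com.example.MyJob is a placeholder for a class in your application jar): each task can resolve the class because the jar is on every executor's classpath.

import scala.util.Try

val slots = sc.defaultParallelism
val loadable = sc.parallelize(1 to slots, slots)
  .map(_ => Try(Class.forName("com.example.MyJob")).isSuccess)  // evaluated on the executors
  .collect()
println(s"class visible in ${loadable.count(identity)} of $slots tasks")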

Sent from my iPhone

> On Jun 25, 2022, at 2:18 AM, Sid  wrote:
> 
> 
> Hi Tufan,
> 
> Thanks for the answers. However, by the second point, I mean to say where 
> would my code reside? Will it be copied to all the executors since the code 
> size would be small or will it be maintained on the driver's side? I know 
> that driver converts the code to DAG and when an action is called it is 
> submitted to the DAG scheduler and so on...
> 
> Thanks,
> Sid
> 
>> On Sat, Jun 25, 2022 at 12:34 PM Tufan Rakshit  wrote:
>> Please find the answers inline.
>> 1) Can I apply predicate pushdown filters if I have data stored in S3 or it 
>> should be used only while reading from DBs?
>> It can be applied on S3 if you store the data in Parquet, CSV, JSON or Avro 
>> format. It does not depend on the DB; it is supported on object stores like S3 
>> as well.
>> 
>> 2) While running the data in distributed form, is my code copied to each and 
>> every executor. As per me, it should be the case since code.zip would be 
>> smaller in size to be copied on each worker node.
>> If you are trying to join two datasets of which one is small, Spark by 
>> default would try to broadcast the smaller dataset to the executors 
>> rather than going for a sort-merge join. There is a property, enabled by 
>> default since Spark 3.1; the limit for the smaller dataframe to be broadcast 
>> is 10 MB, and it can be changed to a higher value via config.
>> 
>> 3) Also my understanding of shuffling of data is " It is moving one 
>> partition to another partition or moving data(keys) of one partition to 
>> another partition of those keys. It increases memory since before shuffling 
>> it copies the data in the memory and then transfers to another partition". 
>> Is it correct? If not, please correct me.
>> 
>> It depends on the context of distributed computing, as your data does not sit 
>> on one machine, nor on one disk. A shuffle is involved when you trigger 
>> operations like group-by or sort, as these require bringing all the records for 
>> a key onto one executor to do the computation; likewise, when a sort-merge join 
>> is triggered, both datasets are sorted, and that sort is a global sort, not a 
>> partition-wise sort. Yes, it is a memory-intensive operation; if you see a lot 
>> of shuffle involved, it is best to use SSDs (M5d-based machines in AWS).
>> For really big jobs where terabytes of data have to be joined, it is not 
>> possible to do the whole operation in RAM.
>> 
>> 
>> Hope that helps .
>> 
>> Best 
>> Tufan
>> 
>> 
>> 
>>> On Sat, 25 Jun 2022 at 08:43, Sid  wrote:
>>> Hi Team,
>>> 
>>> I have various doubts as below:
>>> 
>>> 1) Can I apply predicate pushdown filters if I have data stored in S3 or it 
>>> should be used only while reading from DBs?
>>> 
>>> 2) While running the data in distributed form, is my code copied to each 
>>> and every executor. As per me, it should be the case since code.zip would 
>>> be smaller in size to be copied on each worker node.
>>> 
>>> 3) Also my understanding of shuffling of data is " It is moving one 
>>> partition to another partition or moving data(keys) of one partition to 
>>> another partition of those keys. It increases memory since before shuffling 
>>> it copies the data in the memory and then transfers to another partition". 
>>> Is it correct? If not, please correct me.
>>> 
>>> Please help me to understand these things in layman's terms if my 
>>> assumptions are not correct.
>>> 
>>> Thanks,
>>> Sid


Re: Why is spark running multiple stages with the same code line?

2022-04-21 Thread Russell Spitzer
There are a few things going on here.

1. Spark is lazy, so nothing happens until a result is collected back to the 
driver or data is written to a sink. So the 1 line you see 
is most likely just that trigger. Once triggered, all of the work required to 
make that final result happen occurs. If the final collect depends on 100 joins 
for example, you would run one line on the final result and it would trigger 
100 stages.

2. A stage is not a logical boundary; it's a physical boundary between shuffle 
exchanges. In practice, a new stage starts any time an operation requires a 
portion of the data from every task of the previous operation. For example, a 
global sort may require data from every task of the previous operation in order 
to complete successfully. A job may have a theoretically unlimited number of 
stages, although in practice there are some technical limits.
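
A minimal sketch of that, assuming a spark-shell session (the input path is a placeholder): a single count() whose lineage crosses two shuffles shows up in the UI as one job line but roughly three stages (sortByKey may add a small sampling job of its own).

val counts = sc.textFile("hdfs:///tmp/words.txt")   // placeholder input
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)   // shuffle #1 -> stage boundary
  .sortByKey()          // shuffle #2 -> stage boundary
counts.count()          // the single "count at ..." line that triggers all of the stages above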

> On Apr 21, 2022, at 9:09 AM, Joe  wrote:
> 
> Hi,
> When looking at application UI (in Amazon EMR) I'm seeing one job for
> my particular line of code, for example:
> 64 Running count at MySparkJob.scala:540
> 
> When I click into the job and go to stages I can see over a 100 stages
> running the same line of code (stages are active, pending or
> completed):
> 190 Pending count at MySparkJob.scala:540
> ...
> 162 Active count at MySparkJob.scala:540
> ...
> 108 Completed count at MySparkJob.scala:540
> ...
> 
> I'm not sure what that means, I thought that stage was a logical
> operation boundary and you could have only one stage in the job (unless
> you executed the same dataset+action many times on purpose) and tasks
> were the ones that were replicated across partitions. But here I'm
> seeing many stages running, each with the same line of code?
> 
> I don't have a situation where my code is re-processing the same set of
> data many times, all intermediate sets are persisted.
> I'm not sure if EMR UI display is wrong or if spark stages are not what
> I thought they were?
> Thanks,
> 
> Joe
> 
> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Spark CORE][Spark SQL][Advanced]: Why dynamic partition pruning optimization does not work in this scenario?

2021-12-04 Thread Russell Spitzer
This is probably because your data size is well under the broadcastJoin 
threshold so at the planning phase it decides to do a BroadcastJoin instead of 
a Join which could take advantage of dynamic partition pruning. For testing 
like this you can always disable that with 
spark.sql.autoBroadcastJoinThreshold=-1

In a real data scenario the join tables would probably be much 
larger than the default threshold (10 MB) and trigger dynamic partition pruning, 
although I can see it may be beneficial to implement dynamic partition pruning 
for broadcast joins as well...
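
A minimal sketch of that suggestion, assuming the spark-shell session from the report and that the fact table is actually written partitioned by the join key (e.g. .partitionBy("pId"), which the posted setup does not do): with automatic broadcast joins disabled, the plan can show the dynamic partition pruning filter on the scan.

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")  // already the default in 3.x

spark.sql("""
  SELECT * FROM sales JOIN products
  ON sales.pId = products.id AND products.name = 'A'
""").explain()
// With DPP applied, the sales scan shows a partition filter such as
// dynamicpruningexpression(...) instead of reading every partition.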


> On Dec 4, 2021, at 8:41 AM, Mohamadreza Rostami 
>  wrote:
> 
> Hello all,
> 
> We use Apache Spark 3.2.0 and our data is stored on Apache Hadoop in Parquet 
> format. To speed up our queries, we are trying different scenarios. We found out 
> that Spark supports dynamic partition pruning in versions after 3.0.0. So, to 
> test the improvement of the DPP feature, we defined two tables, sales and products, 
> and a query. You can find the code that initializes the environment here:
> # First Run
> val salesSeq = Seq((1, 1), (1, 2), (1, 3), (1, 4), (1, 5), (1, 6), (1, 7), 
> (1, 8), (2, 1), (2, 2), (2, 3), (2, 4), (2, 5), (2, 6), (2, 7), (2, 8))
> val productSeq = Seq((1, "A"), (2, "B"))
> sc.parallelize(salesSeq).toDF("pId","q").write.mode("overwrite").parquet("hdfs://test/spark-optimization/sales.parquet")
> sc.parallelize(productSeq).toDF("Id","name").write.mode("overwrite").parquet("hdfs://test/spark-optimization/products.parquet")
> 
> Then we run another Scala program to run the query. You can find the second-run 
> file here:
> # Second Run
> val salesDF = spark.read.parquet("hdfs://test/spark-optimization/sales.parquet")
> val productDF = spark.read.parquet("hdfs://test/spark-optimization/sales.parquet")
> salesDF.createOrReplaceTempView("sales")
> productDF.createOrReplaceTempView("products")
> sql("SELECT * FROM sales JOIN products ON sales.pId = products.id and products.name = 'A'").explain()
> 
> Based on the DPP feature, we expect filters pushed down to file scan level 
> that prevents reading unnecessary partitions. See the following picture:
> <13551396-1591032620843.png>
> 
> 
> But instead, Spark does not push down filters to the file scan layer and uses 
> broadcast join without filtering partitions. See the following picture:
> <13551394-1591032607773.png>
> 
> 
> To better understand the situation, please have a look at this link:
> https://dzone.com/articles/dynamic-partition-pruning-in-spark-30
> We checked that the DPP and adaptive query execution features are enabled in our 
> Spark cluster. So my question is: how can I debug and find the root cause of this 
> problem?
> 
> 
> Cheers,



Re: Possibly a memory leak issue in Spark

2021-09-22 Thread Russell Spitzer
As Sean said, I believe you want to set the following to lower numbers:

spark.ui.retainedJobs (default 1000, since 1.2.0): How many jobs the Spark UI and status APIs 
remember before garbage collecting. This is a target maximum, and fewer elements may be 
retained in some circumstances.

spark.ui.retainedStages (default 1000, since 0.9.0): How many stages the Spark UI and status 
APIs remember before garbage collecting. This is a target maximum, and fewer elements may be 
retained in some circumstances.

spark.ui.retainedTasks (default 100000, since 2.0.1): How many tasks in one stage the Spark UI 
and status APIs remember before garbage collecting. This is a target maximum, and fewer 
elements may be retained in some circumstances.

If I remember correctly, this is what controls how much metadata remains in the driver after 
task/stage/job completion.
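
A minimal sketch of setting these at application start (values are illustrative, not recommendations):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("trim-ui-metadata")                 // placeholder app name
  .config("spark.ui.retainedJobs", "100")
  .config("spark.ui.retainedStages", "100")
  .config("spark.ui.retainedTasks", "10000")
  .getOrCreate()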

> On Sep 22, 2021, at 12:42 PM, Kohki Nishio  wrote:
> 
> I believe I have enough information, raised this
> 
> https://issues.apache.org/jira/browse/SPARK-36827 
> 
> 
> thanks
> -Kohki
> 
> 
> On Tue, Sep 21, 2021 at 9:30 PM Sean Owen  > wrote:
> No, that's just info Spark retains about finished jobs and tasks, likely. You 
> can limit how much is retained if desired with config. 
> 
> On Tue, Sep 21, 2021, 11:29 PM Kohki Nishio  > wrote:
> Just following up, it looks like task / stage / job data are not cleaned up
> --
>6:   7835346 2444627952  org.apache.spark.status.TaskDataWrapper
>  25:   3765152  180727296  org.apache.spark.status.StageDataWrapper
> 88:2322559290200  org.apache.spark.status.JobDataWrapper
> 
> UI is disabled, not sure why we need to have those data ..
> 
> -Kohki 
> 
> 
> On Fri, Sep 17, 2021 at 8:27 AM Kohki Nishio  > wrote:
> Hello,
> I'm seeing possible memory leak behavior in my spark application. According 
> to MAT, it looks like it's related to ElementTrackingStore ..
> 
> 
> 
> The increase is subtle, so it takes multiple days to actually cause some 
> impact, but I'm wondering if anybody has any idea about what this is about 
> ...  Below is the GC graph, yellow is the level after any GC kicks in.
> 
> 
> 
> Thanks
> -- 
> Kohki Nishio
> 
> 
> -- 
> Kohki Nishio
> 
> 
> -- 
> Kohki Nishio



Re: Spark Null Pointer Exception

2021-06-30 Thread Russell Spitzer
It could also be a transient object being referenced from within the custom code. 
When serialized, the reference shows up as null even though you had set it in 
the parent object.
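
A minimal sketch of that failure mode (all names are made up for illustration): the @transient field is set on the driver, skipped during closure serialization, and comes back as null inside the task.

class Lookup {
  def score(s: String): Int = s.length
}

class Pipeline(@transient var lookup: Lookup) extends Serializable {
  def run(sc: org.apache.spark.SparkContext): Unit = {
    sc.parallelize(Seq("foo", "bar")).foreach { s =>
      // `lookup` is referenced here, so the closure drags in `this`; the
      // transient field is not serialized and is null on the executor.
      println(lookup.score(s))   // NullPointerException at runtime
    }
  }
}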

> On Jun 30, 2021, at 4:44 PM, Sean Owen  wrote:
> 
> The error is in your code, which you don't show. You are almost certainly 
> incorrectly referencing something like a SparkContext in a Spark task.
> 
> On Wed, Jun 30, 2021 at 3:48 PM Amit Sharma  > wrote:
> Hi , I am using spark 2.7 version with scala. I am calling a method as below 
> 
> 1. val rddBacklog = spark.sparkContext.parallelize(MAs) // MA is list of say 
> city
> 2. rddBacklog.foreach(ma => doAlloc3Daily(ma, fteReview.forecastId, 
> startYear, endYear)) 
> 3.doAlloc3Daily method just doing a database call and doing some scala 
> calculation (no rdd or dataframe)
> 
> Line number 2 I  am getting below  nullpointer intermittently on cluster but 
> never on local.
> java.lang.NullPointerException
>   at 
> sparkStreaming.CalculateFteReview.doAlloc3Daily(CalculateFteReview.scala:1307)
>   at 
> sparkStreaming.CalculateFteReview$$anonfun$getNewDistribution$2.apply(CalculateFteReview.scala:1199)
>   at 
> sparkStreaming.CalculateFteReview$$anonfun$getNewDistribution$2.apply(CalculateFteReview.scala:1199)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at 
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:927)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:927)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:109)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> 
> 
> Thanks
> Amit
> 
> 
> 
> 
> 
> 



Re: Request for FP-Growth source code

2021-06-28 Thread Russell Spitzer
Sorry wrong repository,
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala

> On Jun 28, 2021, at 11:21 AM, Eduardus Hardika Sandy Atmaja  
> wrote:
> 
> I am sorry, I can't open the link. "This site can’t be reached".
> Is there any Java/Python code available?
> 
> Best Regards,
> Eduardus Hardika Sandy Atmaja
> From: Russell Spitzer 
> Sent: Monday, June 28, 2021 8:28 PM
> To: Eduardus Hardika Sandy Atmaja 
> Cc: user@spark.apache.org 
> Subject: Re: Request for FP-Growth source code
>  
> https://github.pie.apple.com/IPR/apache-spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
>  
> <https://github.pie.apple.com/IPR/apache-spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala>
>  This?
> 
> On Mon, Jun 28, 2021 at 5:11 AM Eduardus Hardika Sandy Atmaja  <mailto:e...@usd.ac.id>> wrote:
> Dear Apache Spark Admin
> 
> Hello, my name is Edo. I am a Ph.D. student from India. Now I am still 
> learning about High Utility Itemset Mining which is extension of Frequent 
> Itemset Mining for my research. I am interested to implement my algorithm 
> using Apache Spark but I do not have any idea how to do it. I tried to run 
> FP-Growth algorithm in Java Netbeans downloaded from Apache Spark website but 
> I cannot find the source code. Would you mind to share the code with me? I 
> need it to learn how to implement my algorithm to Spark environment. Thank 
> you very much. I hope you will grant my request.
> 
> Best Regards,
> Eduardus Hardika Sandy Atmaja



Re: Request for FP-Growth source code

2021-06-28 Thread Russell Spitzer
https://github.pie.apple.com/IPR/apache-spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
This?

On Mon, Jun 28, 2021 at 5:11 AM Eduardus Hardika Sandy Atmaja 
wrote:

> Dear Apache Spark Admin
>
> Hello, my name is Edo. I am a Ph.D. student from India. Now I am still
> learning about High Utility Itemset Mining which is extension of Frequent
> Itemset Mining for my research. I am interested to implement my algorithm
> using Apache Spark but I do not have any idea how to do it. I tried to run
> FP-Growth algorithm in Java Netbeans downloaded from Apache Spark website
> but I cannot find the source code. Would you mind to share the code with
> me? I need it to learn how to implement my algorithm to Spark environment.
> Thank you very much. I hope you will grant my request.
>
> Best Regards,
> Eduardus Hardika Sandy Atmaja
>


Re: Spark Structured Streaming 'bool' object is not callable, quitting

2021-04-21 Thread Russell Spitzer
Callable means you tried to treat a field as a function like in the
following example

>>> fun = True
>>> fun()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: 'bool' object is not callable

My guess is that "isStreaming" is a bool, and in your syntax you used it as
a function "isStreaming()"

On Wed, Apr 21, 2021 at 5:35 PM Mich Talebzadeh 
wrote:

> Hi,
>
> I am testing something in Spark Structured Streaming, this is a new topic
>
>  Typical kafka json row
>
>   dummy2 = StructType().add("uuid", StringType()).add("timecreated",
> TimestampType()).add("status", StringType())
>
> # example
> ##856095c6-cdec-485b-9da6-d78275bc0a25
> {"uuid":"856095c6-cdec-485b-9da6-d78275bc0a25",
> "timecreated":"2021-04-21T23:29:16", "status":true"}
>
>   Created the Streaming read
>
>
>  streamingDummy = self.spark \
> .readStream \
> .format("kafka") \
> .option("kafka.bootstrap.servers",
> config['MDVariables']['bootstrapServers'],) \
> .option("schema.registry.url",
> config['MDVariables']['schemaRegistryURL']) \
> .option("group.id", config['common']['dummy']) \
> .option("zookeeper.connection.timeout.ms",
> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
> .option("rebalance.backoff.ms",
> config['MDVariables']['rebalanceBackoffMS']) \
> .option("zookeeper.session.timeout.ms",
> config['MDVariables']['zookeeperSessionTimeOutMs']) \
> .option("auto.commit.interval.ms",
> config['MDVariables']['autoCommitIntervalMS']) \
> .option("subscribe", config['MDVariables']['topicdummy']) \
> .option("failOnDataLoss", "false") \
> .option("includeHeaders", "true") \
> .option("startingOffsets", "latest") \
> .load() \
> .select(from_json(col("value").cast("string"),
> dummy2).alias("dummy_value"))
>
> streamingDummy.printSchema()
>   *print(streamingDummy.isStreaming())*
>
> When I run it I get
>
>
> root
>
>  |-- dummy_value: struct (nullable = true)
>
>  ||-- uuid: string (nullable = true)
>
>  ||-- timecreated: timestamp (nullable = true)
>
>  ||-- status: string (nullable = true)
>
>
> *'bool' object is not callable, quitting*
>
> I cannot figure out the last line!
>
>
> Thanks
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: [Spark Core][Advanced]: Problem with data locality when running Spark query with local nature on apache Hadoop

2021-04-13 Thread Russell Spitzer
scala> def getRootRdd( rdd:RDD[_] ): RDD[_]  = { if (rdd.dependencies.size == 
0) rdd else getRootRdd(rdd.dependencies(0).rdd)}
getRootRdd: (rdd: org.apache.spark.rdd.RDD[_])org.apache.spark.rdd.RDD[_]

scala> val rdd = spark.read.parquet("/Users/russellspitzer/Temp/local").rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[38] 
at rdd at <console>:24

scala> val scan = getRootRdd(rdd)
scan: org.apache.spark.rdd.RDD[_] = FileScanRDD[33] at rdd at <console>:24

scala> scan.partitions.map(scan.preferredLocations)
res8: Array[Seq[String]] = Array(WrappedArray(), WrappedArray(), 
WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray(), 
WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray())

I define a quick traversal to get the source RDD for the dataframe operation. I 
make the read dataframe and get the RDD out of it. I traverse the RDD's 
dependencies to get the FileScanRDD. I then apply the scan's preferredLocations 
method to each partition. You can see that the result here is that none of my 
partitions have a preferred location, so they will all be run at "Any". This is 
because I'm using my local file system, which never reports a preferred location; 
so even though the scheduler reports "ANY", in this case the tasks are actually 
node local.


> On Apr 13, 2021, at 8:37 AM, Mohamadreza Rostami 
>  wrote:
> 
> Thanks for your response.
> I think my HDFS-Spark cluster is co-localized because I have a Spark worker 
> on each datanode; in other words, I installed the Spark workers on the 
> datanodes. So that is the puzzle: why does this simple query on a 
> co-localized HDFS-Spark cluster run at the "Any" locality level?
> Is there any way to figure out which datanode IPs or hostnames the namenode 
> returns to Spark? Or can you suggest a debugging approach?
> 
>> On Farvardin 24, 1400 AP, at 17:45, Russell Spitzer 
>> mailto:russell.spit...@gmail.com>> wrote:
>> 
>> Data locality can only occur if the Spark Executor IP address string matches 
>> the preferred location returned by the file system. So this job would only 
>> have local tasks if the datanode replicas for the files in question had the 
>> same ip address as the Spark executors you are using. If they don't then the 
>> scheduler falls back to assigning read tasks to the first executor available 
>> with locality level "any". 
>> 
>> So unless you have that HDFS - Spark Cluster co-localization I wouldn't 
>> expect this job to run at any other locality level than ANY.
>> 
>>> On Apr 13, 2021, at 3:47 AM, Mohamadreza Rostami 
>>> mailto:mohamadrezarosta...@gmail.com>> 
>>> wrote:
>>> 
>>> I have a Hadoop cluster that uses Apache Spark to query parquet files saved 
>>> on Hadoop. For example, when i'm using the following PySpark code to find a 
>>> word in parquet files:
>>> df = spark.read.parquet("hdfs://test/parquets/* ")
>>> df.filter(df['word'] == "jhon").show()
>>> After running this code, I go to spark application UI, stages tab, I see 
>>> that locality level summery set on Any. In contrast, because of this 
>>> query's nature, it must run locally and on NODE_LOCAL locality level at 
>>> least. When I check the network IO of the cluster while running this, I 
>>> find out that this query use network (network IO increases while the query 
>>> is running). The strange part of this situation is that the number shown in 
>>> the spark UI's shuffle section is very small.
>>> How can I find out the root cause of this problem and solve that?
>>> link of stackoverflow.com <http://stackoverflow.com/> : 
>>> https://stackoverflow.com/questions/66612906/problem-with-data-locality-when-running-spark-query-with-local-nature-on-apache
>>>  
>>> <https://stackoverflow.com/questions/66612906/problem-with-data-locality-when-running-spark-query-with-local-nature-on-apache>
> 



Re: [Spark Core][Advanced]: Problem with data locality when running Spark query with local nature on apache Hadoop

2021-04-13 Thread Russell Spitzer
Data locality can only occur if the Spark Executor IP address string matches 
the preferred location returned by the file system. So this job would only have 
local tasks if the datanode replicas for the files in question had the same ip 
address as the Spark executors you are using. If they don't then the scheduler 
falls back to assigning read tasks to the first executor available with 
locality level "any". 

So unless you have that HDFS - Spark Cluster co-localization I wouldn't expect 
this job to run at any other locality level than ANY.

> On Apr 13, 2021, at 3:47 AM, Mohamadreza Rostami 
>  wrote:
> 
> I have a Hadoop cluster that uses Apache Spark to query parquet files saved 
> on Hadoop. For example, when i'm using the following PySpark code to find a 
> word in parquet files:
> df = spark.read.parquet("hdfs://test/parquets/* ")
> df.filter(df['word'] == "jhon").show()
> After running this code, I go to spark application UI, stages tab, I see that 
> locality level summery set on Any. In contrast, because of this query's 
> nature, it must run locally and on NODE_LOCAL locality level at least. When I 
> check the network IO of the cluster while running this, I find out that this 
> query use network (network IO increases while the query is running). The 
> strange part of this situation is that the number shown in the spark UI's 
> shuffle section is very small.
> How can I find out the root cause of this problem and solve that?
> link of stackoverflow.com  : 
> https://stackoverflow.com/questions/66612906/problem-with-data-locality-when-running-spark-query-with-local-nature-on-apache
>  
> 


Re: possible bug

2021-04-08 Thread Russell Spitzer
Could be that the driver JVM cannot handle the metadata required to store
the partition information of a 70k-partition RDD. I see you say you have a
100 GB driver, but I'm not sure where you configured that.

Did you set --driver-memory 100G?

On Thu, Apr 8, 2021 at 8:08 AM Weiand, Markus, NMA-CFD <
markus.wei...@bertelsmann.de> wrote:

> This is the reduction of an error in a complex program where I allocated 100
> GB of driver (= worker = executor, as it is local mode) memory. In the example I used
> the default size, as the puny example shouldn’t need more anyway.
>
> And without the coalesce, or with coalesce(1, True), everything works fine.
>
> I’m trying to coalesce an empty rdd with 7 partitions into an empty rdd
> with 1 partition; why is this a problem without shuffling?
>
>
>
> *Von:* Sean Owen 
> *Gesendet:* Donnerstag, 8. April 2021 15:00
> *An:* Weiand, Markus, NMA-CFD 
> *Cc:* user@spark.apache.org
> *Betreff:* Re: possible bug
>
>
>
> That's a very low level error from the JVM. Any chance you are
> misconfiguring the executor size? like to 10MB instead of 10GB, that kind
> of thing. Trying to think of why the JVM would have very little memory to
> operate.
>
> An app running out of mem would not look like this.
>
>
>
> On Thu, Apr 8, 2021 at 7:53 AM Weiand, Markus, NMA-CFD <
> markus.wei...@bertelsmann.de> wrote:
>
> Hi all,
>
>
>
> I'm using spark on a c5a.16xlarge machine in amazon cloud (so having  64
> cores and 128 GB RAM). I'm using spark 3.01.
>
>
>
> The following python code leads to an exception, is this a bug or is my
> understanding of the API incorrect?
>
>
>
> import pyspark
>
> conf=pyspark.SparkConf().setMaster("local[64]").setAppName("Test1")
>
> sc=pyspark.SparkContext.getOrCreate(conf)
>
> rows=7
>
> data=list(range(rows))
>
> rdd=sc.parallelize(data,rows)
>
> assert rdd.getNumPartitions()==rows
>
> rdd0=rdd.filter(lambda x:False)
>
> assert rdd0.getNumPartitions()==rows
>
> rdd00=rdd0.coalesce(1)
>
> data=rdd00.collect()
>
> assert data==[]
>
>
>
> output when starting from PyCharm:
>
>
>
> /home/ubuntu/PycharmProjects//venv/bin/python
> /opt/pycharm-2020.2.3/plugins/python/helpers/pydev/pydevconsole.py
> --mode=client --port=41185
>
> import sys; print('Python %s on %s' % (sys.version, sys.platform))
>
> sys.path.extend(['/home/ubuntu/PycharmProjects/'])
>
> PyDev console: starting.
>
> Python 3.8.5 (default, Jan 27 2021, 15:41:15)
>
> [GCC 9.3.0] on linux
>
> import os
>
> os.environ['PYTHONHASHSEED'] = '0'
>
> runfile('/home/ubuntu/PycharmProjects//tests/test.py',
> wdir='/home/ubuntu/PycharmProjects//tests')
>
> WARNING: An illegal reflective access operation has occurred
>
> WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform
> (file:/opt/spark/jars/spark-unsafe_2.12-3.0.1.jar) to constructor
> java.nio.DirectByteBuffer(long,int)
>
> WARNING: Please consider reporting this to the maintainers of
> org.apache.spark.unsafe.Platform
>
> WARNING: Use --illegal-access=warn to enable warnings of further illegal
> reflective access operations
>
> WARNING: All illegal access operations will be denied in a future release
>
> 21/04/08 12:12:26 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
>
> 21/04/08 12:12:29 WARN TaskSetManager: Stage 0 contains a task of very
> large size (4732 KiB). The maximum recommended task size is 1000 KiB.
>
> [Stage 0:>  (0 +
> 1) / 1][423.190s][warning][os,thread] Attempt to protect stack guard pages
> failed (0x7f43d23ff000-0x7f43d2403000).
>
> [423.190s][warning][os,thread] Attempt to deallocate stack guard pages
> failed.
>
> OpenJDK 64-Bit Server VM warning: INFO:
> os::commit_memory(0x7f43d300b000, 16384, 0) failed; error='Not enough
> space' (errno=12)
>
> [423.231s][warning][os,thread] Failed to start thread - pthread_create
> failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 0k, detached.
>
> #
>
> # There is insufficient memory for the Java Runtime Environment to
> continue.
>
> # Native memory allocation (mmap) failed to map 16384 bytes for committing
> reserved memory.
>
> # An error report file with more information is saved as:
>
> # /home/ubuntu/PycharmProjects//tests/hs_err_pid17755.log
>
> [thread 17966 also had an error]
>
> OpenJDK 64-Bit Server VM warning: INFO:
> os::commit_memory(0x7f4b7bd81000, 262144, 0) failed; error='Not enough
> space' (errno=12)
>
> ERROR:root:Exception while sending command.
>
> Traceback (most recent call last):
>
>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
> line 1207, in send_command
>
> raise Py4JNetworkError("Answer from Java side is empty")
>
> py4j.protocol.Py4JNetworkError: Answer from Java side is empty
>
> During handling of the above exception, another exception occurred:
>
> Traceback (most recent call last):
>
>   File 

Re: [Spark SQL, intermediate+] possible bug or weird behavior of insertInto

2021-03-03 Thread Russell Spitzer
Yep, this is the behavior for insertInto; using the other write APIs does 
schema matching, I believe.
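
A minimal sketch of the workaround mentioned at the end of the quoted message below (table and DataFrame names are placeholders): select the increment's columns in the target table's order before calling insertInto, since insertInto matches columns by position rather than by name.

import org.apache.spark.sql.functions.col

val targetCols = spark.table("db.events_partitioned").columns   // placeholder table
val aligned = incrementDf.select(targetCols.map(col): _*)       // incrementDf is a placeholder

aligned.write
  .mode("overwrite")   // with spark.sql.sources.partitionOverwriteMode=dynamic, only matching partitions are replaced
  .insertInto("db.events_partitioned")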

> On Mar 3, 2021, at 8:29 AM, Sean Owen  wrote:
> 
> I don't have any good answer here, but, I seem to recall that this is because 
> of SQL semantics, which follows column ordering not naming when performing 
> operations like this. It may well be as intended.
> 
> On Tue, Mar 2, 2021 at 6:10 AM Oldrich Vlasic  > wrote:
> Hi,
> 
> I have encountered a weird and potentially dangerous behaviour of Spark 
> concerning
> partial overwrites of partitioned data. Not sure if this is a bug or just 
> abstraction
> leak. I have checked Spark section of Stack Overflow and haven't found any 
> relevant
> questions or answers.
> 
> Full minimal working example provided as attachment. Tested on Databricks 
> runtime 7.3 LTS
> ML (Spark 3.0.1). Short summary:
> 
> Write dataframe using partitioning by a column using saveAsTable. Filter out 
> part of the
> dataframe, change some values (simulates new increment of data) and write 
> again,
> overwriting a subset of partitions using insertInto. This operation will 
> either fail on
> schema mismatch or cause data corruption.
> 
> Reason: on the first write, the ordering of the columns is changed (partition 
> column is
> placed at the end). On the second write this is not taken into consideration 
> and Spark
> tries to insert values into the columns based on their order and not on their 
> name. If
> they have different types this will fail. If not, values will be written to 
> incorrect
> columns causing data corruption.
> 
> My question: is this a bug or intended behaviour? Can something be done about 
> it to prevent
> it? This issue can be avoided by doing a select with schema loaded from the 
> target table.
> However, when user is not aware this could cause hard to track down errors in 
> data.
> 
> Best regards,
> Oldřich Vlašic
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> 


Re: substitution invocator for a variable in PyCharm sql

2020-12-07 Thread Russell Spitzer
The feature you are looking for is called "string interpolation" and is
available in Python 3.6 and later. It uses a different syntax from Scala's:
https://www.programiz.com/python-programming/string-interpolation

On Mon, Dec 7, 2020 at 7:05 AM Mich Talebzadeh 
wrote:

> In Spark/Scala you can use 's' substitution invocator for a variable in
> sql call, for example
>
> var sqltext =
>   s"""
> INSERT INTO TABLE ${broadcastStagingConfig.broadcastTable}
> PARTITION (broadcastId = ${broadcastStagingConfig.broadcastValue},brand)
> SELECT
>   ocis_mrg_pty_id AS partyId
> , target_mobile_no AS phoneNumber
> , brand
> FROM ${tag}
>WHERE
>   length(target_mobile_no) =
> ${broadcastStagingConfig.mobileNoLength}
>AND
>   substring(target_mobile_no,1,1) =
> ${broadcastStagingConfig.ukMobileNoStart}
> """
> spark.sql(sqltext)
>
> However, in PySpark the same fails
>
> rows = spark.sql(s"""SELECT COUNT(1) FROM
> ${fullyQualifiedTableName}""").collect()[0][0]
>
>  ^
> SyntaxError: invalid syntax
>
> What is the correct substitute invocation in PyCharm if any?
>
> Thanks,
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: Spark Exception

2020-11-20 Thread Russell Spitzer
The general exceptions here mean that components within the Spark cluster
can't communicate. The most common cause for this is failure of the
processes that are supposed to be communicating. I generally see this when
one of them goes into a GC storm or is shut down because of an
exception or something similar.
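
For reference, a minimal sketch of the timeouts named in the stack traces below (values are only examples); raising them buys time, but the underlying GC pauses or process failures still need to be addressed.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.rpc.askTimeout", "240s")     // governs the "Futures timed out after [120 seconds]" error
  .set("spark.network.timeout", "600s")    // general network/heartbeat timeout fallback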

On Fri, Nov 20, 2020 at 10:52 AM Amit Sharma  wrote:

> Russell i increased the rpc timeout to 240 seconds but i am still getting
> this issue once a while and after this issue my spark streaming job stuck
> and do not process any request then i need to restart this every time. Any
> suggestion please.
>
>
> Thanks
> Amit
>
> On Wed, Nov 18, 2020 at 12:05 PM Amit Sharma  wrote:
>
>> Hi, we are running a spark streaming  job and sometimes it throws below
>> two exceptions . I am not understanding  what is the difference between
>> these two exception for one timeout is 120 seconds and another is 600
>> seconds. What could be the reason for these
>>
>>
>>  Error running job streaming job 1605709968000 ms.0
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> serialization failed: org.apache.spark.rpc.RpcTimeoutException: Futures
>> timed out after [120 seconds]. This timeout is controlled by
>> spark.rpc.askTimeout
>> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120
>> seconds]. This timeout is controlled by spark.rpc.askTimeout
>> at org.apache.spark.rpc.RpcTimeout.org
>> $apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
>> at
>> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
>> at
>> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
>> at
>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> at
>> org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
>> at
>> org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
>> at
>> org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:76)
>> at
>> org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:76)
>> at org.apache.spark.storage.BlockManager.org
>> $apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:466)
>> at org.apache.spark.storage.BlockManager.org
>> $apache$spark$storage$BlockManager$$reportBlockStatus(BlockManager.scala:445)
>> at
>> org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:1519)
>> at
>> org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1047)
>>
>>
>>
>>
>>
>> 2020-11-18 14:44:03 ERROR Utils:91 - Uncaught exception in thread
>> heartbeat-receiver-event-loop-thread
>> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [600
>> seconds]. This timeout is controlled by BlockManagerHeartbeat
>> at org.apache.spark.rpc.RpcTimeout.org
>> $apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
>> at
>> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
>> at
>> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
>> at
>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> at
>> org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
>> at
>> org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
>> at
>> org.apache.spark.scheduler.DAGScheduler.executorHeartbeatReceived(DAGScheduler.scala:251)
>> at
>> org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:455)
>> at
>> org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2$$anonfun$run$2.apply$mcV$sp(HeartbeatReceiver.scala:129)
>> at
>> org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1361)
>> at
>> org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2.run(HeartbeatReceiver.scala:128)
>> at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>>
>


Re: Out of memory issue

2020-11-20 Thread Russell Spitzer
Well if the system doesn't change, then the data must be different. The
exact exception probably won't be helpful since it only tells us the last
allocation that failed. My guess is that your ingestion changed and there
is either now slightly more data than previously or it's skewed
differently. One of the two things is probably happening and is overloading
one executor.

The solution is to increase executor heap.
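
A minimal sketch of doing that in the application's SparkConf (values are illustrative; the same settings can be passed with --executor-memory / --executor-cores at submit time):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.memory", "12g")   // up from the 8 GB described below
  .set("spark.executor.cores", "5")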

On Fri, Nov 20, 2020 at 8:25 AM Amit Sharma  wrote:

> please help.
>
>
> Thanks
> Amit
>
> On Mon, Nov 9, 2020 at 4:18 PM Amit Sharma  wrote:
>
>> Please find below the exact exception
>>
>> Exception in thread "streaming-job-executor-3"
>> java.lang.OutOfMemoryError: Java heap space
>> at java.util.Arrays.copyOf(Arrays.java:3332)
>> at
>> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
>> at
>> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
>> at java.lang.StringBuilder.append(StringBuilder.java:136)
>> at
>> scala.StringContext.standardInterpolator(StringContext.scala:126)
>> at scala.StringContext.s(StringContext.scala:95)
>> at sparkStreaming.TRReview.getTRReviews(TRReview.scala:307)
>> at
>> sparkStreaming.KafkaListener$$anonfun$1$$anonfun$apply$1$$anonfun$3.apply(KafkaListener.scala:154)
>> at
>> sparkStreaming.KafkaListener$$anonfun$1$$anonfun$apply$1$$anonfun$3.apply(KafkaListener.scala:138)
>> at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
>> at scala.util.Try$.apply(Try.scala:192)
>> at scala.util.Success.map(Try.scala:237)
>>
>> On Sun, Nov 8, 2020 at 1:35 PM Amit Sharma  wrote:
>>
>>> Hi , I am using 16 nodes spark cluster with below config
>>> 1. Executor memory  8 GB
>>> 2. 5 cores per executor
>>> 3. Driver memory 12 GB.
>>>
>>>
>>> We have streaming job. We do not see problem but sometimes we get
>>> exception executor-1 heap memory issue. I am not understanding if data size
>>> is same and this job receive a request and process it but suddenly it’s
>>> start giving out of memory error . It will throw exception for 1 executor
>>> then throw for other executor also and it stop processing the request.
>>>
>>> Thanks
>>> Amit
>>>
>>


Re: Path of jars added to a Spark Job - spark-submit // // Override jars in spark submit

2020-11-12 Thread Russell Spitzer
--driver-class-path does not move jars, so it is dependent on your Spark
resource manager (master). It is interpreted literally, so if your files do
not exist in that location relative to where the driver is run,
they will not be placed on the classpath.

Since the driver is responsible for moving the jars specified in --jars, you
cannot use a jar specified by --jars in --driver-class-path: the
driver has already started and its classpath is already set before any jars
are moved.

Some distributions may change this behavior, but that is the gist of
it.

On Thu, Nov 12, 2020 at 10:02 AM Dominique De Vito 
wrote:

> Hi,
>
> I am using Spark 2.1 (BTW) on YARN.
>
> I am trying to upload JAR on YARN cluster, and to use them to replace
> on-site (alreading in-place) JAR.
>
> I am trying to do so through spark-submit.
>
> One helpful answer
> https://stackoverflow.com/questions/37132559/add-jars-to-a-spark-job-spark-submit/37348234
> is the following one:
>
> spark-submit --jars additional1.jar,additional2.jar \
>   --driver-class-path additional1.jar:additional2.jar \
>   --conf spark.executor.extraClassPath=additional1.jar:additional2.jar \
>   --class MyClass main-application.jar
>
> So, I understand the following:
>
>- "--jars" is for uploading jar on each node
>- "--driver-class-path" is for using uploaded jar for the driver.
>- "--conf spark.executor.extraClassPath" is for using uploaded jar for
>executors.
>
> While I master the filepaths for "--jars" within a spark-submit command,
> what will be the filepaths of the uploaded JAR to be used in
> "--driver-class-path" for example ?
>
> The doc says: "*JARs and files are copied to the working directory for
> each SparkContext on the executor nodes*"
>
> Fine, but for the following command, what should I put instead of XXX and
> YYY ?
>
> spark-submit --jars /a/b/some1.jar,/a/b/c/some2.jar \
>   --driver-class-path XXX:YYY \
>   --conf spark.executor.extraClassPath=XXX:YYY \
>   --class MyClass main-application.jar
>
> When using spark-submit, how can I reference the "*working directory for
> the SparkContext*" to form XXX and YYY filepath ?
>
> Thanks.
>
> Dominique
>
> PS: I have tried
>
> spark-submit --jars /a/b/some1.jar,/a/b/c/some2.jar \
>   --driver-class-path some1.jar:some2.jar \
>   --conf spark.executor.extraClassPath=some1.jar:some2.jar  \
>   --class MyClass main-application.jar
>
> No success (if I made no mistake)
>
> And I have tried also:
>
> spark-submit --jars /a/b/some1.jar,/a/b/c/some2.jar \
>--driver-class-path ./some1.jar:./some2.jar \
>--conf spark.executor.extraClassPath=./some1.jar:./some2.jar \
>--class MyClass main-application.jar
>
> No success either.
>


Re: Spark reading from cassandra

2020-11-04 Thread Russell Spitzer
A where clause with a PK restriction should be identified by the Connector
and transformed into a single request. This should still be much slower
than doing the request directly but still much much faster than a full scan.
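
A minimal sketch (keyspace, table, and column names are placeholders): an equality restriction on the partition key lets the Spark Cassandra Connector push the predicate down to a targeted request instead of a full scan, and column pruning trims what comes back.

import org.apache.spark.sql.functions.col

val orders = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "shop", "table" -> "orders"))
  .load()

orders
  .filter(col("order_id") === "o-123")   // order_id assumed to be the partition key
  .select("order_id", "total")
  .explain()                             // pushed predicates appear in the scan's PushedFilters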

On Wed, Nov 4, 2020 at 12:51 PM Russell Spitzer 
wrote:

> Yes, the "Allow filtering" part isn't actually important other than for
> letting the query run in the first place. A where clause that utilizes a
> clustering column restriction will perform much better than a full scan,
> column pruning as well can be extremely beneficial.
>
> On Wed, Nov 4, 2020 at 11:12 AM Amit Sharma  wrote:
>
>> Hi, i have a question while we are reading from cassandra should we use
>> partition key only in where clause from performance perspective or it does
>> not matter from spark perspective because it always allows filtering.
>>
>>
>> Thanks
>> Amit
>>
>


Re: Spark reading from cassandra

2020-11-04 Thread Russell Spitzer
Yes, the "Allow filtering" part isn't actually important other than for
letting the query run in the first place. A where clause that utilizes a
clustering column restriction will perform much better than a full scan;
column pruning can be extremely beneficial as well.

On Wed, Nov 4, 2020 at 11:12 AM Amit Sharma  wrote:

> Hi, i have a question while we are reading from cassandra should we use
> partition key only in where clause from performance perspective or it does
> not matter from spark perspective because it always allows filtering.
>
>
> Thanks
> Amit
>


Re: Why spark-submit works with package not with jar

2020-10-20 Thread Russell Spitzer
--jars adds only the jars you list.
--packages adds the jar and its dependencies as listed in Maven.

On Tue, Oct 20, 2020 at 10:50 AM Mich Talebzadeh 
wrote:

> Hi,
>
> I have a scenario that I use in Spark submit as follows:
>
> spark-submit --driver-class-path /home/hduser/jars/ddhybrid.jar --jars
> /home/hduser/jars/spark-bigquery-latest.jar,/home/hduser/jars/ddhybrid.jar,
> */home/hduser/jars/spark-bigquery_2.11-0.2.6.jar*
>
> As you can see the jar files needed are added.
>
>
> This comes back with error message as below
>
>
> Creating model test.weights_MODEL
>
> java.lang.NoClassDefFoundError:
> com/google/api/client/http/HttpRequestInitializer
>
>   at
> com.samelamin.spark.bigquery.BigQuerySQLContext.bq$lzycompute(BigQuerySQLContext.scala:19)
>
>   at
> com.samelamin.spark.bigquery.BigQuerySQLContext.bq(BigQuerySQLContext.scala:19)
>
>   at
> com.samelamin.spark.bigquery.BigQuerySQLContext.runDMLQuery(BigQuerySQLContext.scala:105)
>
>   ... 76 elided
>
> Caused by: java.lang.ClassNotFoundException:
> com.google.api.client.http.HttpRequestInitializer
>
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
>
>
> So there is an issue with finding the class, although the jar file used
>
>
> /home/hduser/jars/spark-bigquery_2.11-0.2.6.jar
>
> has it.
>
>
> Now if *I remove the above jar file and replace it with the same version
> but package* it works!
>
>
> spark-submit --driver-class-path /home/hduser/jars/ddhybrid.jar --jars
> /home/hduser/jars/spark-bigquery-latest.jar,/home/hduser/jars/ddhybrid.jar
> *-**-packages com.github.samelamin:spark-bigquery_2.11:0.2.6*
>
>
> I have read the write-ups about packages searching the maven
> libraries etc. Not convinced why using the package should make so much
> difference between a failure and success. In other words, when to use a
> package rather than a jar.
>
>
> Any ideas will be appreciated.
>
>
> Thanks
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: Scala vs Python for ETL with Spark

2020-10-09 Thread Russell Spitzer
Spark in Scala (or Java) is much more performant if you are using RDDs: in
Python those operations basically force you to pass lambdas, hit serialization
between Java and Python types, and, yes, hit the Global Interpreter Lock. But
none of those things apply to DataFrames, which will generate Java code
regardless of what language you use to describe the DataFrame operations, as
long as you don't use Python lambdas. A DataFrame operation without Python
lambdas should not require any remote Python code execution.

TL;DR: if you are using DataFrames it doesn't matter whether you use Scala, Java,
Python, R, or SQL; the planning and the work will all happen in the JVM.

As for a repl, you can run PySpark which will start up a repl. There are
also a slew of notebooks which provide interactive python environments as
well.


On Fri, Oct 9, 2020 at 4:19 PM Mich Talebzadeh 
wrote:

> Thanks
>
> So ignoring Python lambdas is it a matter of individuals familiarity with
> the language that is the most important factor? Also I have noticed that
> Spark document preferences have been switched from Scala to Python as the
> first example. However, some codes for example JDBC calls are the same for
> Scala and Python.
>
> Some examples like this website
> <https://www.kdnuggets.com/2018/05/apache-spark-python-scala.html#:~:text=Scala%20is%20frequently%20over%2010,languages%20are%20faster%20than%20interpreted.>
> claim that Scala performance is an order of magnitude better than Python
> and also when it comes to concurrency Scala is a better choice. Maybe it is
> pretty old (2018)?
>
> Also (and may be my ignorance I have not researched it) does Spark offer
> REPL in the form of spark-shell with Python?
>
>
> Regards,
>
> Mich
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 9 Oct 2020 at 21:59, Russell Spitzer 
> wrote:
>
>> As long as you don't use python lambdas in your Spark job there should be
>> almost no difference between the Scala and Python dataframe code. Once you
>> introduce python lambdas you will hit some significant serialization
>> penalties as well as have to run actual work code in python. As long as no
>> lambdas are used, everything will operate with Catalyst compiled java code
>> so there won't be a big difference between python and scala.
>>
>> On Fri, Oct 9, 2020 at 3:57 PM Mich Talebzadeh 
>> wrote:
>>
>>> I have come across occasions when the teams use Python with Spark for
>>> ETL, for example processing data from S3 buckets into Snowflake with Spark.
>>>
>>> The only reason I think they are choosing Python as opposed to Scala is
>>> because they are more familiar with Python. Since Spark is written in
>>> Scala, itself is an indication of why I think Scala has an edge.
>>>
>>> I have not done one to one comparison of Spark with Scala vs Spark with
>>> Python. I understand for data science purposes most libraries like
>>> TensorFlow etc. are written in Python but I am at loss to understand the
>>> validity of using Python with Spark for ETL purposes.
>>>
>>> These are my understanding but they are not facts so I would like to get
>>> some informed views on this if I can?
>>>
>>> Many thanks,
>>>
>>> Mich
>>>
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>


Re: Scala vs Python for ETL with Spark

2020-10-09 Thread Russell Spitzer
As long as you don't use python lambdas in your Spark job there should be
almost no difference between the Scala and Python dataframe code. Once you
introduce python lambdas you will hit some significant serialization
penalties as well as have to run actual work code in python. As long as no
lambdas are used, everything will operate with Catalyst compiled java code
so there won't be a big difference between python and scala.

On Fri, Oct 9, 2020 at 3:57 PM Mich Talebzadeh 
wrote:

> I have come across occasions when the teams use Python with Spark for ETL,
> for example processing data from S3 buckets into Snowflake with Spark.
>
> The only reason I think they are choosing Python as opposed to Scala is
> because they are more familiar with Python. Since Spark is written in
> Scala, itself is an indication of why I think Scala has an edge.
>
> I have not done one to one comparison of Spark with Scala vs Spark with
> Python. I understand for data science purposes most libraries like
> TensorFlow etc. are written in Python but I am at loss to understand the
> validity of using Python with Spark for ETL purposes.
>
> These are my understanding but they are not facts so I would like to get
> some informed views on this if I can?
>
> Many thanks,
>
> Mich
>
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: Exception handling in Spark throws recursive value for DF needs type error

2020-10-01 Thread Russell Spitzer
You can't use df both as the name of the val you are assigning from the Try
and as the name of the pattern variable in the Success case. You also want
the value returned from the match to be the variable bound in the match.

So

val df = Try(spark.read.
             format("jdbc").
             option("url", jdbcUrl).
             option("dbtable", HiveSchema+"."+HiveTable).
             option("user", HybridServerUserName).
             option("password", HybridServerPassword).
             load()) match {
  case Success(validDf) => validDf
  case Failure(e) => throw new Exception("Error Encountered reading Hive table")
}

On Thu, Oct 1, 2020 at 5:53 PM Mich Talebzadeh 
wrote:

>
> Many thanks SEan.
>
>
> Maybe I misunderstood your point?
>
>
> var DF = Try(spark.read.
>
>  format("jdbc").
>
>  option("url", jdbcUrl).
>
>  option("dbtable", HiveSchema+"."+HiveTable).
>
>  option("user", HybridServerUserName).
>
>  option("password", HybridServerPassword).
>
>  load()) match {
>
>*case Success(DF) => HiveDF*
>
>case Failure(e) => throw new Exception("Error
> Encountered reading Hive table")
>
>  }
>
> Still getting the error
>
>
> :74: error: recursive method DF needs type
>
>   case Success(DF) => HiveDF
>
> Do I need to define DF as DataFrame beforehand because at that moment
> Spark does not know what DF type is
>
> Thanks again
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 1 Oct 2020 at 23:08, Sean Owen  wrote:
>
>> You are reusing HiveDF for two vars and it ends up ambiguous. Just rename
>> one.
>>
>> On Thu, Oct 1, 2020, 5:02 PM Mich Talebzadeh 
>> wrote:
>>
>>> Hi,
>>>
>>>
>>> Spark version 2.3.3 on Google Dataproc
>>>
>>>
>>> I am trying to use databricks to other databases
>>>
>>>
>>> https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
>>>
>>>
>>>  to read from Hive table on Prem using Spark in Cloud
>>>
>>>
>>> This works OK without a Try enclosure.
>>>
>>>
>>> import spark.implicits._
>>>
>>> import scala.util.{Try, Success, Failure}
>>>
>>> val HiveDF = Try(spark.read.
>>>
>>>  format("jdbc").
>>>
>>>  option("url", jdbcUrl).
>>>
>>>  option("dbtable", HiveSchema+"."+HiveTable).
>>>
>>>  option("user", HybridServerUserName).
>>>
>>>  option("password", HybridServerPassword).
>>>
>>>  load()) match {
>>>
>>>case Success(HiveDF) => HiveDF
>>>
>>>case Failure(e) => throw new Exception("Error
>>> Encountered reading Hive table")
>>>
>>>  }
>>>
>>> However, with Try I am getting the following error
>>>
>>>
>>> :66: error: recursive value HiveDF needs type
>>>
>>>   case Success(HiveDF) => HiveDF
>>>
>>> Wondering what is causing this. I have used it before (say reading from
>>> an XML file) and it worked then.
>>>
>>> Thanks
>>>
>>>
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>


Re: RDD which was checkpointed is not checkpointed

2020-08-19 Thread Russell Spitzer
Spark determines whether it can use the checkpoint at runtime, so you will
see the effect in the UI but not in the plan. The debug string you are
printing shows the plan before the job actually runs, and it is only at run
time that Spark checks whether a checkpoint can replace part of the lineage.

Here is a two stage job for example:

scala> val x = sc.parallelize(Seq("foo","bar"))
x: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at parallelize at :24

scala> val y = x.repartition(3)
y: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at repartition at :25

scala> y.checkpoint

scala> y.count
res12: Long = 2

[image: image.png]

[image: image.png]

scala> y.count
res13: Long = 2

[image: image.png]

Notice that we were able to skip the first stage: when Stage 11 looked for
its dependencies it found a checkpointed version of the partitioned data, so
it didn't need to repartition again. That turns my 2-stage job into a
2-stage job with 1 stage skipped, effectively a 1-stage job.



On Wed, Aug 19, 2020 at 9:07 AM Ivan Petrov  wrote:

> i did it and see lineage change
>
> BEFORE calling action. No success.
>
> Job$ - isCheckpointed? false, getCheckpointFile: None
> Job$ - recordsRDD.toDebugString:
> (2) MapPartitionsRDD[7] at map at  Job.scala:112 []
>  |  MapPartitionsRDD[6] at map at  Job.scala:111 []
>  |  MapPartitionsRDD[5] at map at scala:40 []
>  |  ShuffledRDD[4] at reduceByKey at scala:31 []
>  +-(2) MapPartitionsRDD[3] at flatMap at ...scala:30 []
> |  MapPartitionsRDD[2] at map at ...:66 []
>
> AFTER calling action. nice, it works!
> Job$ - isCheckpointed? true, getCheckpointFile: Some(path)
> Job$ - recordsRDD.toDebugString:
> (2) MapPartitionsRDD[7] at map at  Job.scala:112 []
>
> Lineage now contains only one stage but I want to get rid of it too. This
> stage happens right before the checkpoint. Will Spark try to re-run it in
> case task failure AFTER checkpoint?
> My expectation is that spark will read directly from checkpoint dir, It
> doesn't have to do anything with previous MapPartitionsRDD[7] at map at
>  Job.scala:112
>
> ср, 19 авг. 2020 г. в 16:01, Russell Spitzer :
>
>> Checkpoint is lazy and needs an action to actually do the work. The
>> method just marks the rdd as noted in the doc you posted.
>>
>> Call an action twice. The second run should use the checkpoint.
>>
>>
>>
>> On Wed, Aug 19, 2020, 8:49 AM Ivan Petrov  wrote:
>>
>>> i think it returns Unit... it won't work
>>> [image: image.png]
>>>
>>> I found another way to make it work. Called action after checkpoint
>>> val recordsRDD = convertToRecords(anotherRDD)
>>> recordsRDD.checkpoint()
>>> logger.info("checkpoint done")
>>> recordsRDD.count() // (!!!)
>>> logger.info(s"isCheckpointed? ${recordsRDD.isCheckpointed},
>>> getCheckpointFile: ${recordsRDD.getCheckpointFile}")
>>> logger.info(s"recordsRDD.toDebugString:
>>> \n${recordsRDD.toDebugString}")
>>>
>>> Output:
>>> Job$ - checkpoint done (!!!)
>>>
>>> Job$ - isCheckpointed? true, getCheckpointFile: Some(path)
>>> Job$ - recordsRDD.toDebugString:
>>> (2) MapPartitionsRDD[7] at map at  Job.scala:112 []
>>>
>>> But still it has single MapPartitionsRDD in lineage. Lineage became
>>> shorter. But i don't want Spark to rebuild RDD from MapPartitionsRDD, i
>>> want it to take data directly from checkpoint dir.
>>> MapPartitionsRDD has non-idempotent id generation. i don't want to call
>>> it twice in case of downstream task failure
>>>
>>>
>>>
>>>
>>> ср, 19 авг. 2020 г. в 14:47, Jacob Lynn :
>>>
>>>> Hi Ivan,
>>>>
>>>> Unlike cache/persist, checkpoint does not operate in-place but requires
>>>> the result to be assigned to a new variable. In your case:
>>>>
>>>> val recordsRDD = convertToRecords(anotherRDD).checkpoint()
>>>>
>>>> Best,
>>>> Jacob
>>>>
>>>> Op wo 19 aug. 2020 om 14:39 schreef Ivan Petrov :
>>>>
>>>>> Hi!
>>>>> Seems like I do smth wrong. I call .checkpoint() on RDD, but it's not
>>>>> checkpointed...
>>>>> What do I do wrong?
>>>>>
>>>>> val recordsRDD = convertToRecords(anotherRDD)
>>>>> recordsRDD.checkpoint()
>>>>> logger.info("checkpoint done")
>>>>>
>>>>> logger.info(s"isCheckpointed? ${recordsRDD.isCheckpointed},
>>>>> getCheckpointFile: ${recordsRDD.getCheckpointFile}")
>>>>> logger.info(s"recordsRDD.toDebugString:
>>>>> \n${recordsRDD.toDebugString}")
>>>>>
>>>>> Output:
>>>>> Job$ - checkpoint done (!!!)
>>>>>
>>>>> But then.
>>>>> Job$ - isCheckpointed? false, getCheckpointFile: None
>>>>> Job$ - recordsRDD.toDebugString:
>>>>> (2) MapPartitionsRDD[7] at map at  Job.scala:112 []
>>>>>  |  MapPartitionsRDD[6] at map at  Job.scala:111 []
>>>>>  |  MapPartitionsRDD[5] at map at scala:40 []
>>>>>  |  ShuffledRDD[4] at reduceByKey at scala:31 []
>>>>>  +-(2) MapPartitionsRDD[3] at flatMap at ...scala:30 []
>>>>> |  MapPartitionsRDD[2] at map at ...:66 []
>>>>>
>>>>


Re: Where do the executors get my app jar from?

2020-08-13 Thread Russell Spitzer
Looking back at the code

All --jars args and such run through

https://github.com/apache/spark/blob/7f275ee5978e00ac514e25f5ef1d4e3331f8031b/core/src/main/scala/org/apache/spark/SparkContext.scala#L493-L500

Which calls

https://github.com/apache/spark/blob/7f275ee5978e00ac514e25f5ef1d4e3331f8031b/core/src/main/scala/org/apache/spark/SparkContext.scala#L1842

Which places local jars on the driver-hosted file server and leaves remote
jars as-is, with a path the executors can use to fetch them

On Thu, Aug 13, 2020 at 11:01 PM Russell Spitzer 
wrote:

> The driver hosts a file server which the executors download the jar from.
>
> On Thu, Aug 13, 2020, 5:33 PM James Yu  wrote:
>
>> Hi,
>>
>> When I spark submit a Spark app with my app jar located in S3, obviously
>> the Driver will download the jar from the s3 location.  What is not clear
>> to me is: where do the Executors get the jar from?  From the same s3
>> location, or somehow from the Driver, or they don't need the jar?
>>
>> Thanks in advance for explanation.
>>
>> James
>>
>


Re: Where do the executors get my app jar from?

2020-08-13 Thread Russell Spitzer
The driver hosts a file server which the executors download the jar from.

On Thu, Aug 13, 2020, 5:33 PM James Yu  wrote:

> Hi,
>
> When I spark submit a Spark app with my app jar located in S3, obviously
> the Driver will download the jar from the s3 location.  What is not clear
> to me is: where do the Executors get the jar from?  From the same s3
> location, or somehow from the Driver, or they don't need the jar?
>
> Thanks in advance for explanation.
>
> James
>


Re: Spark streaming receivers

2020-08-10 Thread Russell Spitzer
The direct approach, which is also available through dstreams, and
structured streaming use a different model. Instead of being a push based
streaming solution they instead are pull based. (In general)

On every batch the driver uses the configuration to create a number of
partitions, each is responsible for independently pulling a number of
records. The exact number of records and guarantees around the pull are
source and configuration dependent. Since the system is pull based, there
is no need for a receiver or block management system taking up resources.
Every task/partition contains all the information required to get the data
that it describes.

As an example with Kafka, the driver might decide that batch 1 contains all
the records between offsets 1 and 100. It checks and sees that there are 10
Kafka partitions, so it ends up making a Spark job which contains 10 tasks,
each task dedicated to a single Kafka partition. Each task will then
independently pull its own slice of those records from its Kafka partition.
There will be no Spark resources used outside of those required for those 10
tasks.
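A minimal sketch of the direct (pull-based) model with the Kafka 0.10 DStream
integration; the broker address, topic name, and group id are placeholders.
The point is that every partition of the resulting stream pulls its own
offset range, with no receiver task holding a core:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

val ssc = new StreamingContext(new SparkConf().setAppName("direct-example"), Seconds(1))

val kafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.GROUP_ID_CONFIG -> "direct-example")

// One task per Kafka partition; each task pulls its own offset range directly.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("events"), kafkaParams))

stream.map(_.value).count().print()
ssc.start()
ssc.awaitTermination()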

On Sun, Aug 9, 2020, 10:44 PM Dark Crusader 
wrote:

> Hi Russell,
> This is super helpful. Thank you so much.
>
> Can you elaborate on the differences between structured streaming vs
> dstreams? How would the number of receivers required etc change?
>
> On Sat, 8 Aug, 2020, 10:28 pm Russell Spitzer, 
> wrote:
>
>> Note, none of this applies to Direct streaming approaches, only receiver
>> based Dstreams.
>>
>> You can think of a receiver as a long running task that never finishes.
>> Each receiver is submitted to an executor slot somewhere, it then runs
>> indefinitely and internally has a method which passes records over to a
>> block management system. There is a timing that you set which decides when
>> each block is "done" and records after that time has passed go into the
>> next block (See parameter
>> <https://spark.apache.org/docs/latest/configuration.html#spark-streaming>
>>  spark.streaming.blockInterval)  Once a block is done it can be
>> processed in the next Spark batch.. The gap between a block starting and a
>> block being finished is why you can lose data in Receiver streaming without
>> WriteAheadLoging. Usually your block interval is divisible into your batch
>> interval so you'll get X blocks per batch. Each block becomes one partition
>> of the job being done in a Streaming batch. Multiple receivers can be
>> unified into a single dstream, which just means the blocks produced by all
>> of those receivers are handled in the same Streaming batch.
>>
>> So if you have 5 different receivers, you need at minimum 6 executor
>> cores. 1 core for each receiver, and 1 core to actually do your processing
>> work. In a real world case you probably want significantly more  cores on
>> the processing side than just 1. Without repartitioning you will never have
>> more that
>>
>> A quick example
>>
>> I run 5 receivers with block interval of 100ms and spark batch interval
>> of 1 second. I use union to group them all together, I will most likely end
>> up with one Spark Job for each batch every second running with 50
>> partitions (1000ms / 100(ms / partition / receiver) * 5 receivers). If I
>> have a total of 10 cores in the system. 5 of them are running receivers,
>> The remaining 5 must process the 50 partitions of data generated by the
>> last second of work.
>>
>> And again, just to reiterate, if you are doing a direct streaming
>> approach or structured streaming, none of this applies.
>>
>> On Sat, Aug 8, 2020 at 10:03 AM Dark Crusader <
>> relinquisheddra...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I'm having some trouble figuring out how receivers tie into spark
>>> driver-executor structure.
>>> Do all executors have a receiver that is blocked as soon as it
>>> receives some stream data?
>>> Or can multiple streams of data be taken as input into a single executor?
>>>
>>> I have stream data coming in at every second coming from 5 different
>>> sources. I want to aggregate data from each of them. Does this mean I need
>>> 5 executors or does it have to do with threads on the executor?
>>>
>>> I might be mixing in a few concepts here. Any help would be appreciated.
>>> Thank you.
>>>
>>


Re: Spark streaming receivers

2020-08-08 Thread Russell Spitzer
Note, none of this applies to Direct streaming approaches, only receiver
based Dstreams.

You can think of a receiver as a long running task that never finishes.
Each receiver is submitted to an executor slot somewhere, it then runs
indefinitely and internally has a method which passes records over to a
block management system. There is a timing that you set which decides when
each block is "done" and records after that time has passed go into the
next block (see the parameter spark.streaming.blockInterval). Once a block is
done it can be processed in the next Spark batch. The gap between a block starting and a block being
finished is why you can lose data in Receiver streaming without
WriteAheadLoging. Usually your block interval is divisible into your batch
interval so you'll get X blocks per batch. Each block becomes one partition
of the job being done in a Streaming batch. Multiple receivers can be
unified into a single dstream, which just means the blocks produced by all
of those receivers are handled in the same Streaming batch.

So if you have 5 different receivers, you need at minimum 6 executor cores.
1 core for each receiver, and 1 core to actually do your processing work.
In a real world case you probably want significantly more cores on the
processing side than just 1. Without repartitioning you will never have
more partitions per batch than the number of blocks produced.

A quick example

I run 5 receivers with block interval of 100ms and spark batch interval of
1 second. I use union to group them all together, I will most likely end up
with one Spark Job for each batch every second running with 50 partitions
(1000ms / 100(ms / partition / receiver) * 5 receivers). If I have a total
of 10 cores in the system. 5 of them are running receivers, The remaining 5
must process the 50 partitions of data generated by the last second of work.

And again, just to reiterate, if you are doing a direct streaming approach
or structured streaming, none of this applies.
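Going back to the receiver-based case, a hedged sketch of the sizing above,
with hypothetical socket sources standing in for the five feeds; five
receivers each pin a core for their whole lifetime, and the unioned stream
carries all of their blocks in each batch:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("receiver-example")
  .set("spark.streaming.blockInterval", "100ms") // 10 blocks per receiver per 1s batch
val ssc = new StreamingContext(conf, Seconds(1))

// Five receivers, each occupying one executor core indefinitely.
val receivers = (1 to 5).map(i => ssc.socketTextStream("feed-host", 9000 + i))

// Each 1s batch of the unioned stream has ~50 partitions (5 receivers x 10 blocks),
// processed by whatever cores are left over after the receivers.
val unified = ssc.union(receivers)
unified.count().print()

ssc.start()
ssc.awaitTermination()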

On Sat, Aug 8, 2020 at 10:03 AM Dark Crusader 
wrote:

> Hi,
>
> I'm having some trouble figuring out how receivers tie into spark
> driver-executor structure.
> Do all executors have a receiver that is blocked as soon as it
> receives some stream data?
> Or can multiple streams of data be taken as input into a single executor?
>
> I have stream data coming in at every second coming from 5 different
> sources. I want to aggregate data from each of them. Does this mean I need
> 5 executors or does it have to do with threads on the executor?
>
> I might be mixing in a few concepts here. Any help would be appreciated.
> Thank you.
>


Re: DataSource API v2 & Spark-SQL

2020-08-03 Thread Russell Spitzer
That's a bad error message. Basically you can't create a Spark native catalog
reference for a DSv2 source. You have to use that data source's own catalog or
use the programmatic API. Both DSv1 and DSv2 programmatic APIs work (plus
or minus some options)
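A hedged sketch of the two working routes, with hypothetical names
(com.example.LogTableSource as the TableProvider and com.example.LogCatalog
as a catalog plugin); the first is the plain programmatic DataFrameReader
path, the second registers a DSv2 catalog (Spark 3.0+) so tables can be
addressed from SQL:

// 1) Programmatic read through the TableProvider / DataSourceRegister path
val logs = spark.read
  .format("com.example.LogTableSource") // or the short name from DataSourceRegister
  .option("path", "/logs")              // hypothetical option
  .load()

// 2) Register a custom catalog and reference its tables in SQL
spark.conf.set("spark.sql.catalog.logcat", "com.example.LogCatalog")
spark.sql("SELECT * FROM logcat.default.mylogs").show()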

On Mon, Aug 3, 2020, 7:28 AM Lavelle, Shawn  wrote:

> Hello Spark community,
>
>I have a custom datasource in v1 API that I’m trying to port to v2 API,
> in Java.  Currently I have a DataSource registered via
> catalog.createTable(name, , schema, options map).  When trying to
> do this in data source API v2, I get an error saying my class (package)
> isn’t a valid data source Can you help me out?
>
>
>
> Spark versions are 3.0.0 w/scala 2.12, artifacts are Spark-core,
> spark-sql, spark-hive, spark-hive-thriftserver, spark-catalyst
>
>
>
> Here’s what the dataSource definition:  *public class LogTableSource
> implements  TableProvider,  SupportsRead,  DataSourceRegister, Serializable*
>
>
>
> I’m guessing that I am missing one of the required interfaces. Note, I did
> try this with using the LogTableSource below as “DefaultSource” but the
> behavior is the same.  Also, I keep reading about a DataSourceV2 Marker
> Interface, but it seems deprecated?
>
>
>
> Also, I tried to add *DataSourceV2ScanRelation* but that won’t compile:
>
> Output() in DataSourceV2ScanRelation cannot override Output() in QueryPlan
> return type Seq is not compatible with Seq
>
>
>
>   I’m fairly stumped – everything I’ve read online says there’s a marker
> interface of some kind and yet I can’t find it in my package list.
>
>
>
>   Looking forward to hearing from you,
>
>
>
> ~ Shawn
>
>
>
>
>
>
>
>
> [image: OSI]
> Shawn Lavelle
>
> Software Development
>
> 4101 Arrowhead Drive
> Medina, Minnesota 55340-9457
> Phone: 763 551 0559
> *Email:* shawn.lave...@osii.com
> *Website:* www.osii.com
>


Re: how to copy from one cassandra cluster to another

2020-07-28 Thread Russell Spitzer
You do not need one spark session per cluster.

Spark SQL with Datasource v1
http://www.russellspitzer.com/2016/02/16/Multiple-Clusters-SparkSql-Cassandra/

DatasourceV2

Would require making two catalog references then copying between them


https://github.com/datastax/spark-cassandra-connector/blob/master/doc/1_connecting.md#example-of-reading-from-one-cluster-and-writing-to-another
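A hedged sketch of the DSv2 catalog route (connector 3.0+); the catalog
names and contact hosts below are placeholders, and the exact class and
option keys are the ones documented at the link above:

// One Cassandra catalog per cluster, each pointed at its own contact host
spark.conf.set("spark.sql.catalog.cluster1",
  "com.datastax.spark.connector.datasource.CassandraCatalog")
spark.conf.set("spark.sql.catalog.cluster1.spark.cassandra.connection.host", "dc1-node1")

spark.conf.set("spark.sql.catalog.cluster2",
  "com.datastax.spark.connector.datasource.CassandraCatalog")
spark.conf.set("spark.sql.catalog.cluster2.spark.cassandra.connection.host", "dc2-node1")

// Copy table A on cluster 1 into table B on cluster 2
spark.sql("INSERT INTO cluster2.ks.table_b SELECT * FROM cluster1.ks.table_a")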



On Tue, Jul 28, 2020, 6:26 AM Amit Sharma  wrote:

> Hi, I have table A in the cassandra cluster  cluster -1  in one data
> center. I have table B in cluster -2 in another data center. I want to copy
> the data from one cluster to another using spark. I faced the problem that
> I can not create two spark sessions as we need spark sessions per cluster.
> Please let me know if there is any way to use spark batch job to copy data
> among two cassandra clusters.
>
>
> Thanks
> Amit
>


Re: spark exception

2020-07-24 Thread Russell Spitzer
Usually this is just a sign that one of the executors quit unexpectedly,
which explains the dead executors you see in the UI. The next step is
usually to go look at those executor logs and see if there's any reason for
the termination. If you see an abrupt truncation of the log, that usually
means the out-of-memory killer shut down the process.

At that point it means that although you set the RAM to a very high level,
the operating system was unable to service a malloc call when it mattered.
You probably need to run with a smaller heap size, because there wasn't
enough working RAM to back the heap you requested.

If the log ends with some other kind of exception then you need to look
into why that occurred.

On Fri, Jul 24, 2020, 7:42 AM Amit Sharma  wrote:

> Hi All, sometimes i get this error in spark logs. I notice few executors
> are shown as dead in the executor tab during this error. Although my job
> get success. Please help me out the root cause of this issue. I have 3
> workers with 30 cores each and 64 GB RAM each. My job uses 3 cores per
> executor and uses a total of 63 cores and 4GB RAM per executor.
>
> Remote RPC client disassociated. Likely due to containers exceeding
> thresholds, or network issues. Check driver logs for WARN messages
>


Re: Garbage collection issue

2020-07-20 Thread Russell Spitzer
High GC is relatively hard to debug in general but I can give you a few
pointers. It basically means that the time spent cleaning up unused objects
is high, which usually means memory is being used and thrown away rapidly.
It can also mean that GC is ineffective and is being run many times in an
attempt to find things to free up. Since each run is not very effective
(because many things are still in use and cannot be thrown out), it has to
run more often.

So usually the easiest thing to do if possible is to increase the heap size
and hope that you are just seeing GC pressure because you need more free
memory than the JVM had. So I would recommend that as a first step,
increase the Executor Heap.

The longer and harder thing to do is to see exactly where object allocation
is taking place and attempt to minimize it. This requires walking through
your code, looking for long lived allocations and minimizing them if
possible.
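To make the first (heap) step concrete, a hedged spark-submit sketch that
raises the executor heap and turns on GC logging so the two clusters can be
compared; the class, jar, and sizes are placeholders:

spark-submit \
  --class com.example.BatchJob \
  --executor-memory 8g \
  --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  batch-job.jar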

On Mon, Jul 20, 2020 at 1:22 PM Amit Sharma  wrote:

> Please help on this.
>
>
> Thanks
> Amit
>
> On Fri, Jul 17, 2020 at 2:34 PM Amit Sharma  wrote:
>
>> Hi All, i am running the same batch job in my two separate spark
>> clusters. In one of the clusters it is showing GC warning  on spark -ui
>> under executer tag. Garbage collection is taking longer time around 20 %
>> while in another cluster it is under 10 %. I am using the same
>> configuration in my spark submit and using G1GC .
>>
>> Please let me know what could be the reason for GC slowness.
>>
>>
>> Thanks
>> Amit
>>
>


Re: schema changes of custom data source in persistent tables DataSourceV1

2020-07-20 Thread Russell Spitzer
The code you linked to is very old and I don't think that method works
anymore (HiveContext no longer exists). My latest attempt at this was Spark
2.2, and I ran into the issues I wrote about before.

In DSv2 it's done via a catalog implementation, so you can basically write
a new catalog that creates tables and such with whatever metadata you
like. I'm not sure there is a Hive Metastore catalog implemented yet in
DSv2; if there is, it would only be in Spark 3.0.

On Mon, Jul 20, 2020 at 10:05 AM fansparker  wrote:

> Thanks Russell.  This
> <
> https://gite.lirmm.fr/yagoubi/spark/commit/6463e0b9e8067cce70602c5c9006a2546856a9d6#fecff1a3ad108a52192ba9cd6dd7b11a3d18871b_0_141>
>
> shows that the "refreshTable" and "invalidateTable" could be used to reload
> the metadata but they do not work in our case. I have tried to invoke the
> "schema()" with the updated schema from the "buildScan()" as well.
>
> It will be helpful to have this feature enabled for DataSourceV1 as the
> schema evolves, i will check if this is an change that can be made.
>
> You mentioned that it works in DataSourceV2. Is there an implementation
> sample for persistent tables DataSourceV2 that works with spark 2.4.4?
> Thanks again.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark Streaming - Set Parallelism and Optimize driver

2020-07-20 Thread Russell Spitzer
Without seeing the rest (and you can confirm this by looking at the DAG
visualization in the Spark UI) I would say your first stage with 6
partitions is:

Stage 1: Read data from Kinesis (or read blocks from the receiver, not sure
which method you are using) and write shuffle files for the repartition
Stage 2: Read the shuffle files and do everything else

In general I think the biggest issue here is probably not the distribution
of tasks, which based on your UI reading were quite small, but the
parallelization of the write operation, since it is done synchronously. I
would suggest that instead of trying to increase your parallelism by
repartitioning, you have "doJob" run asynchronously and allow for more
parallelism within an executor, rather than using multiple executor
threads/JVMs.

That said, based on the speed of the second stage, you would probably run
faster if you just skipped the repartition.
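If you do pursue the asynchronous route, a hedged sketch (in Scala rather
than the Java of the quoted code below) of letting several "doJob" batches be
in flight at once inside each partition; the pool size and the ConnectionUtil
and doJob helpers are assumptions carried over from that code:

import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

dStream.foreachRDD(rdd => rdd.foreachPartition { partitionOfRecords =>
  // A small pool per task lets several HBase batch writes run concurrently.
  val pool = Executors.newFixedThreadPool(4)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

  val hbaseConnection = ConnectionUtil.getHbaseConnection()
  val pending = partitionOfRecords
    .grouped(10) // same 10-record batches as before
    .map(batch => Future(doJob(batch.toList, hbaseConnection)))
    .toList

  // Wait for every asynchronous write to finish before the task ends.
  pending.foreach(f => Await.result(f, Duration.Inf))
  pool.shutdown()
})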

On Mon, Jul 20, 2020 at 8:23 AM forece85  wrote:

> Thanks for reply. Please find sudo code below. Its Dstreams reading for
> every
> 10secs from kinesis stream and after transformations, pushing into hbase.
> Once got Dstream, we are using below code to repartition and do processing:
>
> dStream = dStream.repartition(javaSparkContext.defaultMinPartitions() * 3);
> dStream.foreachRDD(javaRDD -> javaRDD.foreachPartition(partitionOfRecords
> ->
> {
>Connection hbaseConnection= ConnectionUtil.getHbaseConnection();
>List listOfRecords = new ArrayList<>();
>while (partitionOfRecords.hasNext()) {
>  listOfRecords.add(partitionOfRecords.next());
>
>  if (listOfRecords.size() < 10 && partitionOfRecords.hasNext())
> continue;
>
>  List finalListOfRecords = listOfRecords;
>  doJob(finalListOfRecords, hbaseConnection);
>  listOfRecords = new ArrayList<>();
>}
> }));
>
>
> We are batching every 10 records and pass to doJob method where we batch
> process and bulk insert to hbase.
>
> With above code, will it be able to tell what is happening at job 1 -> 6
> tasks? and how to replace repartition method efficiently.
>
> Thanks in Advance
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark Streaming - Set Parallelism and Optimize driver

2020-07-20 Thread Russell Spitzer
Without your code this is hard to determine but a few notes.

The number of partitions is usually dictated by the input source; see if it
has any configuration which allows you to increase input splits.

I'm not sure why you think some of the code is running on the driver. All
methods on DataFrames and RDDs are executed on the executors;
foreachPartition does not run locally on the driver.

The difference in partition counts is probably the shuffle you added with
repartition. I would actually not be surprised if your code ran faster
without the repartitioning. But again, without the real code it is very
hard to say.

On Mon, Jul 20, 2020, 6:33 AM forece85  wrote:

> I am new to spark streaming and trying to understand spark ui and to do
> optimizations.
>
> 1. Processing at executors took less time than at driver. How to optimize
> to
> make driver tasks fast ?
> 2. We are using dstream.repartition(defaultParallelism*3) to increase
> parallelism which is causing high shuffles. Is there any option to avoid
> repartition manually to reduce data shuffles.
> 3. Also trying to understand how 6 tasks in stage1 and 199 tasks in stage2
> got created?
>
> *hardware configuration:* executor-cores: 3; driver-cores: 3;
> dynamicAllocation is true;
> initial,min,maxExecutors: 25
>
> StackOverFlow link for screenshots:
>
> https://stackoverflow.com/questions/62993030/spark-dstream-help-needed-to-understand-ui-and-how-to-set-parallelism-or-defau
>
> Thanks in Advance
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: schema changes of custom data source in persistent tables DataSourceV1

2020-07-20 Thread Russell Spitzer
The last time I looked into this the answer was no. I believe that since
there is a Spark session-internal relation cache, the only way to update a
session's information was a full drop and create. That was my experience
with a custom Hive metastore and entries read from it: I could change the
entries in the metastore underneath the session, but since the session
cached the relation lookup I couldn't get it to reload the metadata.

DataSourceV2 does make this easy though.

On Mon, Jul 20, 2020, 5:17 AM fansparker  wrote:

> Does anybody know if there is a way to get the persisted table's schema
> updated when the underlying custom data source schema is changed?
> Currently,
> we have to drop and re-create the table.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Using spark.jars conf to override jars present in spark default classpath

2020-07-16 Thread Russell Spitzer
That's what I'm saying you don't want to do :) If you have two versions of
a library with different APIs, the safest approach is shading; ordering
probably can't be relied on. In my experience, reflection, as well as which
classpath has priority when a class is loading, will behave in ways you may
not like. spark.jars will never be able to reorder, so you'll need to get
those jars onto the system class loader using the driver (and executor)
extra classpath args (with userClassPathFirst). I will stress again that
this would be my last choice for getting it working, and I would try
shading first if I really had a conflict.
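If you do go the classpath-ordering route despite the above, a hedged sketch
of the relevant flags; the paths and jar names echo the experiment quoted
below and are placeholders:

spark-submit \
  --conf spark.driver.extraClassPath=/home/<user>/ClassPathConf/sample-project-2.0.0.jar \
  --conf spark.driver.userClassPathFirst=true \
  --conf spark.executor.userClassPathFirst=true \
  --class SampleProject \
  application.jar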

On Thu, Jul 16, 2020 at 2:17 PM Nupur Shukla 
wrote:

> Thank you Russel and Jeff,
>
> My bad, I wasn't clear before about the conflicting jars. By that, I meant
> my application needs to use an updated version of certain jars than what
> are present in the default classpath. What would be the best way to use
> confs spark.jar and spark.driver.extraClassPath both to do a classpath
> reordering so that the updated versions get picked first? Looks like the
> one way to use extraClassPath conf here.
>
>
>
>
> On Thu, 16 Jul 2020 at 12:05, Jeff Evans 
> wrote:
>
>> If you can't avoid it, you need to make use of the
>> spark.driver.userClassPathFirst and/or spark.executor.userClassPathFirst
>> properties.
>>
>> On Thu, Jul 16, 2020 at 2:03 PM Russell Spitzer <
>> russell.spit...@gmail.com> wrote:
>>
>>> I believe the main issue here is that spark.jars is a bit "too late" to
>>> actually prepend things to the class path. For most use cases this value is
>>> not read until after the JVM has already started and the system classloader
>>> has already loaded.
>>>
>>> The jar argument gets added via the dynamic class loader so it
>>> necessarily has to come after wards :/ Driver extra classpath and it's
>>> friends, modify the actual launch command of the driver (or executors) so
>>> they can prepend whenever they want.
>>>
>>>  In general you do not want to have conflicting jars at all if possible
>>> and I would recommend looking into shading if it's really important for
>>> your application to use a specific incompatible version of a library. Jar
>>> (and extraClasspath) are really just
>>> for adding additional jars and I personally would try not to rely on
>>> classpath ordering to get the right libraries recognized.
>>>
>>> On Thu, Jul 16, 2020 at 1:55 PM Nupur Shukla 
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> How can we use *spark.jars* to to specify conflicting jars (that is,
>>>> jars that are already present in the spark's default classpath)? Jars
>>>> specified in this conf gets "appended" to the classpath, and thus gets
>>>> looked at after the default classpath. Is it not intended to be used to
>>>> specify conflicting jars?
>>>> Meanwhile when *spark.driver.extraClassPath* conf is specified, this
>>>> path is "prepended" to the classpath and thus takes precedence over the
>>>> default classpath.
>>>>
>>>> How can I use both to specify different jars and paths but achieve a
>>>> precedence of spark.jars path > spark.driver.extraClassPath > spark default
>>>> classpath (left to right precedence order)?
>>>>
>>>> Experiment conducted:
>>>>
>>>> I am using sample-project.jar which has one class in it SampleProject.
>>>> This has a method which prints the version number of the jar. For this
>>>> experiment I am using 3 versions of this sample-project.jar
>>>> Sample-project-1.0.0.jar is present in the spark default classpath in
>>>> my test cluster
>>>> Sample-project-2.0.0.jar is present in folder
>>>> /home//ClassPathConf on driver
>>>> Sample-project-3.0.0.jar is present in  folder /home//JarsConf on
>>>> driver
>>>>
>>>> (Empty cell in img below means that conf was not specified)
>>>>
>>>> [image: image.png]
>>>>
>>>>
>>>> Thank you,
>>>> Nupur
>>>>
>>>>
>>>>


Re: Using spark.jars conf to override jars present in spark default classpath

2020-07-16 Thread Russell Spitzer
I believe the main issue here is that spark.jars is a bit "too late" to
actually prepend things to the classpath. For most use cases this value is
not read until after the JVM has already started and the system classloader
has already been set up.

The jar argument gets added via the dynamic class loader, so it necessarily
has to come afterwards :/ Driver extra classpath and its friends modify
the actual launch command of the driver (or executors), so they can prepend
whenever they want.

 In general you do not want to have conflicting jars at all if possible and
I would recommend looking into shading if it's really important for your
application to use a specific incompatible version of a library. Jar (and
extraClasspath) are really just
for adding additional jars and I personally would try not to rely on
classpath ordering to get the right libraries recognized.

On Thu, Jul 16, 2020 at 1:55 PM Nupur Shukla 
wrote:

> Hello,
>
> How can we use *spark.jars* to to specify conflicting jars (that is, jars
> that are already present in the spark's default classpath)? Jars specified
> in this conf gets "appended" to the classpath, and thus gets looked at
> after the default classpath. Is it not intended to be used to specify
> conflicting jars?
> Meanwhile when *spark.driver.extraClassPath* conf is specified, this path
> is "prepended" to the classpath and thus takes precedence over the default
> classpath.
>
> How can I use both to specify different jars and paths but achieve a
> precedence of spark.jars path > spark.driver.extraClassPath > spark default
> classpath (left to right precedence order)?
>
> Experiment conducted:
>
> I am using sample-project.jar which has one class in it SampleProject.
> This has a method which prints the version number of the jar. For this
> experiment I am using 3 versions of this sample-project.jar
> Sample-project-1.0.0.jar is present in the spark default classpath in my
> test cluster
> Sample-project-2.0.0.jar is present in folder /home//ClassPathConf
> on driver
> Sample-project-3.0.0.jar is present in  folder /home//JarsConf on
> driver
>
> (Empty cell in img below means that conf was not specified)
>
> [image: image.png]
>
>
> Thank you,
> Nupur
>
>
>


Re: Truncate table

2020-07-01 Thread Russell Spitzer
I'm not sure what you're really trying to do here, but it sounds like saving
the data to a Parquet file or another temporary store before truncating would
protect you in case of failure.
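A hedged sketch of that approach, staging through Parquet so the rows
survive the truncate even if the job dies midway; the keyspace, table, path,
and filter are placeholders, and confirm.truncate is the connector option
that allows an overwrite to truncate the target:

import org.apache.spark.sql.functions.col

// 1. Read the rows to keep and stage them somewhere durable outside Cassandra
val df1 = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "ks", "table" -> "a"))
  .load()
  .filter(col("keep") === true)

df1.write.mode("overwrite").parquet("/tmp/table_a_keep")

// 2. Only after the staging write succeeds, truncate table A and write the
//    staged rows back from the Parquet copy (not from the in-memory df1).
spark.read.parquet("/tmp/table_a_keep")
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "ks", "table" -> "a", "confirm.truncate" -> "true"))
  .mode("overwrite")
  .save()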

On Wed, Jul 1, 2020, 9:48 AM Amit Sharma  wrote:

> Hi, i have scenario where i have to read certain raw from a table and
> truncate the table and store the certain raws back to the table. I am doing
> below steps
>
> 1. reading certain raws in DF1 from cassandra table A.
> 2. saving into cassandra as override in table A
>
>
> the problem is when I truncate the table at step 2 I will lose the data
> in DF1 as it shows empty.
> I have two solutions
> 1. Store the DF1 in another temp table before truncating table A
> 2. Cache DF1 before truncating.
>
> Do we have any better solution ?
>
>
> Thanks
> Amit
>


Re: [Structured spak streaming] How does cassandra connector readstream deals with deleted record

2020-06-26 Thread Russell Spitzer
The connector uses Java driver CQL requests under the hood, which means it
responds to a changing database the way a normal application would. This
means retries may result in a different set of data than the original
request if the underlying database changed.

On Fri, Jun 26, 2020, 9:42 PM Jungtaek Lim 
wrote:

> I'm not sure how it is implemented, but in general I wouldn't expect such
> behavior on the connectors which read from non-streaming fashion storages.
> The query result may depend on "when" the records are fetched.
>
> If you need to reflect the changes in your query you'll probably want to
> find a way to retrieve "change logs" from your external storage (or how
> your system/product can also produce change logs if your external storage
> doesn't support it), and adopt it to your query. There's a keyword you can
> google to read further, "Change Data Capture".
>
> Otherwise, you can apply the traditional approach, run a batch query
> periodically and replace entire outputs.
>
> On Thu, Jun 25, 2020 at 1:26 PM Rahul Kumar 
> wrote:
>
>> Hello everyone,
>>
>> I was wondering, how Cassandra spark connector deals with deleted/updated
>> record while readstream operation. If the record was already fetched in
>> spark memory, and it got updated or deleted in database, does it get
>> reflected in streaming join?
>>
>> Thanks,
>> Rahul
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: java.lang.OutOfMemoryError Spark Worker

2020-05-08 Thread Russell Spitzer
The error is in the Spark Standalone Worker. It's hitting an OOM while
launching/running an executor process. Specifically it's running out of
memory when parsing the hadoop configuration trying to figure out the
env/command line to run

https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala#L142-L149

Now usually this is something that I wouldn't expect to happen, since a
Spark Worker is generally a very lightweight process. Unless it was
accumulating a lot of state it should be relatively small and it should be
very unlikely that generating a command
line string would cause this error unless the application configuration was
gigantic. So while it's possible you just have very large Hadoop XML config
files, it is probably not this specific action that is OOMing; rather, this
is the straw that broke the camel's back and the worker simply has too much
other state.

This may not be pathological; it may just be that you are running a lot of
executors, or that the worker is keeping track of lots of started and
shut-down executor metadata, and it's not a big deal.
You could fix this by limiting the amount of metadata preserved after jobs
are run (see the spark.deploy.* options for retaining apps, and the Spark
worker cleanup options) or by increasing the Spark Worker's heap
(SPARK_DAEMON_MEMORY)

If I hit this I would start by bumping Daemon Memory
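A hedged sketch of those knobs for a standalone deployment; the values are
placeholders to tune:

# conf/spark-env.sh -- heap for the standalone Master/Worker daemons themselves
SPARK_DAEMON_MEMORY=2g

# conf/spark-defaults.conf -- bound retained app metadata (Master) and clean
# up old work dirs (Worker)
spark.deploy.retainedApplications  50
spark.deploy.retainedDrivers       50
spark.worker.cleanup.enabled       true
spark.worker.cleanup.appDataTtl    604800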

On Fri, May 8, 2020 at 11:59 AM Hrishikesh Mishra 
wrote:

> We submit spark job through spark-submit command, Like below one.
>
>
> sudo /var/lib/pf-spark/bin/spark-submit \
> --total-executor-cores 30 \
> --driver-cores 2 \
> --class com.hrishikesh.mishra.Main\
> --master spark://XX.XX.XXX.19:6066  \
> --deploy-mode cluster  \
> --supervise
> http://XX.XX.XXX.19:90/jar/fk-runner-framework-1.0-SNAPSHOT.jar
>
>
>
>
> We have python http server, where we hosted all jars.
>
> The user kill the driver driver-20200508153502-1291 and its visible in log
> also, but this is not problem. OOM is separate from this.
>
> 20/05/08 15:36:55 INFO Worker: Asked to kill driver
> driver-20200508153502-1291
>
> 20/05/08 15:36:55 INFO DriverRunner: Killing driver process!
>
> 20/05/08 15:36:55 INFO CommandUtils: Redirection to
> /grid/1/spark/work/driver-20200508153502-1291/stderr closed: Stream closed
>
> 20/05/08 15:36:55 INFO CommandUtils: Redirection to
> /grid/1/spark/work/driver-20200508153502-1291/stdout closed: Stream closed
>
> 20/05/08 15:36:55 INFO ExternalShuffleBlockResolver: Application
> app-20200508153654-11776 removed, cleanupLocalDirs = true
>
> 20/05/08 *15:36:55* INFO Worker: Driver* driver-20200508153502-1291 was
> killed by user*
>
> *20/05/08 15:43:06 WARN AbstractChannelHandlerContext: An exception
> 'java.lang.OutOfMemoryError: Java heap space' [enable DEBUG level for full
> stacktrace] was thrown by a user handler's exceptionCaught() method while
> handling the following exception:*
>
> *java.lang.OutOfMemoryError: Java heap space*
>
> *20/05/08 15:43:23 ERROR SparkUncaughtExceptionHandler: Uncaught exception
> in thread Thread[dispatcher-event-loop-6,5,main]*
>
> *java.lang.OutOfMemoryError: Java heap space*
>
> *20/05/08 15:43:17 WARN AbstractChannelHandlerContext: An exception
> 'java.lang.OutOfMemoryError: Java heap space' [enable DEBUG level for full
> stacktrace] was thrown by a user handler's exceptionCaught() method while
> handling the following exception:*
>
> *java.lang.OutOfMemoryError: Java heap space*
>
> 20/05/08 15:43:33 INFO ExecutorRunner: Killing process!
>
> 20/05/08 15:43:33 INFO ExecutorRunner: Killing process!
>
> 20/05/08 15:43:33 INFO ExecutorRunner: Killing process!
>
> 20/05/08 15:43:33 INFO ShutdownHookManager: Shutdown hook called
>
> 20/05/08 15:43:33 INFO ShutdownHookManager: Deleting directory
> /grid/1/spark/local/spark-e045e069-e126-4cff-9512-d36ad30ee922
>
>
> On Fri, May 8, 2020 at 9:27 PM Jacek Laskowski  wrote:
>
>> Hi,
>>
>> It's been a while since I worked with Spark Standalone, but I'd check the
>> logs of the workers. How do you spark-submit the app?
>>
>> DId you check /grid/1/spark/work/driver-20200508153502-1291 directory?
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://about.me/JacekLaskowski
>> "The Internals Of" Online Books 
>> Follow me on https://twitter.com/jaceklaskowski
>>
>> 
>>
>>
>> On Fri, May 8, 2020 at 2:32 PM Hrishikesh Mishra 
>> wrote:
>>
>>> Thanks Jacek for quick response.
>>> Due to our system constraints, we can't move to Structured Streaming
>>> now. But definitely YARN can be tried out.
>>>
>>> But my problem is I'm able to figure out where is the issue, Driver,
>>> Executor, or Worker. Even exceptions are clueless.  Please see the below
>>> exception, I'm unable to spot the issue for OOM.
>>>
>>> 20/05/08 15:36:55 INFO Worker: Asked to kill driver
>>> driver-20200508153502-1291
>>>
>>> 20/05/08 15:36:55 INFO DriverRunner: Killing driver process!
>>>
>>> 20/05/08 15:36:55 INFO 

Re: Copyright Infringment

2020-04-25 Thread Russell Spitzer
I do not see any conflict, and I'm not sure what the exact worry of
infringement is based on. The Apache license does not restrict anyone from
writing a book about a project.

On Sat, Apr 25, 2020, 10:48 AM Som Lima  wrote:

> The text is very clear on the issue of copyright infringement. Ask
> permission or you are committing an unlawful act.
>
> The words "significant portion" has not been quantified.
>
> So I have nothing to ask of the authors except may be to quantify.
> Quantification is a secondary issue.
>
> My reading of the text is that it applies to any spark user and not just
> me personally.
>
> The authors need to make clear to all spark users whether copyright
> infringement was their intent or not.
>
> The authors need to make clear to all spark users why should any
> development team share their Use Case in order  avoid  falling on the
> wrong side
> of copyright infringement claims.
>
> I understand  you are also  a named author of a book on Apache usage.
>
> Perhaps you can share with us from your expertise  the need or your
> motivation  for the addendum to the Apache Spark online usage documents.
>
> Let me rephrase my question.
>
> Does any Spark User feel as I do this text is a violation of Apache
> foundation's  free licence agreement  ?
>
>
>
> On Sat, 25 Apr 2020, 16:18 Sean Owen,  wrote:
>
>> You'll want to ask the authors directly ; the book is not produced by the
>> project itself, so can't answer here.
>>
>> On Sat, Apr 25, 2020, 8:42 AM Som Lima  wrote:
>>
>>> At the risk of being removed from the emailing I would like a
>>> clarification because I do not want  to commit an unlawful act.
>>> Can you please clarify if I would be infringing copyright due to this
>>> text.
>>> *Book:  High Performance Spark *
>>> *authors: holden Karau Rachel Warren.*
>>> *page xii:*
>>>
>>> * This book is here to help you get your job done ... If for example
>>> code is offered with this book, you may use it in your programs and
>>> documentation. You do not need to contact us for permission unless your are
>>> reproducing significant portion of the code. *
>>>
>>


Re: Spark driver thread

2020-03-06 Thread Russell Spitzer
So one thing to know here is that all Java applications are going to use
many threads; just because your particular main method doesn't spawn
additional threads doesn't mean the libraries you access or use won't spawn
them. The other important note is that Spark doesn't actually equate cores
with threads: when you request a core or something like that, Spark doesn't
do anything special to actually make sure only a single physical core is in
use.

That said, would allocating more vCPUs to the driver make a difference?
Probably not. This is very dependent on your own code and whether a lot of
work is being done on the driver vs. on the executors. For example, are you
loading up and processing some data which is used to spawn remote work? If
so, having more CPUs locally may help. So look into your app: if almost all
the work is inside DataFrames or RDDs, then more resources for the driver
won't help.


TLDR; For most use cases 1 core is sufficient regardless of client/cluster
mode

On Fri, Mar 6, 2020 at 11:36 AM James Yu  wrote:

> Pol, thanks for your reply.
>
> Actually I am running Spark apps in CLUSTER mode. Is what you said still
> applicable in cluster mode.  Thanks in advance for your further
> clarification.
>
> --
> *From:* Pol Santamaria 
> *Sent:* Friday, March 6, 2020 12:59 AM
> *To:* James Yu 
> *Cc:* user@spark.apache.org 
> *Subject:* Re: Spark driver thread
>
> Hi james,
>
> You can configure the Spark Driver to use more than a single thread. It is
> something that depends on the application, but the Spark driver can take
> advantage of multiple threads in many situations. For instance, when the
> driver program gathers or sends data to the workers.
>
> So yes, if you do computation or I/O on the driver side, you should
> explore using multithreads and more than 1 vCPU.
>
> Bests,
> Pol Santamaria
>
> On Fri, Mar 6, 2020 at 1:28 AM James Yu  wrote:
>
> Hi,
>
> Does a Spark driver always works as single threaded?
>
> If yes, does it mean asking for more than one vCPU for the driver is
> wasteful?
>
>
> Thanks,
> James
>
>


Re: How to implement "getPreferredLocations" in Data source v2?

2020-01-18 Thread Russell Spitzer
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/InputPartition.java

See InputPartition, which has a preferredLocations method you can override.
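A minimal sketch, assuming a hypothetical partition descriptor for a source
that knows which host holds its data (the interface is Java, so this is the
Scala equivalent of overriding its default method):

import org.apache.spark.sql.connector.read.InputPartition

case class MyInputPartition(start: Long, end: Long, host: String) extends InputPartition {
  // The scheduler will try to run the task for this partition on this host.
  override def preferredLocations(): Array[String] = Array(host)
}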

On Sat, Jan 18, 2020, 1:44 PM kineret M  wrote:

> Hi,
> I would like to support data locality in Spark data source v2. How can I
> provide Spark the ability to read and process data on the same node?
>
> I didn't find any interface that supports 'getPreferredLocations' (or
> equivalent).
>
> Thanks!
>


Re: Issue With mod function in Spark SQL

2019-12-17 Thread Russell Spitzer
Is there a chance your data is all even or all odd?

On Tue, Dec 17, 2019 at 11:01 AM Tzahi File  wrote:

> I have in my spark sql query a calculated field that gets the value if
> field1 % 3.
>
> I'm using this field as a partition so I expected to get 3 partitions in
> the mentioned case, and I do get. The issue happened with even numbers
> (instead of 3 - 4,2 ... ).
> When I tried to use even numbers, for example 4 I got only 2 partitions -
> 1 and 3.
> Field 1 datatype is bigint.
>
> Do you have any suggestions?
>
>
> --
> thanks,
> Tzahi
>


Re: error , saving dataframe , LEGACY_PASS_PARTITION_BY_AS_OPTIONS

2019-11-13 Thread Russell Spitzer
My guess would be this is a Spark Version mismatch. The option is added

https://github.com/apache/spark/commit/df9a50637e2622a15e9af7d837986a0e868878b1

https://issues.apache.org/jira/browse/SPARK-27453

In 2.4.2

I would make sure your Spark installs are all 2.4.4 and that your code is
compiled against 2.4.4 spark libs. For example if one node had 2.4.1 or
2.4.0 you would end up with an error.

On Wed, Nov 13, 2019 at 9:05 AM Femi Anthony  wrote:

> Can you post the line of code that’s resulting in that error along with
> the stack trace ?
>
> Sent from my iPhone
>
> On Nov 13, 2019, at 9:53 AM, asma zgolli  wrote:
>
> 
>
> Hello ,
>
>
> I'm using spark 2.4.4 and i keep receiving this error message. Can you
> please help me identify the problem?
>
>
> thank you ,
>
> yours sincerely
> Asma ZGOLLI
>
> PhD student in data engineering - computer science
>
> PJ:
>
>
>
> "main" java.lang.NoSuchMethodError:
> org.apache.spark.sql.internal.SQLConf$.LEGACY_PASS_PARTITION_BY_AS_OPTIONS()Lorg/apache/spark/internal/config/ConfigEntry;
>
> at org.apache.spark.sql.DataFrameWriter.saveToV1Source(
> DataFrameWriter.scala:277)
>
>
>
>
>
>


Re: Spark SQL

2019-06-10 Thread Russell Spitzer
Spark can use the HiveMetastore as a catalog, but it doesn't use the hive
parser or optimization engine. Instead it uses Catalyst, see
https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html

On Mon, Jun 10, 2019 at 2:07 PM naresh Goud 
wrote:

> Hi Team,
>
> Is Spark Sql uses hive engine to run queries ?
> My understanding that spark sql uses hive meta store to get metadata
> information to run queries.
>
> Thank you,
> Naresh
> --
> Thanks,
> Naresh
> www.linkedin.com/in/naresh-dulam
> http://hadoopandspark.blogspot.com/
>
>


Re: Are Spark Dataframes mutable in Structured Streaming?

2019-05-16 Thread Russell Spitzer
You are looking at the diagram without looking at the underlying request.
The behavior of state collection depends on the request and the output
mode of the query.

In the example you cite

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))

// Generate running word count
val wordCounts = words.groupBy("value").count()

// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()


This code contains two important features which cause the whole state to be
held. First, we have an aggregation: the groupBy operation is unbounded,
which means an unlimited amount of state will be required as the application
proceeds. In addition, the output mode is "complete", which means that on
every batch iteration the entire set of output results must be returned.

The diagram is just showing which records are processed by the request at
each point in time, along with the previous data that was required to build
the state.

On Thu, May 16, 2019 at 4:38 AM Sheel Pancholi  wrote:

> Hello,
>
> Along with what I sent before, I want to add that I went over the
> documentation at
> https://github.com/apache/spark/blob/master/docs/structured-streaming-programming-guide.md
>
>
> Here is an excerpt:
>
> [image: Model]
>> <https://github.com/apache/spark/blob/master/docs/img/structured-streaming-example-model.png>
>>
>> Note that Structured Streaming does not materialize the entire table. It
>> reads the latest available data from the streaming data source, processes
>> it incrementally to update the result, and then discards the source data.
>> It only keeps around the minimal intermediate *state* data as required
>> to update the result (e.g. intermediate counts in the earlier example).
>>
> My question is: on one hand, the diagram shows the input table to truly be
> unbounded by constantly letting the data arrive into this "table". But, on
> the other hand, it also says, that it discards the source data. Then, what
> is the meaning of the unbounded table in the diagram above showing
> incremental data arriving and sitting in this unbounded input table!
> Moreover, it also says that it keeps the intermediate data only (i.e. the
> intermediate counts). This is kind of sounding contradictory in my head.
>
> Could you please clarify what is it ultimately supposed to be?
>
> Regards
> Sheel
>
> On Thu, May 16, 2019 at 2:44 PM Sheel Pancholi 
> wrote:
>
>> Hello Russell,
>>
>> Thanks for clarifying. I went over the Catalyst Optimizer Deep Dive video
>> at https://www.youtube.com/watch?v=RmUn5vHlevc and that along with your
>> explanation made me realize that the the DataFrame is the new DStream in
>> Structured Streaming. If my understanding is correct, request you to
>> clarify the 2 points below:
>>
>> 1. Incremental Query - Say at time instant T1 you have 10 items to
>> process, and then at time instant T2 you have 5 newer items streaming in to
>> be processed. *Structured Streaming* says that the DF is treated as an
>> unbounded table and hence 15 items will be processed together. Does this
>> mean that on Iteration 1 (i.e. time instant T1) the Catalyst Optimizer in
>> the code generation phase creates an RDD of 10 elements and on Iteration 2
>> ( i.e. time instant T2 ), the Catalyst Optimizer creates an RDD of 15
>> elements?
>> 2. You mentioned *"Some parts of the plan refer to static pieces of
>> data  ..."*  Could you elaborate a bit more on what does this static
>> piece of data refer to? Are you referring to the 10 records that had
>> already arrived at T1 and are now sitting as old static data in the
>> unbounded table?
>>
>> Regards
>> Sheel
>>
>>
>> On Thu, May 16, 2019 at 3:30 AM Russell Spitzer <
>> russell.spit...@gmail.com> wrote:
>>
>>> Dataframes describe the calculation to be done, but the underlying
>>> implementation is an "Incremental Query". That is that the dataframe code
>>> is executed repeatedly with Catalyst adjusting the final execution plan on
>>> each run. Some parts of the plan refer to static pieces of data, others
>>> refer to data which is pulled in on each iteration. None of this changes
>>> the DataFrame objects themselves.
>>>
>>>
>>>
>>>
>>> On Wed, May 15, 2019 at 1:34 PM Sheel Pancholi 
>>> wrote:
>

Re: Are Spark Dataframes mutable in Structured Streaming?

2019-05-15 Thread Russell Spitzer
Dataframes describe the calculation to be done, but the underlying
implementation is an "Incremental Query". That is that the dataframe code
is executed repeatedly with Catalyst adjusting the final execution plan on
each run. Some parts of the plan refer to static pieces of data, others
refer to data which is pulled in on each iteration. None of this changes
the DataFrame objects themselves.




On Wed, May 15, 2019 at 1:34 PM Sheel Pancholi  wrote:

> Hi
> Structured Streaming treats a stream as an unbounded table in the form of
> a DataFrame. Continuously flowing data from the stream keeps getting added
> to this DataFrame (which is the unbounded table) which warrants a change to
> the DataFrame which violates the vary basic nature of a DataFrame since a
> DataFrame by its nature is immutable. This sounds contradictory. Is there
> an explanation for this?
>
> Regards
> Sheel
>


Re: spark-cassandra-connector_2.1 caused java.lang.NoClassDefFoundError under Spark 2.4.2?

2019-05-09 Thread Russell Spitzer
2.4.3 Binary is out now and they did change back to 2.11.
https://www.apache.org/dyn/closer.lua/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz

On Mon, May 6, 2019 at 9:21 PM Russell Spitzer 
wrote:

> Spark 2.4.2 was incorrectly released with the default package binaries set
> to Scala 2.12
> <https://lists.apache.org/thread.html/af556c307b1ff9672f400964c8d13b081c4edb9821cf30c22aaac8a1@%3Cdev.spark.apache.org%3E>instead
> of scala 2.11.12 which was supposed to be the case. See the 2.4.3 vote
> <https://lists.apache.org/thread.html/609a820ea4dc56a31b9766142834b6954bb9c567ea85adca9ea099c8@%3Cdev.spark.apache.org%3E>that
> is happening at this very moment. While Spark can be built against 2.12 the
> correct default for binaries was supposed to be 2.11. So either build Spark
> 2.4.2 with 2.11 or wait for the 2.4.3 release which will be very soon.
>
> So the reason that your 2.4.0 build works is that the default binaries
> were built against 2.11, so even though you build specified 2.12 (at least
> as far as I can tell) your runtime was the prebuilt 2.4.0 version. So no
> linkage errors at runtime. 2.4.1 similarly had default binaries with 2.11
> and only 2.4.2 switched the minor version of scala. The 2.4.3 release will
> switch this back.
>
> On Mon, May 6, 2019, 9:06 PM Richard Xin  wrote:
>
>> Thanks for the reply.
>> Unfortunately this is the highest version available for Cassandra
>> connector.
>>
>> One thing I don’t quite understand is that it worked perfectly under
>> Spark 2.4.0. I thought support for Scala 2.11 only became deprecated
>> starting spark 2.4.1, will be removed after spark 3.0
>>
>>
>> Sent from Yahoo Mail for iPhone
>> <https://overview.mail.yahoo.com/?.src=iOS>
>>
>> On Monday, May 6, 2019, 18:34, Russell Spitzer 
>> wrote:
>>
>> Scala version mismatched
>>
>> Spark is shown at 2.12, the connector only has a 2.11 release
>>
>>
>>
>> On Mon, May 6, 2019, 7:59 PM Richard Xin 
>> wrote:
>>
>> <dependency>
>>   <groupId>org.apache.spark</groupId>
>>   <artifactId>spark-core_2.12</artifactId>
>>   <version>2.4.0</version>
>>   <scope>compile</scope>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.spark</groupId>
>>   <artifactId>spark-sql_2.12</artifactId>
>>   <version>2.4.0</version>
>> </dependency>
>> <dependency>
>>   <groupId>com.datastax.spark</groupId>
>>   <artifactId>spark-cassandra-connector_2.11</artifactId>
>>   <version>2.4.1</version>
>> </dependency>
>>
>>
>>
>> I run spark-submit I got following exceptions on Spark 2.4.2, it works
>> fine when running  spark-submit under Spark 2.4.0 with exact the same
>> command-line call, any idea how do i fix this? Thanks a lot!
>>
>> Exception in thread "main" java.lang.NoClassDefFoundError:
>> scala/Product$class
>> at
>> com.datastax.spark.connector.util.ConfigParameter.(ConfigParameter.scala:7)
>> at com.datastax.spark.connector.rdd.ReadConf$.(ReadConf.scala:33)
>> at com.datastax.spark.connector.rdd.ReadConf$.(ReadConf.scala)
>> at
>> org.apache.spark.sql.cassandra.DefaultSource$.(DefaultSource.scala:134)
>> at
>> org.apache.spark.sql.cassandra.DefaultSource$.(DefaultSource.scala)
>> at
>> org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:55)
>> at
>> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
>> at
>> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
>> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
>> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
>> at
>> com.apple.jmet.pallas.data_migration.DirectMigrationWConfig.main(DirectMigrationWConfig.java:76)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
>> at org.apache.spark.deploy.SparkSubmit.org
>> $apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
>> at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
>> at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
>> at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
>> at
>> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>>


Re: spark-cassandra-connector_2.1 caused java.lang.NoClassDefFoundError under Spark 2.4.2?

2019-05-06 Thread Russell Spitzer
Actually, I just checked the release; they only changed the PySpark part. So
the download on the website will still be 2.12, and you'll need to build the
Scala 2.11 version of Spark if you want to use the connector, or submit a
PR for Scala 2.12 support.

On Mon, May 6, 2019 at 9:21 PM Russell Spitzer 
wrote:

> Spark 2.4.2 was incorrectly released with the default package binaries set
> to Scala 2.12
> <https://lists.apache.org/thread.html/af556c307b1ff9672f400964c8d13b081c4edb9821cf30c22aaac8a1@%3Cdev.spark.apache.org%3E>instead
> of scala 2.11.12 which was supposed to be the case. See the 2.4.3 vote
> <https://lists.apache.org/thread.html/609a820ea4dc56a31b9766142834b6954bb9c567ea85adca9ea099c8@%3Cdev.spark.apache.org%3E>that
> is happening at this very moment. While Spark can be built against 2.12 the
> correct default for binaries was supposed to be 2.11. So either build Spark
> 2.4.2 with 2.11 or wait for the 2.4.3 release which will be very soon.
>
> So the reason that your 2.4.0 build works is that the default binaries
> were built against 2.11, so even though your build specified 2.12 (at least
> as far as I can tell) your runtime was the prebuilt 2.4.0 version. So no
> linkage errors at runtime. 2.4.1 similarly had default binaries with 2.11
> and only 2.4.2 switched the minor version of scala. The 2.4.3 release will
> switch this back.
>
> On Mon, May 6, 2019, 9:06 PM Richard Xin  wrote:
>
>> Thanks for the reply.
>> Unfortunately this is the highest version available for Cassandra
>> connector.
>>
>> One thing I don’t quite understand is that it worked perfectly under
>> Spark 2.4.0. I thought support for Scala 2.11 only became deprecated
>> starting spark 2.4.1, will be removed after spark 3.0
>>
>>
>> Sent from Yahoo Mail for iPhone
>> <https://overview.mail.yahoo.com/?.src=iOS>
>>
>> On Monday, May 6, 2019, 18:34, Russell Spitzer 
>> wrote:
>>
>> Scala version mismatched
>>
>> Spark is shown at 2.12, the connector only has a 2.11 release
>>
>>
>>
>> On Mon, May 6, 2019, 7:59 PM Richard Xin 
>> wrote:
>>
>> <dependency>
>>   <groupId>org.apache.spark</groupId>
>>   <artifactId>spark-core_2.12</artifactId>
>>   <version>2.4.0</version>
>>   <scope>compile</scope>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.spark</groupId>
>>   <artifactId>spark-sql_2.12</artifactId>
>>   <version>2.4.0</version>
>> </dependency>
>> <dependency>
>>   <groupId>com.datastax.spark</groupId>
>>   <artifactId>spark-cassandra-connector_2.11</artifactId>
>>   <version>2.4.1</version>
>> </dependency>
>>
>>
>>
>> I run spark-submit I got following exceptions on Spark 2.4.2, it works
>> fine when running  spark-submit under Spark 2.4.0 with exact the same
>> command-line call, any idea how do i fix this? Thanks a lot!
>>
>> Exception in thread "main" java.lang.NoClassDefFoundError:
>> scala/Product$class
>> at
>> com.datastax.spark.connector.util.ConfigParameter.(ConfigParameter.scala:7)
>> at com.datastax.spark.connector.rdd.ReadConf$.(ReadConf.scala:33)
>> at com.datastax.spark.connector.rdd.ReadConf$.(ReadConf.scala)
>> at
>> org.apache.spark.sql.cassandra.DefaultSource$.(DefaultSource.scala:134)
>> at
>> org.apache.spark.sql.cassandra.DefaultSource$.(DefaultSource.scala)
>> at
>> org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:55)
>> at
>> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
>> at
>> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
>> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
>> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
>> at
>> com.apple.jmet.pallas.data_migration.DirectMigrationWConfig.main(DirectMigrationWConfig.java:76)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
>> at org.apache.spark.deploy.SparkSubmit.org
>> $apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
>> at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
>> at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
>> at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
>> at
>> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>>


Re: spark-cassandra-connector_2.1 caused java.lang.NoClassDefFoundError under Spark 2.4.2?

2019-05-06 Thread Russell Spitzer
Spark 2.4.2 was incorrectly released with the default package binaries set
to Scala 2.12
<https://lists.apache.org/thread.html/af556c307b1ff9672f400964c8d13b081c4edb9821cf30c22aaac8a1@%3Cdev.spark.apache.org%3E>instead
of scala 2.11.12 which was supposed to be the case. See the 2.4.3 vote
<https://lists.apache.org/thread.html/609a820ea4dc56a31b9766142834b6954bb9c567ea85adca9ea099c8@%3Cdev.spark.apache.org%3E>that
is happening at this very moment. While Spark can be built against 2.12 the
correct default for binaries was supposed to be 2.11. So either build Spark
2.4.2 with 2.11 or wait for the 2.4.3 release which will be very soon.

So the reason that your 2.4.0 build works is that the default binaries were
built against 2.11, so even though your build specified 2.12 (at least as
far as I can tell) your runtime was the prebuilt 2.4.0 version. So no
linkage errors at runtime. 2.4.1 similarly had default binaries with 2.11
and only 2.4.2 switched the minor version of scala. The 2.4.3 release will
switch this back.
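
Once you're on a 2.11 Spark runtime (2.4.3, or a self-built 2.4.2), the
dependency block just needs matching _2.11 suffixes throughout; a sketch with
the same coordinates discussed in this thread:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.4.0</version>
  <scope>compile</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.4.0</version>
</dependency>
<dependency>
  <groupId>com.datastax.spark</groupId>
  <artifactId>spark-cassandra-connector_2.11</artifactId>
  <version>2.4.1</version>
</dependency>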

On Mon, May 6, 2019, 9:06 PM Richard Xin  wrote:

> Thanks for the reply.
> Unfortunately this is the highest version available for Cassandra
> connector.
>
> One thing I don’t quite understand is that it worked perfectly under Spark
> 2.4.0. I thought support for Scala 2.11 only became deprecated starting
> spark 2.4.1, will be removed after spark 3.0
>
>
> Sent from Yahoo Mail for iPhone
> <https://overview.mail.yahoo.com/?.src=iOS>
>
> On Monday, May 6, 2019, 18:34, Russell Spitzer 
> wrote:
>
> Scala version mismatched
>
> Spark is shown at 2.12, the connector only has a 2.11 release
>
>
>
> On Mon, May 6, 2019, 7:59 PM Richard Xin 
> wrote:
>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-core_2.12</artifactId>
>   <version>2.4.0</version>
>   <scope>compile</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-sql_2.12</artifactId>
>   <version>2.4.0</version>
> </dependency>
> <dependency>
>   <groupId>com.datastax.spark</groupId>
>   <artifactId>spark-cassandra-connector_2.11</artifactId>
>   <version>2.4.1</version>
> </dependency>
>
>
>
> I run spark-submit I got following exceptions on Spark 2.4.2, it works
> fine when running  spark-submit under Spark 2.4.0 with exact the same
> command-line call, any idea how do i fix this? Thanks a lot!
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> scala/Product$class
> at
> com.datastax.spark.connector.util.ConfigParameter.(ConfigParameter.scala:7)
> at com.datastax.spark.connector.rdd.ReadConf$.(ReadConf.scala:33)
> at com.datastax.spark.connector.rdd.ReadConf$.(ReadConf.scala)
> at
> org.apache.spark.sql.cassandra.DefaultSource$.(DefaultSource.scala:134)
> at
> org.apache.spark.sql.cassandra.DefaultSource$.(DefaultSource.scala)
> at
> org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:55)
> at
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
> at
> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
> at
> com.apple.jmet.pallas.data_migration.DirectMigrationWConfig.main(DirectMigrationWConfig.java:76)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
> at org.apache.spark.deploy.SparkSubmit.org
> $apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
> at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
> at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
> at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
> at
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>


Re: spark-cassandra-connector_2.1 caused java.lang.NoClassDefFoundError under Spark 2.4.2?

2019-05-06 Thread Russell Spitzer
Scala version mismatched

Spark is shown at 2.12, the connector only has a 2.11 release



On Mon, May 6, 2019, 7:59 PM Richard Xin 
wrote:

> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-core_2.12</artifactId>
>   <version>2.4.0</version>
>   <scope>compile</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-sql_2.12</artifactId>
>   <version>2.4.0</version>
> </dependency>
> <dependency>
>   <groupId>com.datastax.spark</groupId>
>   <artifactId>spark-cassandra-connector_2.11</artifactId>
>   <version>2.4.1</version>
> </dependency>
>
>
>
> I run spark-submit I got following exceptions on Spark 2.4.2, it works
> fine when running  spark-submit under Spark 2.4.0 with exact the same
> command-line call, any idea how do i fix this? Thanks a lot!
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> scala/Product$class
> at
> com.datastax.spark.connector.util.ConfigParameter.(ConfigParameter.scala:7)
> at com.datastax.spark.connector.rdd.ReadConf$.(ReadConf.scala:33)
> at com.datastax.spark.connector.rdd.ReadConf$.(ReadConf.scala)
> at
> org.apache.spark.sql.cassandra.DefaultSource$.(DefaultSource.scala:134)
> at
> org.apache.spark.sql.cassandra.DefaultSource$.(DefaultSource.scala)
> at
> org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:55)
> at
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
> at
> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
> at
> com.apple.jmet.pallas.data_migration.DirectMigrationWConfig.main(DirectMigrationWConfig.java:76)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
> at org.apache.spark.deploy.SparkSubmit.org
> $apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
> at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
> at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
> at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
> at
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>


Re: 3 equalTo "3.15" = true

2019-02-06 Thread Russell Spitzer
Run an "explain" instead of show, i'm betting it's casting tier_id to a
small_int to do the comparison
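
As a rough illustration of what to look for (this reuses the toy DataFrame
from the question quoted below; the exact plan text varies by Spark version):

// in spark-shell, where spark.implicits._ and sql functions are pre-imported
import org.apache.spark.sql.functions.col

val df = Seq("3.15").toDF("tier_id")

// explain(true) prints the analyzed/optimized plans, which show every
// implicit cast Catalyst inserted into the comparison.
df.select(col("tier_id").cast("smallint") === col("tier_id")).explain(true)

// If the string side is also being cast to smallint, "3.15" collapses to 3
// and the comparison comes back true, matching the surprising result below.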

On Wed, Feb 6, 2019 at 9:31 AM Artur Sukhenko 
wrote:

> Hello guys,
> I am migrating from Spark 1.6 to 2.2 and have this issue:
> I am casting string to short and comparing them with equal .
> Original code is:
> ... when(col(fieldName).equalTo(castedValueCol), castedValueCol).
>
>   otherwise(defaultErrorValueCol)
>
> Reproduce (version 2.3.0.cloudera4):
> scala> val df = Seq("3.15").toDF("tier_id")
> df: org.apache.spark.sql.DataFrame = [tier_id: string]
>
> scala> val colShort = col("tier_id").cast(ShortType)
> colShort: org.apache.spark.sql.Column = CAST(tier_id AS SMALLINT)
>
> scala> val colString = col("tier_id")
> colString: org.apache.spark.sql.Column = tier_id
>
> scala> res4.select(colString, colShort, colShort.equalTo(colString)).show
> +-------+-------+-------------------------------------+
> |tier_id|tier_id|(CAST(tier_id AS SMALLINT) = tier_id)|
> +-------+-------+-------------------------------------+
> |   3.15|      3|                                 true|
> +-------+-------+-------------------------------------+
> scala>
>
> Why is this?
> --
> --
> Artur Sukhenko
>


Re: Structured Streaming : Custom Source and Sink Development and PySpark.

2018-08-30 Thread Russell Spitzer
Yes, Scala or Java.

No. Once you have written the implementation it is valid in all df apis.

As for examples there are many, check the Kafka source in tree or one of
the many sources listed on the spark packages website.
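
As a very rough sketch of the shape such a sink takes in Scala (hand-written
against the pre-DataSourceV2 provider interfaces, not taken from the Kafka
source; the class names are made up), the provider class is what you would
later pass to .format(...) from PySpark, with no extra Py4J work:

package com.example

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.streaming.OutputMode

// Each micro-batch arrives here as an ordinary DataFrame.
class LoggingSink extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // A real sink would write the batch out; this one just logs its size.
    println(s"batch $batchId contained ${data.count()} rows")
  }
}

// writeStream.format("com.example.LoggingSinkProvider") resolves to this
// class, whether the query is built from Scala, Java or PySpark.
class LoggingSinkProvider extends StreamSinkProvider {
  override def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink = new LoggingSink
}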

On Thu, Aug 30, 2018, 8:23 PM Ramaswamy, Muthuraman <
muthuraman.ramasw...@viasat.com> wrote:

> I would like to develop Custom Source and Sink. So, I have a couple of
> questions:
>
>
>
>1. Do I have to use Scala or Java to develop these Custom Source/Sink?
>
>
>
>    2. Also, once the source/sink has been developed, to use in
>PySpark/Python, do I have to develop any Py4J modules? Any pointers or good
>documentation or GitHub Source as a reference will be of great help.
>
>
>
> Please advise.
>
>
>
> Thank you,
>
>
>
>
>
>
>


Re: [Spark2.1] SparkStreaming to Cassandra performance problem

2018-05-21 Thread Russell Spitzer
The answer is most likely that when you mix Java and Python code you incur a
penalty for every object that you transform from a Java object into a Python
object (and then back again into a Java object) as data is passed in and out
of your functions. A way around this would probably have been to use the
DataFrame API if possible, which would have kept the interactions in Java and
skipped the Python-Java serialization. Using Scala from the start, though, is
a great idea. I would also probably remove the cache from your stream, since
that is probably only hurting (it adds an additional serialization for data
that is only used once).

On Mon, May 21, 2018 at 5:01 AM Alonso Isidoro Roman 
wrote:

> The main language they developed spark with is scala, so all the new
> features go first to scala, java and finally python. I'm not surprised by
> the results, we've seen it on Stratio since the first versions of spark. At
> the beginning of development, some of our engineers make the prototype with
> python, but when it comes down to it, if it goes into production, it has to
> be rewritten in scala or java, usually scala.
>
>
>
> El lun., 21 may. 2018 a las 4:34, Saulo Sobreiro (<
> saulo.sobre...@outlook.pt>) wrote:
>
>> Hi Javier,
>>
>> Thank you a lot for the feedback.
>> Indeed the CPU is a huge limitation. I got a lot of trouble trying to run
>> this use case in yarn-client mode. I managed to run this in standalone
>> (local master) mode only.
>>
>> I do not have the hardware available to run this setup in a cluster yet,
>> so I decided to dig a little bit more in the implementation to see what
>> could I improve. I just finished evaluating some results.
>> If you find something wrong or odd please let me know.
>>
>> Following your suggestion to use "saveToCassandra" directly I decided to
>> try Scala. Everything was implemented in the most similar way possible and
>> I got surprised by the results. The scala implementation is much faster.
>>
>> My current implementation is slightly different from the Python code
>> shared some emails ago but to compare the languages influence in the most
>> comparable way I used the following snippets:
>>
>> # Scala implementation --
>>
>> val kstream = KafkaUtils.createDirectStream[String, String](
>>  ssc,
>>  LocationStrategies.PreferConsistent,
>>  ConsumerStrategies.Subscribe[String, String](topic,
>> kafkaParams))
>> kstream
>>.map( x => parse(x.value) )
>>.saveToCassandra("hdpkns", "batch_measurement")
>>
>> # Python implementation 
>> # Adapted from the previously shared code. However instead of
>> calculating the metrics, it is just parsing the messages.
>>
>> kafkaStream = KafkaUtils.createDirectStream(ssc, [topic],
> {"metadata.broker.list": brokers})
>

> kafkaStream \
> .transform(parse) \
> .foreachRDD(casssave)
>
>
>> For the same streaming input the scala app took an average of ~1.5
>> seconds to handle each event. For the python implementation, the app took
>> an average of ~80 seconds to handle each event (and after a lot of pickle
>> concurrency access issues).
>>
>> Note that I considered the time as the difference between the event
>> generation (before being published to Kafka) and the moment just before the
>> saveToCassandra.
>>
>> The problem in the python implementation seems to be due to the delay
>> introduced by the foreachRDD(casssave) call, which only runs 
>> rdd.saveToCassandra(
>> "test_hdpkns", "measurement" ).
>>
>>
>> Honestly I was not expecting such a difference between these 2 codes...
>> Can you understand why is this happening ?
>>
>>
>>
>> Again, Thank you very much for your help,
>>
>> Best Regards
>>
>>
>> Sharing my current Scala code below
>> # Scala Snippet =
>> val sparkConf = new SparkConf(). // ...
>> val ssc = new StreamingContext(sparkConf, Seconds(1))
>> val sc = ssc.sparkContext
>> //...
>> val kstream = KafkaUtils.createDirectStream[String, String](
>>  ssc,
>>  LocationStrategies.PreferConsistent,
>>  ConsumerStrategies.Subscribe[String, String](topic,
>> kafkaParams))
>> //...
>> // handle Kafka messages in a parallel fashion
>> val ckstream = kstream.map( x => parse(x.value) ).cache()
>> ckstream
>>   .foreachRDD( rdd => {
>> rdd.foreach(metrics)
>>   } )
>> ckstream
>>   .saveToCassandra("hdpkns", "microbatch_raw_measurement")
>> #=
>>
>> On 30/04/2018 14:57:50, Javier Pareja  wrote:
>> Hi Saulo,
>>
>> If the CPU is close to 100% then you are hitting the limit. I don't think
>> that moving to Scala will make a difference. Both Spark and Cassandra are
>> CPU hungry, your setup is small in terms of CPUs. Try running Spark on
>> another (physical) machine so that the 2 cores are 

Re: Does Spark SQL uses Calcite?

2017-08-12 Thread Russell Spitzer
You don't have to go through Hive. It's just Spark SQL. The application is
just a forked Hive thrift server.
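
To make the JDBC side concrete, a small sketch of a client talking to a
running Spark Thrift Server (host, port, credentials and the table name are
placeholders, and the Hive JDBC driver has to be on the client classpath):

import java.sql.DriverManager

object SparkSqlOverJdbc {
  def main(args: Array[String]): Unit = {
    // The thrift server speaks the HiveServer2 wire protocol,
    // but every query it receives is executed by Spark SQL.
    val conn = DriverManager.getConnection(
      "jdbc:hive2://localhost:10000/default", "spark", "")
    val stmt = conn.createStatement()
    val rs = stmt.executeQuery("SELECT COUNT(*) FROM some_table")
    while (rs.next()) println(rs.getLong(1))
    rs.close(); stmt.close(); conn.close()
  }
}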

On Fri, Aug 11, 2017 at 8:53 PM kant kodali  wrote:

> @Ryan it looks like if I enable thrift server I need to go through hive. I
> was talking more about having JDBC connector for Spark SQL itself other
> words not going through hive.
>
> On Fri, Aug 11, 2017 at 6:50 PM, kant kodali  wrote:
>
>> @Ryan Does it work with Spark SQL 2.1.1?
>>
>> On Fri, Aug 11, 2017 at 12:53 AM, Ryan  wrote:
>>
>>> the thrift server is a jdbc server, Kanth
>>>
>>> On Fri, Aug 11, 2017 at 2:51 PM,  wrote:
>>>
 I also wonder why there isn't a jdbc connector for spark sql?

 Sent from my iPhone

 On Aug 10, 2017, at 2:45 PM, Jules Damji  wrote:

 Yes, it's more used in Hive than Spark

 Sent from my iPhone
 Pardon the dumb thumb typos :)

 On Aug 10, 2017, at 2:24 PM, Sathish Kumaran Vairavelu <
 vsathishkuma...@gmail.com> wrote:

 I think it is for hive dependency.
 On Thu, Aug 10, 2017 at 4:14 PM kant kodali  wrote:

> Since I see a calcite dependency in Spark I wonder where Calcite is
> being used?
>
> On Thu, Aug 10, 2017 at 1:30 PM, Sathish Kumaran Vairavelu <
> vsathishkuma...@gmail.com> wrote:
>
>> Spark SQL doesn't use Calcite
>>
>> On Thu, Aug 10, 2017 at 3:14 PM kant kodali 
>> wrote:
>>
>>> Hi All,
>>>
>>> Does Spark SQL uses Calcite? If so, what for? I thought the Spark
>>> SQL has catalyst which would generate its own logical plans, physical 
>>> plans
>>> and other optimizations.
>>>
>>> Thanks,
>>> Kant
>>>
>>
>
>>>
>>
>


Re: Spark (SQL / Structured Streaming) Cassandra - PreparedStatement

2017-07-21 Thread Russell Spitzer
The SCC includes the Java driver, which means you could just use Java driver
functions directly. It also provides a serializable wrapper which has session
and prepared statement pooling. Something like:

val cc = CassandraConnector(sc.getConf)
SomeFunctionWithAnIterator { it: SomeIterator =>
  cc.withSessionDo { session =>
    val ps = session.prepare("statement")
    it.map(row => session.executeAsync(ps.bind(row)))
  }
} // Do something with the futures here

I wrote a blog post about this here
http://www.russellspitzer.com/2017/02/27/Concurrency-In-Spark/#concurrency-with-the-cassandra-java-driver
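
A slightly more complete sketch of the same pattern (keyspace, table and
column names are placeholders; this is written against the Java driver 3.x
API bundled by the connector at the time):

import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.SparkSession

object PreparedWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("prepared-writes").getOrCreate()
    val connector = CassandraConnector(spark.sparkContext.getConf)

    val predictions = spark.sparkContext.parallelize(Seq(("a", 1.0), ("b", 2.0)))

    predictions.foreachPartition { rows =>
      connector.withSessionDo { session =>
        // Prepared once per partition; the connector also caches prepared statements.
        val ps = session.prepare("INSERT INTO ks.predictions (id, score) VALUES (?, ?)")
        val futures = rows.map { case (id, score) =>
          session.executeAsync(ps.bind(id, Double.box(score)))
        }.toList
        // Wait for the writes before the partition (and session lease) is released.
        futures.foreach(_.getUninterruptibly())
      }
    }
  }
}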

On Tue, Apr 11, 2017 at 4:05 AM Bastien DINE 
wrote:

> Hi everyone,
>
>
>
> I'm using Spark Structured Streaming for Machine Learning purpose in real
> time, and I want to stored predictions in my Cassandra cluster.
>
>
>
> Since I am in a streaming context, executing multiple times per seconds
> the same request, one mandatory optimization is to use PreparedStatement.
>
>
>
> In the cassandra spark driver (
> https://github.com/datastax/spark-cassandra-connector) there is no way to
> use PreparedStatement (in scala or python, i'm not considering java as a
> option)
>
>
>
> Should i use a scala  (https://github.com/outworkers/phantom) / python (
> https://github.com/datastax/python-driver) cassandra driver ?
>
> How does it work then, my connection object need to be serializable to be
> passed to workers ?
>
>
>
> If anyone can help me !
>
>
>
> Thanks :)
>
> Bastien
>


None.get on Redact in DataSourceScanExec

2017-07-13 Thread Russell Spitzer
Sorry if this is a double post, wasn't sure if I got through on my
forwarding.

I mentioned this in the RC2 note for 2.2.0 of Spark and I'm seeing it now
on the official release. Running the Spark Cassandra Connector integration
tests for the SCC now fail whenever trying to do something involving the
CassandraSource being transformed into the DataSourceScanExec SparkPlan.

https://github.com/apache/spark/blob/v2.2.0/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L70

Utils.redact(SparkSession.getActiveSession.get.sparkContext.conf, text)
This leads to an None.get (full exception below)

This only seems to reproduce when I run from within sbt, running through
the IntelliJ scalaTest runner works fine on the same code. This makes me
think that perhaps something about how sbt is loading the

org.apache.spark.scheduler.DAGSchedulerEventProcessLoop

class is somehow preventing it from having access to the getActiveSession

I'm wondering if anyone else has run into this and found a workaround. I
saw a similar report posted to the end of this ticket:

https://issues.apache.org/jira/browse/SPARK-16599?focusedCommentId=16038185=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16038185

I have tried setting the ActiveSession but it doesn't seem relevant to the
thread which ends up calling getActiveSession


Thanks for your time,
Russ

The failure is

java.util.NoSuchElementException: None.get
[info]  at scala.None$.get(Option.scala:347)
[info]  at scala.None$.get(Option.scala:345)
[info]  at org.apache.spark.sql.execution.DataSourceScanExec$class.org
$apache$spark$sql$execution$DataSourceScanExec$$redact(DataSourceScanExec.scala:70)
[info]  at
org.apache.spark.sql.execution.DataSourceScanExec$$anonfun$4.apply(DataSourceScanExec.scala:54)
[info]  at
org.apache.spark.sql.execution.DataSourceScanExec$$anonfun$4.apply(DataSourceScanExec.scala:52)
[info]  at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
[info]  at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
[info]  at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[info]  at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
[info]  at
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
[info]  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
[info]  at
org.apache.spark.sql.execution.DataSourceScanExec$class.simpleString(DataSourceScanExec.scala:52)
[info]  at
org.apache.spark.sql.execution.RowDataSourceScanExec.simpleString(DataSourceScanExec.scala:75)
[info]  at
org.apache.spark.sql.catalyst.plans.QueryPlan.verboseString(QueryPlan.scala:349)
...

I can post the full exception with the attempt to serialize and scalaTest
wrapping if anyone wants to see that but it is quite long.


Fwd: None.get on Redact in DataSourceScanExec

2017-07-13 Thread Russell Spitzer
I mentioned this in the RC2 note for 2.2.0 of Spark and I'm seeing it now
on the official release. Running the Spark Cassandra Connector integration
tests for the SCC now fail whenever trying to do something involving the
CassandraSource being transformed into the DataSourceScanExec SparkPlan.

https://github.com/apache/spark/blob/v2.2.0/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L70

Utils.redact(SparkSession.getActiveSession.get.sparkContext.conf, text)
This leads to an None.get (full exception below)

This only seems to reproduce when I run from within sbt, running through
the IntelliJ scalaTest runner works fine on the same code. This makes me
think that perhaps something about how sbt is loading the

org.apache.spark.scheduler.DAGSchedulerEventProcessLoop

class is somehow preventing it from having access to the getActiveSession

I'm wondering if anyone else has run into this and found a workaround. I
saw a similar report posted to the end of this ticket:

https://issues.apache.org/jira/browse/SPARK-16599?focusedCommentId=16038185=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16038185

I have tried setting the ActiveSession but it doesn't seem relevant to the
thread which ends up calling getActiveSession


Thanks for your time,
Russ

The failure is

java.util.NoSuchElementException: None.get
[info] at scala.None$.get(Option.scala:347)
[info] at scala.None$.get(Option.scala:345)
[info] at org.apache.spark.sql.execution.DataSourceScanExec$class.org
$apache$spark$sql$execution$DataSourceScanExec$$redact(DataSourceScanExec.scala:70)
[info] at
org.apache.spark.sql.execution.DataSourceScanExec$$anonfun$4.apply(DataSourceScanExec.scala:54)
[info] at
org.apache.spark.sql.execution.DataSourceScanExec$$anonfun$4.apply(DataSourceScanExec.scala:52)
[info] at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
[info] at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
[info] at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[info] at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
[info] at
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
[info] at scala.collection.AbstractTraversable.map(Traversable.scala:104)
[info] at
org.apache.spark.sql.execution.DataSourceScanExec$class.simpleString(DataSourceScanExec.scala:52)
[info] at
org.apache.spark.sql.execution.RowDataSourceScanExec.simpleString(DataSourceScanExec.scala:75)
[info] at
org.apache.spark.sql.catalyst.plans.QueryPlan.verboseString(QueryPlan.scala:349)
...

I can post the full exception with the attempt to serialize and scalaTest
wrapping if anyone wants to see that but it is quite long.


Re: How does HashPartitioner distribute data in Spark?

2017-06-25 Thread Russell Spitzer
A clearer explanation:

`parallelize` does not apply a partitioner. We can see this pretty quickly
with a quick code example

scala> val rdd1 = sc.parallelize(Seq(("aa" , 1),("aa",2), ("aa", 3)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at
parallelize at :24

scala> rdd1.partitioner
res0: Option[org.apache.spark.Partitioner] = None

It has no partitioner because parallelize just packs up the collection
into partition metadata without looking at the actual contents of the
collection.

scala> rdd1.foreachPartition(it => println(it.length))
1
0
1
1
0
0
0
0

If we actually shuffle the data using the hash partitioner (using the
repartition command) we get the expected results.

scala> rdd1.repartition(8).foreachPartition(it => println(it.length))
0
0
0
0
0
0
0
3
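
To attach a HashPartitioner explicitly (continuing with the same toy RDD),
partitionBy is the operation that actually hashes the keys; note it is only
defined for key/value RDDs, so for rdd2 (a plain RDD[String]) there is simply
no key to hash:

import org.apache.spark.HashPartitioner

// Explicitly hash-partition by key; the resulting RDD now reports a partitioner.
val byKey = rdd1.partitionBy(new HashPartitioner(8))
byKey.partitioner                                  // Some(HashPartitioner)

// Every record shares the key "aa", so they all hash to the same partition.
byKey.foreachPartition(it => println(it.length))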


On Sat, Jun 24, 2017 at 12:22 PM Russell Spitzer <russell.spit...@gmail.com>
wrote:

> Neither of your code examples invoke a repartitioning. Add in a
> repartition command.
>
> On Sat, Jun 24, 2017, 11:53 AM Vikash Pareek <
> vikash.par...@infoobjects.com> wrote:
>
>> Hi Vadim,
>>
>> Thank you for your response.
>>
>> I would like to know how partitioner choose the key, If we look at my
>> example then following question arises:
>> 1. In case of rdd1, hash partitioning should calculate hashcode of key
>> (i.e. *"aa"* in this case), so *all records should go to single
>> partition*
>> instead of uniform distribution?
>>  2. In case of rdd2, there is no key value pair so how hash partitoning
>> going to work i.e. *what is the key* to calculate hashcode?
>>
>>
>>
>> Best Regards,
>>
>>
>> [image: InfoObjects Inc.] <http://www.infoobjects.com/>
>> Vikash Pareek
>> Team Lead  *InfoObjects Inc.*
>> Big Data Analytics
>>
>> m: +91 8800206898 <+91%2088002%2006898> a: E5, Jhalana Institutionall
>> Area, Jaipur, Rajasthan 302004
>> w: www.linkedin.com/in/pvikash e: vikash.par...@infoobjects.com
>>
>>
>>
>>
>> On Fri, Jun 23, 2017 at 10:38 PM, Vadim Semenov <
>> vadim.seme...@datadoghq.com> wrote:
>>
>>> This is the code that chooses the partition for a key:
>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Partitioner.scala#L85-L88
>>>
>>> it's basically `math.abs(key.hashCode % numberOfPartitions)`
>>>
>>> On Fri, Jun 23, 2017 at 3:42 AM, Vikash Pareek <
>>> vikash.par...@infoobjects.com> wrote:
>>>
>>>> I am trying to understand how spark partitoing works.
>>>>
>>>> To understand this I have following piece of code on spark 1.6
>>>>
>>>> def countByPartition1(rdd: RDD[(String, Int)]) = {
>>>> rdd.mapPartitions(iter => Iterator(iter.length))
>>>> }
>>>> def countByPartition2(rdd: RDD[String]) = {
>>>> rdd.mapPartitions(iter => Iterator(iter.length))
>>>> }
>>>>
>>>> //RDDs Creation
>>>> val rdd1 = sc.parallelize(Array(("aa", 1), ("aa", 1), ("aa", 1),
>>>> ("aa",
>>>> 1)), 8)
>>>> countByPartition(rdd1).collect()
>>>> >> Array[Int] = Array(0, 1, 0, 1, 0, 1, 0, 1)
>>>>
>>>> val rdd2 = sc.parallelize(Array("aa", "aa", "aa", "aa"), 8)
>>>> countByPartition(rdd2).collect()
>>>> >> Array[Int] = Array(0, 1, 0, 1, 0, 1, 0, 1)
>>>>
>>>> In both the cases data is distributed uniformaly.
>>>> I do have following questions on the basis of above observation:
>>>>
>>>>  1. In case of rdd1, hash partitioning should calculate hashcode of key
>>>> (i.e. "aa" in this case), so all records should go to single partition
>>>> instead of uniform distribution?
>>>>  2. In case of rdd2, there is no key value pair so how hash partitoning
>>>> going to work i.e. what is the key to calculate hashcode?
>>>>
>>>> I have followed @zero323 answer but not getting answer of these.
>>>>
>>>> https://stackoverflow.com/questions/31424396/how-does-hashpartitioner-work
>>>>
>>>>
>>>>
>>>>
>>>> -
>>>>
>>>> __Vikash Pareek
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-does-HashPartitioner-distribute-data-in-Spark-tp28785.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>
>>>>
>>>
>>


Re: How does HashPartitioner distribute data in Spark?

2017-06-24 Thread Russell Spitzer
Neither of your code examples invoke a repartitioning. Add in a repartition
command.

On Sat, Jun 24, 2017, 11:53 AM Vikash Pareek 
wrote:

> Hi Vadim,
>
> Thank you for your response.
>
> I would like to know how partitioner choose the key, If we look at my
> example then following question arises:
> 1. In case of rdd1, hash partitioning should calculate hashcode of key
> (i.e. *"aa"* in this case), so *all records should go to single partition*
> instead of uniform distribution?
>  2. In case of rdd2, there is no key value pair so how hash partitoning
> going to work i.e. *what is the key* to calculate hashcode?
>
>
>
> Best Regards,
>
>
> [image: InfoObjects Inc.] 
> Vikash Pareek
> Team Lead  *InfoObjects Inc.*
> Big Data Analytics
>
> m: +91 8800206898 a: E5, Jhalana Institutionall Area, Jaipur, Rajasthan
> 302004
> w: www.linkedin.com/in/pvikash e: vikash.par...@infoobjects.com
>
>
>
>
> On Fri, Jun 23, 2017 at 10:38 PM, Vadim Semenov <
> vadim.seme...@datadoghq.com> wrote:
>
>> This is the code that chooses the partition for a key:
>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Partitioner.scala#L85-L88
>>
>> it's basically `math.abs(key.hashCode % numberOfPartitions)`
>>
>> On Fri, Jun 23, 2017 at 3:42 AM, Vikash Pareek <
>> vikash.par...@infoobjects.com> wrote:
>>
>>> I am trying to understand how spark partitoing works.
>>>
>>> To understand this I have following piece of code on spark 1.6
>>>
>>> def countByPartition1(rdd: RDD[(String, Int)]) = {
>>> rdd.mapPartitions(iter => Iterator(iter.length))
>>> }
>>> def countByPartition2(rdd: RDD[String]) = {
>>> rdd.mapPartitions(iter => Iterator(iter.length))
>>> }
>>>
>>> //RDDs Creation
>>> val rdd1 = sc.parallelize(Array(("aa", 1), ("aa", 1), ("aa", 1),
>>> ("aa",
>>> 1)), 8)
>>> countByPartition(rdd1).collect()
>>> >> Array[Int] = Array(0, 1, 0, 1, 0, 1, 0, 1)
>>>
>>> val rdd2 = sc.parallelize(Array("aa", "aa", "aa", "aa"), 8)
>>> countByPartition(rdd2).collect()
>>> >> Array[Int] = Array(0, 1, 0, 1, 0, 1, 0, 1)
>>>
>>> In both the cases data is distributed uniformaly.
>>> I do have following questions on the basis of above observation:
>>>
>>>  1. In case of rdd1, hash partitioning should calculate hashcode of key
>>> (i.e. "aa" in this case), so all records should go to single partition
>>> instead of uniform distribution?
>>>  2. In case of rdd2, there is no key value pair so how hash partitoning
>>> going to work i.e. what is the key to calculate hashcode?
>>>
>>> I have followed @zero323 answer but not getting answer of these.
>>>
>>> https://stackoverflow.com/questions/31424396/how-does-hashpartitioner-work
>>>
>>>
>>>
>>>
>>> -
>>>
>>> __Vikash Pareek
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-does-HashPartitioner-distribute-data-in-Spark-tp28785.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>


Re: I am not sure why I am getting java.lang.NoClassDefFoundError

2017-02-17 Thread Russell Spitzer
Great catch Anastasios!

On Fri, Feb 17, 2017 at 9:59 AM Anastasios Zouzias 
wrote:

> Hey,
>
> Can you try with the 2.11 spark-cassandra-connector? You just reported
> that you use spark-cassandra-connector*_2.10*-2.0.0-RC1.jar
>
> Best,
> Anastasios
>
> On Fri, Feb 17, 2017 at 6:40 PM, kant kodali  wrote:
>
> Hi,
>
>
> val df = spark.read.format("org.apache.spark.sql.cassandra").options(Map(
> "table" -> "hello", "keyspace" -> "test" )).load()
>
> This line works fine. I can see it actually pulled the table schema from
> cassandra. however when I do
>
> df.count I get the error below.
>
>
> I am using the following jars.
>
> spark version 2.0.2
>
> spark-sql_2.11-2.0.2.jar
>
> spark-cassandra-connector_2.10-2.0.0-RC1.jar
>
> Java version 8
>
> scala version 2.11.8
>
>
>
> java.lang.NoClassDefFoundError:
> scala/runtime/AbstractPartialFunction$mcJL$sp
>
> at java.lang.ClassLoader.defineClass1(Native Method)
>
> at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
>
> at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
>
> at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
>
> at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
> at
> com.datastax.spark.connector.rdd.CassandraLimit$.limitForIterator(CassandraLimit.scala:21)
>
> at
> com.datastax.spark.connector.rdd.CassandraTableScanRDD.compute(CassandraTableScanRDD.scala:367)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
>
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.lang.ClassNotFoundException:
> scala.runtime.AbstractPartialFunction$mcJL$sp
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
> ... 35 more
>
> 17/02/17 17:35:33 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID
> 0)
>
> java.lang.NoClassDefFoundError:
> com/datastax/spark/connector/rdd/CassandraLimit$$anonfun$limitForIterator$1
>
> at
> com.datastax.spark.connector.rdd.CassandraLimit$.limitForIterator(CassandraLimit.scala:21)
>
> at
> com.datastax.spark.connector.rdd.CassandraTableScanRDD.compute(CassandraTableScanRDD.scala:367)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
> at 

Re: spark architecture question -- Pleas Read

2017-01-27 Thread Russell Spitzer
You can treat Oracle as a JDBC source (
http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases)
and skip Sqoop, HiveTables and go straight to Queries. Then you can skip
hive on the way back out (see the same link) and write directly to Oracle.
I'll leave the performance questions for someone else.
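
A rough sketch of that shape (connection details, table names, the column
name and the UDF are placeholders; the Oracle JDBC driver jar has to be on
the classpath):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object OracleRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("oracle-direct").getOrCreate()
    import spark.implicits._

    val jdbcUrl = "jdbc:oracle:thin:@//dbhost:1521/SERVICE"   // placeholder

    // Read straight from Oracle instead of going through Sqoop + Hive.
    val src = spark.read
      .format("jdbc")
      .option("url", jdbcUrl)
      .option("dbtable", "SRC_SCHEMA.SRC_TABLE")
      .option("user", "user")
      .option("password", "password")
      .load()

    // Native Scala UDF standing in for one of the existing Hive Java UDFs.
    val normalize = udf((s: String) => if (s == null) null else s.trim.toUpperCase)

    val out = src.withColumn("NAME", normalize($"NAME"))   // hypothetical column

    // Write the result straight back to Oracle, skipping the Hive table and Sqoop export.
    out.write
      .format("jdbc")
      .option("url", jdbcUrl)
      .option("dbtable", "TGT_SCHEMA.TGT_TABLE")
      .option("user", "user")
      .option("password", "password")
      .mode("append")
      .save()
  }
}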

On Fri, Jan 27, 2017 at 11:06 PM Sirisha Cheruvu  wrote:

>
> On Sat, Jan 28, 2017 at 6:44 AM, Sirisha Cheruvu 
> wrote:
>
> Hi Team,
>
> RIght now our existing flow is
>
> Oracle-->Sqoop --> Hive--> Hive Queries on Spark-sql (Hive
> Context)-->Destination Hive table -->sqoop export to Oracle
>
> Half of the Hive UDFS required is developed in Java UDF..
>
> SO Now I want to know if I run the native scala UDF's than runninng hive
> java udfs in spark-sql will there be any performance difference
>
>
> Can we skip the Sqoop Import and export part and
>
> Instead directly load data from oracle to spark and code Scala UDF's for
> transformations and export output data back to oracle?
>
> RIght now the architecture we are using is
>
> oracle-->Sqoop (Import)-->Hive Tables--> Hive Queries --> Spark-SQL-->
> Hive --> Oracle
> what would be optimal architecture to process data from oracle using spark
> ?? can i anyway better this process ?
>
>
>
>
> Regards,
> Sirisha
>
>
>


Re: How to use Spark SQL to connect to Cassandra from Spark-Shell?

2016-11-11 Thread Russell Spitzer
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md

Has all the information about Dataframes/ SparkSql
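
As a quick sketch of doing it from the open-source spark-shell (keyspace,
table and column names are placeholders): load the table as a DataFrame,
register a temp view, and query it with plain Spark SQL:

// spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:<version> \
//             --conf spark.cassandra.connection.host=<cassandra host>
val users = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "ks", "table" -> "users"))
  .load()

users.createOrReplaceTempView("users")
spark.sql("SELECT id, name FROM users WHERE name = 'x'").show()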

On Fri, Nov 11, 2016 at 8:52 AM kant kodali  wrote:

> Wait I cannot create CassandraSQLContext from spark-shell. is this only
> for enterprise versions?
>
> Thanks!
>
> On Fri, Nov 11, 2016 at 8:14 AM, kant kodali  wrote:
>
>
> https://academy.datastax.com/courses/ds320-analytics-apache-spark/spark-sql-spark-sql-basics
>
> On Fri, Nov 11, 2016 at 8:11 AM, kant kodali  wrote:
>
> Hi,
>
> This is spark-cassandra-connector
>  but I am looking
> more for how to use SPARK SQL and expose as a JDBC server for Cassandra.
>
> Thanks!
>
>
> On Fri, Nov 11, 2016 at 8:07 AM, Yong Zhang  wrote:
>
> Read the document on https://github.com/datastax/spark-cassandra-connector
>
>
> Yong
>
>
>
> --
> *From:* kant kodali 
> *Sent:* Friday, November 11, 2016 11:04 AM
> *To:* user @spark
> *Subject:* How to use Spark SQL to connect to Cassandra from Spark-Shell?
>
> How to use Spark SQL to connect to Cassandra from Spark-Shell?
>
> Any examples ? I use Java 8.
>
> Thanks!
> kant
>
>
>
>
>


Re: Write to Cassandra table from pyspark fails with scala reflect error

2016-09-15 Thread Russell Spitzer
If the download fails you have to start figuring out if you have network
issues or if your local cache is messed up :( I would see if you can
manually pull that artifact or try running through just spark-shell first
to see if that gives any more verbose output.

On Thu, Sep 15, 2016 at 6:48 AM Trivedi Amit <amit_...@yahoo.com> wrote:

> Thanks Russell. I didn't build this myself. I tried with Scala 2.11
> com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M(1-3) and I am
> getting
>
> ```
> Exception in thread "main" java.lang.RuntimeException: [download failed:
> org.scala-lang#scala-reflect;2.11.8!scala-reflect.jar]
> at
> org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1066)
> at
> org.apache.spark.deploy.SparkSubmit$.prepareSubmitEnvironment(SparkSubmit.scala:294)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:158)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Traceback (most recent call last):
>   File "$SPARK_HOME/python/pyspark/shell.py", line 38, in 
> SparkContext._ensure_initialized()
>   File "$SPARK_HOME/python/pyspark/context.py", line 243, in
> _ensure_initialized
> SparkContext._gateway = gateway or launch_gateway()
>   File "$SPARK_HOME/python/pyspark/java_gateway.py", line 94, in
> launch_gateway
> raise Exception("Java gateway process exited before sending the driver
> its port number")
> Exception: Java gateway process exited before sending the driver its port
> number
> ```
> I deleted my .m2 directory to avoid any conflicts with cached or older
> versions. I only have SPARK_HOME environment variable set (env variables
> related to Spark and Python).
>
> --
> *From:* Russell Spitzer <russell.spit...@gmail.com>
> *To:* Trivedi Amit <amit_...@yahoo.com>; "user@spark.apache.org" <
> user@spark.apache.org>
> *Sent:* Wednesday, September 14, 2016 11:24 PM
> *Subject:* Re: Write to Cassandra table from pyspark fails with scala
> reflect error
>
> Spark 2.0 defaults to Scala 2.11, so if you didn't build it yourself you
> need the 2.11 artifact for the Spark Cassandra Connector.
>
> On Wed, Sep 14, 2016 at 7:44 PM Trivedi Amit <amit_...@yahoo.com.invalid>
> wrote:
>
> Hi,
>
>
>
> I am testing a pyspark program that will read from a csv file and write
> data into Cassandra table. I am using pyspark with
> spark-cassandra-connector 2.10:2.0.0-M3. I am using Spark v2.0.0.
>
> While executing below command
>
> ```df.write.format("org.apache.spark.sql.cassandra").mode('append').options(
> table="test_table", keyspace="test").save()```
>
> I am getting
> ```
> py4j.protocol.Py4JJavaError: An error occurred while calling o47.save.
> : java.lang.NoSuchMethodError:
> scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaMirrors$JavaMirror;
>
> at
> com.datastax.spark.connector.types.TypeConverter$.(TypeConverter.scala:116)
>
> at
> com.datastax.spark.connector.types.TypeConverter$.(TypeConverter.scala)
>
> at
> com.datastax.spark.connector.types.BigIntType$.converterToCassandra(PrimitiveColumnType.scala:50)
>
> at
> com.datastax.spark.connector.types.BigIntType$.converterToCassandra(PrimitiveColumnType.scala:46)
>
> at
> com.datastax.spark.connector.writer.SqlRowWriter$$anonfun$3.apply(SqlRowWriter.scala:18)
>
> at
> com.datastax.spark.connector.writer.SqlRowWriter$$anonfun$3.apply(SqlRowWriter.scala:18)
>
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> com.datastax.spark.connector.writer.SqlRowWriter.(SqlRowWriter.scala:18)
>
> at
> com.datastax.spark.connector.writer.SqlRowWriter$Factory$.rowWriter(SqlRowWriter.scala:36)
>
> at
> com.datastax.spark.connector.writer.SqlRowWriter$Factory$.rowWri

Re: Write to Cassandra table from pyspark fails with scala reflect error

2016-09-14 Thread Russell Spitzer
Spark 2.0 defaults to Scala 2.11, so if you didn't build it yourself you
need the 2.11 artifact for the Spark Cassandra Connector.

On Wed, Sep 14, 2016 at 7:44 PM Trivedi Amit 
wrote:

> Hi,
>
>
>
> I am testing a pyspark program that will read from a csv file and write
> data into Cassandra table. I am using pyspark with
> spark-cassandra-connector 2.10:2.0.0-M3. I am using Spark v2.0.0.
>
> While executing below command
>
> ```df.write.format("org.apache.spark.sql.cassandra").mode('append').options(
> table="test_table", keyspace="test").save()```
>
> I am getting
> ```
> py4j.protocol.Py4JJavaError: An error occurred while calling o47.save.
> : java.lang.NoSuchMethodError:
> scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaMirrors$JavaMirror;
>
> at
> com.datastax.spark.connector.types.TypeConverter$.(TypeConverter.scala:116)
>
> at
> com.datastax.spark.connector.types.TypeConverter$.(TypeConverter.scala)
>
> at
> com.datastax.spark.connector.types.BigIntType$.converterToCassandra(PrimitiveColumnType.scala:50)
>
> at
> com.datastax.spark.connector.types.BigIntType$.converterToCassandra(PrimitiveColumnType.scala:46)
>
> at
> com.datastax.spark.connector.writer.SqlRowWriter$$anonfun$3.apply(SqlRowWriter.scala:18)
>
> at
> com.datastax.spark.connector.writer.SqlRowWriter$$anonfun$3.apply(SqlRowWriter.scala:18)
>
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> com.datastax.spark.connector.writer.SqlRowWriter.(SqlRowWriter.scala:18)
>
> at
> com.datastax.spark.connector.writer.SqlRowWriter$Factory$.rowWriter(SqlRowWriter.scala:36)
>
> at
> com.datastax.spark.connector.writer.SqlRowWriter$Factory$.rowWriter(SqlRowWriter.scala:34)
>
> at
> com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:271)
>
> at
> com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
>
> at
> org.apache.spark.sql.cassandra.CassandraSourceRelation.insert(CassandraSourceRelation.scala:66)
>
> at
> org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:85)
>
> at
> org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:429)
>
> at
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
> at
> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at py4j.Gateway.invoke(Gateway.java:280)
> at
> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:211)
> at java.lang.Thread.run(Thread.java:745)
> ```
>
> Google search for this error lead to threads where folks were talking
> about using scala version 2.10 instead of 2.11 for this issue. However, I
> am not using Scala and I am assuming that 2.10 in spark-cassandra-connector
> is Scala version.
>
> Don't know how to fix or get around this issue. Appreciate any help.
>
> Thanks
> AT
>
>
>


Re: spark cassandra issue

2016-09-04 Thread Russell Spitzer
This would also be a better question for the SCC user list :)
https://groups.google.com/a/lists.datastax.com/forum/#!forum/spark-connector-user

On Sun, Sep 4, 2016 at 9:31 AM Russell Spitzer <russell.spit...@gmail.com>
wrote:

>
> https://github.com/datastax/spark-cassandra-connector/blob/v1.3.1/doc/14_data_frames.md
> In Spark 1.3 it was illegal to use "table" as a key in Spark SQL so in
> that version of Spark the connector needed to use the option "c_table"
>
>
> val df = sqlContext.read.
>  | format("org.apache.spark.sql.cassandra").
>  | options(Map( "c_table" -> "", "keyspace" -> "***")).
>  | load()
>
>
> On Sun, Sep 4, 2016 at 8:32 AM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> and your Cassandra table is there etc?
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 4 September 2016 at 16:20, Selvam Raman <sel...@gmail.com> wrote:
>>
>>> Hey Mich,
>>>
>>> I am using the same one right now. Thanks for the reply.
>>> import org.apache.spark.sql.cassandra._
>>> import com.datastax.spark.connector._ //Loads implicit functions
>>> sc.cassandraTable("keyspace name", "table name")
>>>
>>>
>>> On Sun, Sep 4, 2016 at 8:48 PM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Hi Selvan.
>>>>
>>>> I don't deal with Cassandra but have you tried other options as
>>>> described here
>>>>
>>>>
>>>> https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md
>>>>
>>>> To get a Spark RDD that represents a Cassandra table, call the
>>>> cassandraTable method on the SparkContext object.
>>>>
>>>> import com.datastax.spark.connector._ //Loads implicit functions
>>>> sc.cassandraTable("keyspace name", "table name")
>>>>
>>>>
>>>>
>>>> HTH
>>>>
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>>
>>>> LinkedIn * 
>>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>
>>>>
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>> On 4 September 2016 at 15:52, Selvam Raman <sel...@gmail.com> wrote:
>>>>
>>>>> its very urgent. please help me guys.
>>>>>
>>>>> On Sun, Sep 4, 2016 at 8:05 PM, Selvam Raman <sel...@gmail.com> wrote:
>>>>>
>>>>>> Please help me to solve the issue.
>>>>>>
>>>>>> spark-shell --packages
>>>>>> com.datastax.spark:spark-cassandra-connector_2.10:1.3.0 --conf
>>>>>> spark.cassandra.connection.host=**
>>>>>>
>>>>>> val df = sqlContext.read.
>>>>>>  | format("org.apache.spark.sql.cassandra").
>>>>>>  | options(Map( "table" -> "", "keyspace" -> "***")).
>>>>>>  | load()
>>>>>> java.util.NoSuchElementException: key not found: c_table
>>>>>> at scala.collection.MapLike$class.default(MapLike.scala:228)
>>>>>> at
>>>>>> org.apache.spark.sql.execution.datasources.CaseInsensitiveMap.default(ddl.scala:151)
>>>>>> at scala.collection.MapLike$class.apply(MapLike.scala:141)
>>>>>> at
>>>>>> org.apache.spark.sql.execution.datasources.CaseInsensitiveMap.apply(ddl.scala:151)
>>>>>> at
>>>>>> org.apache.spark.sql.cassandra.DefaultSource$.TableRefAndOptions(DefaultSource.scala:120)
>>>>>> at
>>>>>> org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:56)
>>>>>> at
>>>>>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:125)
>>>>>> a
>>>>>>
>>>>>> --
>>>>>> Selvam Raman
>>>>>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Selvam Raman
>>>>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Selvam Raman
>>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>>>
>>
>>


Re: spark cassandra issue

2016-09-04 Thread Russell Spitzer
https://github.com/datastax/spark-cassandra-connector/blob/v1.3.1/doc/14_data_frames.md
In Spark 1.3 it was illegal to use "table" as an option key in Spark SQL, so
against that version of Spark the connector expects the key "c_table" instead:

val df = sqlContext.read.
 | format("org.apache.spark.sql.cassandra").
 | options(Map( "c_table" -> "", "keyspace" -> "***")).
 | load()
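
For comparison (not part of the original reply), a minimal sketch of the same
read against Spark 1.4+ with connector 1.4+, where the plain "table" key is
accepted; the keyspace and table names below are placeholders rather than
values from this thread:

val df = sqlContext.read.
  format("org.apache.spark.sql.cassandra").
  options(Map("table" -> "words", "keyspace" -> "test_ks")).
  load()

df.show()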




Re: Insert non-null values from dataframe

2016-08-26 Thread Russell Spitzer
Cassandra does not differentiate between null and empty, so when reading
from C* all empty values are reported as null. To avoid inserting nulls
(and the tombstones they create), see

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md#globally-treating-all-nulls-as-unset

This will not prevent those columns from being read back as null, though; it
only skips writing tombstones.
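
Not from the original reply, but a minimal sketch of that "globally treat
nulls as unset" approach, assuming spark-cassandra-connector 2.0+ (the unset
write path needs Cassandra 2.2+); the connector coordinates, keyspace and
table names are placeholders, and df stands for a DataFrame like the one in
the question below:

// Launch with the global setting, e.g.:
//   spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.0 \
//     --conf spark.cassandra.connection.host=127.0.0.1 \
//     --conf spark.cassandra.output.ignoreNulls=true

df.write.
  format("org.apache.spark.sql.cassandra").
  options(Map("table" -> "kv", "keyspace" -> "test_ks")).
  mode("append").
  save()   // null-valued columns are left unset instead of written as tombstones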

On Thu, Aug 25, 2016, 1:23 PM Selvam Raman  wrote:

> Hi ,
>
> Dataframe:
> colA colB colC colD colE
> 1    2    3    4    5
> 1    2    3    null null
> 1    null null null 5
> null null 3    4    5
>
> I want to insert dataframe to nosql database, where null occupies
> values(Cassandra). so i have to insert the column which has non-null values
> in the row.
>
> Expected:
>
> Record 1: (1,2,3,4,5)
> Record 2:(1,2,3)
> Record 3:(1,5)
> Record 4:(3,4,5)
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>


Re: Is "spark streaming" streaming or mini-batch?

2016-08-23 Thread Russell Spitzer
Spark Streaming does not process one event at a time, which is generally what
people mean by "streaming." It instead processes groups of events: each group
is a "micro-batch" that gets processed together.

True streaming theoretically always has better latency because each event is
processed as soon as it arrives, whereas in micro-batching the latency of
every event in a batch can be no better than that of the last element to
arrive.

Streaming theoretically has worse throughput because events cannot be
processed in bulk.

In practice, throughput and latency are very implementation dependent.
On Tue, Aug 23, 2016 at 8:41 AM Aseem Bansal  wrote:

> I was reading this article https://www.inovex.de/blog/storm-in-a-teacup/
> and it mentioned that spark streaming actually mini-batch not actual
> streaming.
>
> I have not used streaming and I am not sure what is the difference in the
> 2 terms. Hence could not make a judgement myself.
>